cameronlee314 commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r714328789



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -89,38 +78,40 @@
   private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
   private String containerName = null;
   private int partitionCount = 0;
-  private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
 
-  private Thread transferThread;
+  private volatile Thread transferThread;
   private Config config = null;
   private String streamName = null;
   private final boolean usingAsyncLogger;
+  private final LoggingContextHolder loggingContextHolder;
 
   /**
    * used to detect if this thread is called recursively
    */
   private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
   protected static final int DEFAULT_QUEUE_SIZE = 100;
-  protected static volatile boolean systemInitialized = false;
+  protected volatile boolean systemInitialized = false;
   protected StreamAppenderMetrics metrics;
   protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
 
+  /**
+   * Constructor is protected so that this class can be extended.
+   */
   protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
-      boolean usingAsyncLogger, String streamName) {
+      boolean usingAsyncLogger, String streamName, LoggingContextHolder loggingContextHolder) {

Review comment:
       Good point. I had assumed that extenders of this class would need to
update their unit tests anyway, so they might as well be required to update
the constructor too. It makes more sense, though, to leave a
backward-compatible constructor in place. I will update this.
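
For illustration, a minimal sketch of what a backward-compatible constructor could look like. All class names here (`BaseAppender`, `ContextHolder`, `LegacyAppender`) are hypothetical stand-ins, not the actual Samza classes, and the default-instance approach is an assumption rather than what the PR necessarily ends up doing:

```java
public class BackwardCompatConstructorSketch {
  /** Hypothetical stand-in for LoggingContextHolder; not the real Samza class. */
  static class ContextHolder {
    static final ContextHolder INSTANCE = new ContextHolder();
  }

  /** Hypothetical stand-in for the appender base class. */
  static class BaseAppender {
    final String streamName;
    final ContextHolder contextHolder;

    // Old signature, kept so existing extenders still compile.
    // It delegates to the new constructor with an assumed default holder.
    protected BaseAppender(String streamName) {
      this(streamName, ContextHolder.INSTANCE);
    }

    // New signature; tests can inject their own holder here.
    protected BaseAppender(String streamName, ContextHolder contextHolder) {
      this.streamName = streamName;
      this.contextHolder = contextHolder;
    }
  }

  /** An existing extender that only knows the old constructor. */
  static class LegacyAppender extends BaseAppender {
    LegacyAppender(String streamName) {
      super(streamName);
    }
  }

  public static void main(String[] args) {
    LegacyAppender legacy = new LegacyAppender("logs");
    // The legacy subclass picks up the default holder without any code change.
    System.out.println(legacy.contextHolder == ContextHolder.INSTANCE);
  }
}
```

With this shape, extenders can migrate to the new constructor when they need to inject a holder, and nothing breaks for those that do not.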

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -490,6 +455,17 @@ protected void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName)
     }
   }
 
+  /**
+   * If the partition count was explicitly specified, then use that. Otherwise, use the container count as the partition
+   * count.
+   */
+  private int calculateStreamPartitionCount(Config config) {

Review comment:
       Good point: callers of `getPartitionCount` already had to make sure
the config was available. I will change this back.
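
For reference, the fallback described in the Javadoc of the hunk above (explicit partition count if set, otherwise the container count) can be sketched roughly as follows. The config keys, the `Map`-based config, and the default of one container are assumptions made for this example; they are not the real Samza `Config` API or key names:

```java
import java.util.Map;

public class PartitionCountSketch {
  // Hypothetical config keys, for illustration only; not the actual Samza keys.
  static final String EXPLICIT_PARTITION_COUNT_KEY = "example.log.stream.partition.count";
  static final String CONTAINER_COUNT_KEY = "example.container.count";

  /**
   * Returns the explicitly configured partition count if present;
   * otherwise falls back to the container count (assumed to default to 1).
   */
  static int calculateStreamPartitionCount(Map<String, String> config) {
    String explicit = config.get(EXPLICIT_PARTITION_COUNT_KEY);
    if (explicit != null) {
      return Integer.parseInt(explicit);
    }
    return Integer.parseInt(config.getOrDefault(CONTAINER_COUNT_KEY, "1"));
  }

  public static void main(String[] args) {
    // Only the container count is set, so it is used as the partition count.
    System.out.println(calculateStreamPartitionCount(Map.of(CONTAINER_COUNT_KEY, "4")));
    // An explicit partition count takes precedence over the container count.
    System.out.println(calculateStreamPartitionCount(
        Map.of(EXPLICIT_PARTITION_COUNT_KEY, "8", CONTAINER_COUNT_KEY, "4")));
  }
}
```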



