lakshmi-manasa-g commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r714210883



##########
File path: 
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -89,38 +78,40 @@
   private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
   private String containerName = null;
   private int partitionCount = 0;
-  private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
 
-  private Thread transferThread;
+  private volatile Thread transferThread;
   private Config config = null;
   private String streamName = null;
   private final boolean usingAsyncLogger;
+  private final LoggingContextHolder loggingContextHolder;
 
   /**
    * used to detect if this thread is called recursively
    */
   private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
   protected static final int DEFAULT_QUEUE_SIZE = 100;
-  protected static volatile boolean systemInitialized = false;
+  protected volatile boolean systemInitialized = false;
   protected StreamAppenderMetrics metrics;
   protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
 
+  /**
+   * Constructor is protected so that this class can be extended.
+   */
   protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
-      boolean usingAsyncLogger, String streamName) {
+      boolean usingAsyncLogger, String streamName, LoggingContextHolder loggingContextHolder) {

Review comment:
       Ah, I see. Thanks for explaining why not log4j and why this extra
argument is needed.
   However, we are making a breaking change (every subclass has to update its
constructor call) just to make testing easier.
   I feel we could add a second constructor for testing and avoid the breaking
change.
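
   A rough sketch of the idea, using simplified stand-in classes rather than the
real Samza or Log4j types. `SketchAppender`, `ExistingSubclass`, and the
`INSTANCE` singleton field are assumptions made up for illustration; only the
name `LoggingContextHolder` comes from this PR. The existing constructor
signature stays, and an extra constructor accepts the holder so tests can inject
one:

```java
// Stand-in for the holder type added in this PR; NOT the real Samza class.
// The INSTANCE singleton is an assumption for this sketch.
class LoggingContextHolder {
  static final LoggingContextHolder INSTANCE = new LoggingContextHolder();
}

// Hypothetical, heavily simplified appender (the real one takes more arguments).
class SketchAppender {
  private final String name;
  private final String streamName;
  private final LoggingContextHolder loggingContextHolder;

  // Existing signature is unchanged, so subclasses that call
  // super(name, streamName) keep compiling. It delegates with the default holder.
  protected SketchAppender(String name, String streamName) {
    this(name, streamName, LoggingContextHolder.INSTANCE);
  }

  // Extra constructor (package-private here) so tests can inject a holder.
  SketchAppender(String name, String streamName, LoggingContextHolder loggingContextHolder) {
    this.name = name;
    this.streamName = streamName;
    this.loggingContextHolder = loggingContextHolder;
  }

  LoggingContextHolder getLoggingContextHolder() {
    return loggingContextHolder;
  }
}

// Hypothetical pre-existing subclass; it needs no change under this approach.
class ExistingSubclass extends SketchAppender {
  ExistingSubclass(String name, String streamName) {
    super(name, streamName);
  }
}
```

   With this shape, a test can call the three-argument constructor with a fake
holder, while existing subclasses keep using the old constructor.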

##########
File path: 
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -490,6 +455,17 @@ protected void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName)
     }
   }
 
+  /**
+   * If the partition count was explicitly specified, then use that. Otherwise, use the container count as the partition
+   * count.
+   */
+  private int calculateStreamPartitionCount(Config config) {

Review comment:
       Since this is also a breaking change, I want to dig a bit more into it.
   Earlier, `getPartitionCount` used getConfig(), which fetched the config once
it became available (under different conditions for the AM vs. a container). So
even with the earlier flow, calling getPartitionCount before the config became
available would have been a problem. The caller of getPartitionCount already
ensures that the call is made only after the config becomes available.
   
   I see LoggingContextHolder as a way to avoid the separate checks for AM and
container (and even avoid the isAM checks altogether), which makes the code
cleaner. It is not going to make the config available any sooner than before.
Given this, it is more a refactor for cleaner, more readable code than a logic
change. Hence, I am uncomfortable making breaking changes and would like to
avoid them if possible.
   
   





