lakshmi-manasa-g commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r714210883
##########
File path:
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -89,38 +78,40 @@
private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
private String containerName = null;
private int partitionCount = 0;
- private boolean isApplicationMaster;
private Serde<LogEvent> serde = null;
- private Thread transferThread;
+ private volatile Thread transferThread;
private Config config = null;
private String streamName = null;
private final boolean usingAsyncLogger;
+ private final LoggingContextHolder loggingContextHolder;
/**
* used to detect if this thread is called recursively
*/
private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
protected static final int DEFAULT_QUEUE_SIZE = 100;
- protected static volatile boolean systemInitialized = false;
+ protected volatile boolean systemInitialized = false;
protected StreamAppenderMetrics metrics;
protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
+ /**
+ * Constructor is protected so that this class can be extended.
+ */
protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
- boolean usingAsyncLogger, String streamName) {
+ boolean usingAsyncLogger, String streamName, LoggingContextHolder loggingContextHolder) {
Review comment:
Ah, I see. Thanks for explaining why not log4j and why this extra
argument is needed.
However, this is a breaking change (every class that extends StreamAppender
has to be updated) made only to simplify testing.
I think we could add a separate constructor for testing and avoid the
breaking change.
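A minimal sketch of the overload idea, not the actual StreamAppender code. The class names `StreamAppenderSketch` and `ExistingSubclass` are hypothetical, the argument list is shortened, and the shared `LoggingContextHolder.INSTANCE` default is an assumption made for illustration:

```java
// Hypothetical stand-in for the holder type introduced in this PR.
class LoggingContextHolder {
    // Assumption: a shared default instance that production code would use.
    static final LoggingContextHolder INSTANCE = new LoggingContextHolder();
}

// Hypothetical, simplified stand-in for StreamAppender (real argument list is longer).
class StreamAppenderSketch {
    private final boolean usingAsyncLogger;
    private final String streamName;
    private final LoggingContextHolder loggingContextHolder;

    // Existing signature stays as-is, so classes that extend it keep compiling.
    protected StreamAppenderSketch(boolean usingAsyncLogger, String streamName) {
        this(usingAsyncLogger, streamName, LoggingContextHolder.INSTANCE);
    }

    // Additional overload, intended for tests that want to inject their own holder.
    protected StreamAppenderSketch(boolean usingAsyncLogger, String streamName,
                                   LoggingContextHolder loggingContextHolder) {
        this.usingAsyncLogger = usingAsyncLogger;
        this.streamName = streamName;
        this.loggingContextHolder = loggingContextHolder;
    }

    LoggingContextHolder getLoggingContextHolder() {
        return loggingContextHolder;
    }
}

// An existing subclass that still calls the old constructor and needs no update.
class ExistingSubclass extends StreamAppenderSketch {
    ExistingSubclass(String streamName) {
        super(false, streamName);
    }
}
```

With this shape, existing subclasses get the default holder, while a test can pass its own instance through the three-argument overload.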
##########
File path:
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -490,6 +455,17 @@ protected void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName)
}
}
+ /**
+ * If the partition count was explicitly specified, then use that. Otherwise, use the container count as the partition
+ * count.
+ */
+ private int calculateStreamPartitionCount(Config config) {
Review comment:
Since this is also a breaking change, I want to dig into it a bit more.
Earlier, `getPartitionCount` used getConfig(), which fetched the config
once it became available (under different conditions for the AM vs. a container).
So even in the earlier flow, calling getPartitionCount before the config
became available would have been a problem. The caller of getPartitionCount
already ensures that the call is made only after the config becomes available.
I see LoggingContextHolder as a way to avoid the separate checks for the
AM and the container, and even to avoid the isAM calls, which makes the code
cleaner. It is not going to make the config available any sooner than before.
Given this, it is more a refactor for cleaner, more readable code than a logic
change. Hence, I am uncomfortable making breaking changes and would like to
avoid them if possible.
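For reference, the rule stated in the Javadoc of `calculateStreamPartitionCount` can be sketched roughly as below. This is only an illustration: it uses a plain `Map` instead of Samza's `Config`, the key `example.log.partition.count` is a made-up placeholder, and the fallback of 1 when no container count is set is an assumption.

```java
import java.util.Map;

// Hypothetical sketch of "explicit partition count, else container count".
class PartitionCountSketch {
    // Placeholder key, NOT a real Samza config name.
    static final String EXPLICIT_COUNT_KEY = "example.log.partition.count";
    static final String CONTAINER_COUNT_KEY = "job.container.count";

    static int calculateStreamPartitionCount(Map<String, String> config) {
        String explicit = config.get(EXPLICIT_COUNT_KEY);
        if (explicit != null) {
            // The partition count was explicitly specified, so use it.
            return Integer.parseInt(explicit);
        }
        // Otherwise use the container count (assumed to default to 1 here).
        return Integer.parseInt(config.getOrDefault(CONTAINER_COUNT_KEY, "1"));
    }
}
```

Either way, the method only makes sense once the config is available, which is the same precondition the earlier `getPartitionCount` flow had.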