lakshmi-manasa-g commented on a change in pull request #1474:
URL: https://github.com/apache/samza/pull/1474#discussion_r592621345
##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -75,16 +75,16 @@
private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
- private static final String SOURCE = "log4j-log";
+ protected static final String SOURCE = "log4j-log";
// Hidden config for now. Will move to appropriate Config class when ready to.
private static final String CREATE_STREAM_ENABLED = "task.log4j.create.stream.enabled";
private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Abitrary choice
private final BlockingQueue<byte[]> logQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
- private SystemStream systemStream = null;
- private SystemProducer systemProducer = null;
+ protected SystemStream systemStream = null;
+ protected SystemProducer systemProducer = null;
Review comment:
Thanks for the detailed explanation.
The method exposed in the previous PR #1470 is
`protected void sendEventToSystemProducer(byte[] serializedLogEvent) {
  metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
  metrics.logMessagesCountSent.inc();
  systemProducer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent));
}`
The part that needs to be overridden is
`systemProducer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent));`
To override it so that the OutgoingMessageEnvelope has no partition key and no record key, the overriding statement would look like
`systemProducer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, null, null, serializedLogEvent));`
and for a subclass to write this, it needs access to all three members: `systemProducer`, `SOURCE`, and `systemStream`.
Please let me know if there is a better way to send an OutgoingMessageEnvelope (OME) with no partition key and no record key.
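A rough, self-contained sketch of the override described above, showing why the subclass needs `SOURCE`, `systemStream` and `systemProducer` to be `protected`. `Envelope` and `Producer` are simplified stand-ins for Samza's `OutgoingMessageEnvelope` and `SystemProducer` (not the real API), and `BaseAppender`, `NoKeyAppender`, the stream name and the key value are hypothetical. The real `sendEventToSystemProducer` also updates metrics; the sketch omits that, and a real override would presumably need to keep those updates.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Envelope and Producer are simplified stand-ins for Samza's
// OutgoingMessageEnvelope and SystemProducer (NOT the real API).
// BaseAppender and NoKeyAppender are hypothetical names.
public class NoKeyOverrideSketch {

  // Stand-in envelope: target stream, partition key, record key, payload.
  static final class Envelope {
    final String stream;
    final Object partitionKey;
    final Object key;
    final byte[] message;

    Envelope(String stream, Object partitionKey, Object key, byte[] message) {
      this.stream = stream;
      this.partitionKey = partitionKey;
      this.key = key;
      this.message = message;
    }
  }

  // Stand-in producer that only records what it was asked to send.
  static final class Producer {
    final List<Envelope> sent = new ArrayList<>();

    void send(String source, Envelope envelope) {
      sent.add(envelope);
    }
  }

  // Mirrors the visibility change in this PR: the members a subclass needs
  // to build its own envelope are protected instead of private.
  static class BaseAppender {
    protected static final String SOURCE = "log4j-log";
    protected String systemStream = "kafka.log-stream"; // hypothetical stream name
    protected Producer systemProducer = new Producer();
    private final byte[] keyBytes = "container-1".getBytes(StandardCharsets.UTF_8); // hypothetical key

    // Default: the same key is used as both partition key and record key.
    protected void sendEventToSystemProducer(byte[] serializedLogEvent) {
      systemProducer.send(SOURCE, new Envelope(systemStream, keyBytes, keyBytes, serializedLogEvent));
    }
  }

  // Hypothetical subclass: sends with no partition key and no record key.
  static class NoKeyAppender extends BaseAppender {
    @Override
    protected void sendEventToSystemProducer(byte[] serializedLogEvent) {
      systemProducer.send(SOURCE, new Envelope(systemStream, null, null, serializedLogEvent));
    }
  }

  public static void main(String[] args) {
    NoKeyAppender appender = new NoKeyAppender();
    appender.sendEventToSystemProducer("hello".getBytes(StandardCharsets.UTF_8));
    Envelope e = appender.systemProducer.sent.get(0);
    System.out.println(e.partitionKey == null && e.key == null); // prints "true"
  }
}
```

Without the visibility change, the subclass could override the method but could not reach `systemProducer`, `SOURCE` or `systemStream` to send anything.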
Review comment:
A subclass can choose to omit the partition key and record key when sending the message to the underlying system producer, depending on how the underlying system distributes messages across partitions.
PR #1470 exposed the method that sends to the system producer.
For example, if both the partition key and the record key are null in the OutgoingMessageEnvelope, Kafka will pick the partition round robin instead of by key hash. This prevents a few partitions from accumulating terabytes of data while the rest of the partitions in the same topic stay empty because of the hashing.
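A small illustration of the skew argument, under simplifying assumptions: `Arrays.hashCode` stands in for Kafka's actual (murmur2-based) key hashing, and the key value and partition count are hypothetical. When every message carries the same key, key hashing sends all of them to a single partition, while round robin spreads them evenly.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Compares per-partition message counts for two strategies.
// Arrays.hashCode is a stand-in for Kafka's real key hash (murmur2).
public class PartitionSkewSketch {

  // Key hash: the same key always maps to the same partition.
  static int[] hashPartition(byte[] key, int messages, int partitions) {
    int[] counts = new int[partitions];
    for (int i = 0; i < messages; i++) {
      counts[Math.floorMod(Arrays.hashCode(key), partitions)]++;
    }
    return counts;
  }

  // No key: messages cycle through the partitions in order.
  static int[] roundRobin(int messages, int partitions) {
    int[] counts = new int[partitions];
    for (int i = 0; i < messages; i++) {
      counts[i % partitions]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    byte[] key = "container-1".getBytes(StandardCharsets.UTF_8); // hypothetical key
    // All 1000 messages land in one partition; which one depends on the hash.
    System.out.println(Arrays.toString(hashPartition(key, 1000, 4)));
    System.out.println(Arrays.toString(roundRobin(1000, 4))); // prints "[250, 250, 250, 250]"
  }
}
```

Newer Kafka clients (2.4 and later) use a "sticky" partitioner for keyless records, which switches partitions per batch rather than per message; either way the load is spread across partitions instead of piling onto one.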
----------------------------------------------------------------
This is an automated message from the Apache Git Service.