This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new b3128e0  SAMZA-2627: Make StreamAppender extensible for sending 
messages to SystemProducer (#1474)
b3128e0 is described below

commit b3128e0bca7877c3c423d5b0d639803e7f837b13
Author: lakshmi-manasa-g <[email protected]>
AuthorDate: Fri Mar 12 04:55:43 2021 -0800

    SAMZA-2627: Make StreamAppender extensible for sending messages to 
SystemProducer (#1474)
    
    Issue: StreamAppender sets both partition key and record key = container 
name for OutgoingMessageEnvelope sent to underlying SystemProducer. This 
restricts how the classes extending StreamAppender send to SystemProducer.
    
    Change: Introduce method to decorate the serializedLogEvent that can be 
overridden by extenders of StreamAppender
---
 .../org/apache/samza/logging/log4j2/StreamAppender.java     | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index cadf63c..2b03c31 100644
--- 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -439,10 +439,19 @@ public class StreamAppender extends AbstractAppender {
    * Helper method to send a serialized log-event to the systemProducer, and 
increment respective methods.
    * @param serializedLogEvent
    */
-  protected void sendEventToSystemProducer(byte[] serializedLogEvent) {
+  private void sendEventToSystemProducer(byte[] serializedLogEvent) {
     metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
     metrics.logMessagesCountSent.inc();
-    systemProducer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, 
keyBytes, serializedLogEvent));
+    systemProducer.send(SOURCE, decorateLogEvent(serializedLogEvent));
+  }
+
+  /**
+   * Helper method to create an OutgoingMessageEnvelope from the serialized 
log event.
+   * @param messageBytes message bytes
+   * @return OutgoingMessageEnvelope that contains the message bytes along 
with the system stream
+   */
+  protected OutgoingMessageEnvelope decorateLogEvent(byte[] messageBytes) {
+    return new OutgoingMessageEnvelope(systemStream, keyBytes, messageBytes);
   }
 
   protected String getStreamName(String jobName, String jobId) {

Reply via email to