This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new b3128e0 SAMZA-2627: Make StreamAppender extensible for sending
messages to SystemProducer (#1474)
b3128e0 is described below
commit b3128e0bca7877c3c423d5b0d639803e7f837b13
Author: lakshmi-manasa-g <[email protected]>
AuthorDate: Fri Mar 12 04:55:43 2021 -0800
SAMZA-2627: Make StreamAppender extensible for sending messages to
SystemProducer (#1474)
Issue: StreamAppender sets both partition key and record key = container
name for OutgoingMessageEnvelope sent to underlying SystemProducer. This
restricts how the classes extending StreamAppender send to SystemProducer.
Change: Introduce method to decorate the serializedLogEvent that can be
overridden by extenders of StreamAppender
---
.../org/apache/samza/logging/log4j2/StreamAppender.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index cadf63c..2b03c31 100644
---
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -439,10 +439,19 @@ public class StreamAppender extends AbstractAppender {
* Helper method to send a serialized log-event to the systemProducer, and
increment respective methods.
* @param serializedLogEvent
*/
- protected void sendEventToSystemProducer(byte[] serializedLogEvent) {
+ private void sendEventToSystemProducer(byte[] serializedLogEvent) {
metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
metrics.logMessagesCountSent.inc();
- systemProducer.send(SOURCE, new OutgoingMessageEnvelope(systemStream,
keyBytes, serializedLogEvent));
+ systemProducer.send(SOURCE, decorateLogEvent(serializedLogEvent));
+ }
+
+ /**
+ * Helper method to create an OutgoingMessageEnvelope from the serialized
log event.
+ * @param messageBytes message bytes
+ * @return OutgoingMessageEnvelope that contains the message bytes along
with the system stream
+ */
+ protected OutgoingMessageEnvelope decorateLogEvent(byte[] messageBytes) {
+ return new OutgoingMessageEnvelope(systemStream, keyBytes, messageBytes);
}
protected String getStreamName(String jobName, String jobId) {