Make KafkaAppender support SerializedLayout (LOG4J2-1195)

Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/084db0e7
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/084db0e7
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/084db0e7

Branch: refs/heads/LOG4J2-89
Commit: 084db0e76dce868ce7731f467bcc696a2a462ad4
Parents: 86ebc22
Author: Mikael StÃ¥ldal <[email protected]>
Authored: Thu Nov 12 11:01:58 2015 +0100
Committer: Ralph Goers <[email protected]>
Committed: Fri Nov 20 17:40:13 2015 -0700

----------------------------------------------------------------------
 .../core/appender/mom/kafka/KafkaAppender.java  | 24 +++++++++---
 .../appender/mom/kafka/KafkaAppenderTest.java   | 39 ++++++++++++++++----
 .../src/test/resources/KafkaAppenderTest.xml    |  5 +++
 src/changes/changes.xml                         |  3 ++
 4 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/084db0e7/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
index 633faf3..143ff33 100644
--- 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
+++ 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
@@ -17,9 +17,6 @@
 
 package org.apache.logging.log4j.core.appender.mom.kafka;
 
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-
 import org.apache.logging.log4j.core.Filter;
 import org.apache.logging.log4j.core.Layout;
 import org.apache.logging.log4j.core.LogEvent;
@@ -31,8 +28,12 @@ import 
org.apache.logging.log4j.core.config.plugins.PluginAttribute;
 import org.apache.logging.log4j.core.config.plugins.PluginElement;
 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
 import 
org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
+import org.apache.logging.log4j.core.layout.SerializedLayout;
 import org.apache.logging.log4j.core.util.StringEncoder;
 
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
 /**
  * Sends log events to an Apache Kafka topic.
  */
@@ -68,11 +69,22 @@ public final class KafkaAppender extends AbstractAppender {
             LOGGER.warn("Recursive logging from [{}] for appender [{}].", 
event.getLoggerName(), getName());
         } else {
             try {
-                if (getLayout() != null) {
-                    manager.send(getLayout().toByteArray(event));
+                Layout<? extends Serializable> layout = getLayout();
+                byte[] data;
+                if (layout != null) {
+                    if (layout instanceof SerializedLayout) {
+                        byte[] header = layout.getHeader();
+                        byte[] body = layout.toByteArray(event);
+                        data = new byte[header.length + body.length];
+                        System.arraycopy(header, 0, data, 0, header.length);
+                        System.arraycopy(body, 0, data, header.length, 
body.length);
+                    } else {
+                        data = layout.toByteArray(event);
+                    }
                 } else {
-                    
manager.send(StringEncoder.toBytes(event.getMessage().getFormattedMessage(), 
StandardCharsets.UTF_8));
+                    data = 
StringEncoder.toBytes(event.getMessage().getFormattedMessage(), 
StandardCharsets.UTF_8);
                 }
+                manager.send(data);
             } catch (final Exception e) {
                 LOGGER.error("Unable to write to Kafka [{}] for appender 
[{}].", manager.getName(), getName(), e);
                 throw new AppenderLoggingException("Unable to write to Kafka 
in appender: " + e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/084db0e7/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
index d9544e3..2c0070c 100644
--- 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
+++ 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
@@ -17,19 +17,12 @@
 
 package org.apache.logging.log4j.core.appender.mom.kafka;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Properties;
-
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LogEvent;
 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
 import org.apache.logging.log4j.junit.LoggerContextRule;
 import org.apache.logging.log4j.message.SimpleMessage;
@@ -38,6 +31,16 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
 public class KafkaAppenderTest {
 
     private static final MockProducer kafka = new MockProducer();
@@ -98,4 +101,24 @@ public class KafkaAppenderTest {
         assertEquals("[" + LOG_MESSAGE + "]", new String(item.value(), 
StandardCharsets.UTF_8));
     }
 
+    @Test
+    public void testAppendWithSerializedLayout() throws Exception {
+        final Appender appender = 
ctx.getRequiredAppender("KafkaAppenderWithSerializedLayout");
+        LogEvent logEvent = createLogEvent();
+        appender.append(logEvent);
+        final List<ProducerRecord<byte[], byte[]>> history = kafka.history();
+        assertEquals(1, history.size());
+        final ProducerRecord<byte[], byte[]> item = history.get(0);
+        assertNotNull(item);
+        assertEquals(TOPIC_NAME, item.topic());
+        assertNull(item.key());
+        assertEquals(LOG_MESSAGE, 
deserializeLogEvent(item.value()).getMessage().getFormattedMessage());
+    }
+
+    private LogEvent deserializeLogEvent(byte[] data) throws IOException, 
ClassNotFoundException {
+        ByteArrayInputStream bis = new ByteArrayInputStream(data);
+        try (ObjectInput ois = new ObjectInputStream(bis)) {
+            return (LogEvent) ois.readObject();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/084db0e7/log4j-core/src/test/resources/KafkaAppenderTest.xml
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/resources/KafkaAppenderTest.xml 
b/log4j-core/src/test/resources/KafkaAppenderTest.xml
index 758c426..c38196b 100644
--- a/log4j-core/src/test/resources/KafkaAppenderTest.xml
+++ b/log4j-core/src/test/resources/KafkaAppenderTest.xml
@@ -24,11 +24,16 @@
       <PatternLayout pattern="[%m]"/>
       <Property name="bootstrap.servers">localhost:9092</Property>
     </Kafka>
+  <Kafka name="KafkaAppenderWithSerializedLayout" topic="kafka-topic">
+    <SerializedLayout/>
+    <Property name="bootstrap.servers">localhost:9092</Property>
+  </Kafka>
   </Appenders>
   <Loggers>
     <Root level="info">
       <AppenderRef ref="KafkaAppender"/>
       <AppenderRef ref="KafkaAppenderWithLayout"/>
+      <AppenderRef ref="KafkaAppenderWithSerializedLayout"/>
     </Root>
   </Loggers>
 </Configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/084db0e7/src/changes/changes.xml
----------------------------------------------------------------------
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 8fafc18..b6ca66f 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -24,6 +24,9 @@
   </properties>
   <body>
     <release version="2.5" date="2015-MM-DD" description="GA Release 2.5">
+      <action issue="LOG4J2-1195" dev="mikes" type="fix" due-to="Melvin Du">
+        Make KafkaAppender support SerializedLayout.
+      </action>
       <action issue="LOG4J2-89" dev="rgoers" type="add">
         Allow rollover to occur at any time. Add CronTriggeringPolicy
       </action>

Reply via email to