Make KafkaAppender support SerializedLayout (LOG4J2-1195)
Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/084db0e7 Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/084db0e7 Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/084db0e7 Branch: refs/heads/LOG4J2-89 Commit: 084db0e76dce868ce7731f467bcc696a2a462ad4 Parents: 86ebc22 Author: Mikael Ståldal <[email protected]> Authored: Thu Nov 12 11:01:58 2015 +0100 Committer: Ralph Goers <[email protected]> Committed: Fri Nov 20 17:40:13 2015 -0700 ---------------------------------------------------------------------- .../core/appender/mom/kafka/KafkaAppender.java | 24 +++++++++--- .../appender/mom/kafka/KafkaAppenderTest.java | 39 ++++++++++++++++---- .../src/test/resources/KafkaAppenderTest.xml | 5 +++ src/changes/changes.xml | 3 ++ 4 files changed, 57 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/084db0e7/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java index 633faf3..143ff33 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java @@ -17,9 +17,6 @@ package org.apache.logging.log4j.core.appender.mom.kafka; -import java.io.Serializable; -import java.nio.charset.StandardCharsets; - import org.apache.logging.log4j.core.Filter; import org.apache.logging.log4j.core.Layout; import org.apache.logging.log4j.core.LogEvent; @@ -31,8 +28,12 @@ import org.apache.logging.log4j.core.config.plugins.PluginAttribute; import org.apache.logging.log4j.core.config.plugins.PluginElement; import org.apache.logging.log4j.core.config.plugins.PluginFactory; import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; +import org.apache.logging.log4j.core.layout.SerializedLayout; import org.apache.logging.log4j.core.util.StringEncoder; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; + /** * Sends log events to an Apache Kafka topic. */ @@ -68,11 +69,22 @@ public final class KafkaAppender extends AbstractAppender { LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName()); } else { try { - if (getLayout() != null) { - manager.send(getLayout().toByteArray(event)); + Layout<? extends Serializable> layout = getLayout(); + byte[] data; + if (layout != null) { + if (layout instanceof SerializedLayout) { + byte[] header = layout.getHeader(); + byte[] body = layout.toByteArray(event); + data = new byte[header.length + body.length]; + System.arraycopy(header, 0, data, 0, header.length); + System.arraycopy(body, 0, data, header.length, body.length); + } else { + data = layout.toByteArray(event); + } } else { - manager.send(StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8)); + data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8); } + manager.send(data); } catch (final Exception e) { LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e); throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/084db0e7/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java index d9544e3..2c0070c 100644 --- a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java @@ -17,19 +17,12 @@ package org.apache.logging.log4j.core.appender.mom.kafka; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Properties; - import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.impl.Log4jLogEvent; import org.apache.logging.log4j.junit.LoggerContextRule; import org.apache.logging.log4j.message.SimpleMessage; @@ -38,6 +31,16 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.*; + public class KafkaAppenderTest { private static final MockProducer kafka = new MockProducer(); @@ -98,4 +101,24 @@ public class KafkaAppenderTest { assertEquals("[" + LOG_MESSAGE + "]", new String(item.value(), StandardCharsets.UTF_8)); } + @Test + public void testAppendWithSerializedLayout() throws Exception { + final Appender appender = ctx.getRequiredAppender("KafkaAppenderWithSerializedLayout"); + LogEvent logEvent = createLogEvent(); + appender.append(logEvent); + final List<ProducerRecord<byte[], byte[]>> history = kafka.history(); + assertEquals(1, history.size()); + final ProducerRecord<byte[], byte[]> item = history.get(0); + assertNotNull(item); + assertEquals(TOPIC_NAME, item.topic()); + assertNull(item.key()); + assertEquals(LOG_MESSAGE, deserializeLogEvent(item.value()).getMessage().getFormattedMessage()); + } + + private LogEvent deserializeLogEvent(byte[] data) throws IOException, ClassNotFoundException { + ByteArrayInputStream bis = new ByteArrayInputStream(data); + try (ObjectInput ois = new ObjectInputStream(bis)) { + return (LogEvent) ois.readObject(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/084db0e7/log4j-core/src/test/resources/KafkaAppenderTest.xml ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/resources/KafkaAppenderTest.xml b/log4j-core/src/test/resources/KafkaAppenderTest.xml index 758c426..c38196b 100644 --- a/log4j-core/src/test/resources/KafkaAppenderTest.xml +++ b/log4j-core/src/test/resources/KafkaAppenderTest.xml @@ -24,11 +24,16 @@ <PatternLayout pattern="[%m]"/> <Property name="bootstrap.servers">localhost:9092</Property> </Kafka> + <Kafka name="KafkaAppenderWithSerializedLayout" topic="kafka-topic"> + <SerializedLayout/> + <Property name="bootstrap.servers">localhost:9092</Property> + </Kafka> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="KafkaAppender"/> <AppenderRef ref="KafkaAppenderWithLayout"/> + <AppenderRef ref="KafkaAppenderWithSerializedLayout"/> </Root> </Loggers> </Configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/084db0e7/src/changes/changes.xml ---------------------------------------------------------------------- diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 8fafc18..b6ca66f 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -24,6 +24,9 @@ </properties> <body> <release version="2.5" date="2015-MM-DD" description="GA Release 2.5"> + <action issue="LOG4J2-1195" dev="mikes" type="fix" due-to="Melvin Du"> + Make KafkaAppender support SerializedLayout. + </action> <action issue="LOG4J2-89" dev="rgoers" type="add"> Allow rollover to occur at any time. Add CronTriggeringPolicy </action>
