Repository: logging-log4j2 Updated Branches: refs/heads/master cfcdffd66 -> 194ffd2da
LOG4J2-2062 Added property to KafkaAppender to send a Key to Kafka Set key as an attribute, updated documentation and set charset Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/f57b356d Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f57b356d Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f57b356d Branch: refs/heads/master Commit: f57b356ddded447f3e46dd1469ae6f6ac44d3497 Parents: cfcdffd Author: Flowcont <[email protected]> Authored: Sat Sep 30 18:18:47 2017 +0100 Committer: Mikael Ståldal <[email protected]> Committed: Fri Oct 13 21:30:53 2017 +0200 ---------------------------------------------------------------------- .../core/appender/mom/kafka/KafkaAppender.java | 12 +++++++++--- .../log4j/core/appender/mom/kafka/KafkaManager.java | 12 ++++++++++-- .../core/appender/mom/kafka/KafkaAppenderTest.java | 16 ++++++++++++++++ log4j-core/src/test/resources/KafkaAppenderTest.xml | 6 ++++++ src/site/xdoc/manual/appenders.xml | 7 ++++++- 5 files changed, 47 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f57b356d/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java index 81ec09b..f7d50af 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java @@ -53,6 +53,9 @@ public final class KafkaAppender extends AbstractAppender { @PluginAttribute("topic") private String topic; + + @PluginAttribute("key") + private String key; @PluginAttribute(value = "syncSend", defaultBoolean = true) private boolean syncSend; @@ -68,7 +71,8 @@ public final class KafkaAppender extends AbstractAppender { AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender"); return null; } - final KafkaManager kafkaManager = new KafkaManager(getConfiguration().getLoggerContext(), getName(), topic, syncSend, properties); + final KafkaManager kafkaManager = + new KafkaManager(getConfiguration().getLoggerContext(), getName(), topic, syncSend, properties, key); return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager); } @@ -108,13 +112,15 @@ public final class KafkaAppender extends AbstractAppender { final boolean ignoreExceptions, final String topic, final Property[] properties, - final Configuration configuration) { + final Configuration configuration, + final String key) { if (layout == null) { AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender"); return null; } - final KafkaManager kafkaManager = new KafkaManager(configuration.getLoggerContext(), name, topic, true, properties); + final KafkaManager kafkaManager = + new KafkaManager(configuration.getLoggerContext(), name, topic, true, properties, key); return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager); } http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f57b356d/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java index 2cfd366..7c3f1cd 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java @@ -17,6 +17,7 @@ package org.apache.logging.log4j.core.appender.mom.kafka; +import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -47,9 +48,11 @@ public class KafkaManager extends AbstractManager { private final int timeoutMillis; private final String topic; + private final byte[] key; private final boolean syncSend; - public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend, final Property[] properties) { + public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend, + final Property[] properties, final String key) { super(loggerContext, name); this.topic = Objects.requireNonNull(topic, "topic"); this.syncSend = syncSend; @@ -57,8 +60,12 @@ public class KafkaManager extends AbstractManager { config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); config.setProperty("batch.size", "0"); for (final Property property : properties) { + config.setProperty(property.getName(), property.getValue()); } + + this.key = (key != null ) ? key.getBytes(StandardCharsets.UTF_8) : null ; + this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS)); } @@ -96,7 +103,8 @@ public class KafkaManager extends AbstractManager { public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException { if (producer != null) { - final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, msg); + + final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, key, msg); if (syncSend) { final Future<RecordMetadata> response = producer.send(newRecord); response.get(timeoutMillis, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f57b356d/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java index 2f60750..0e1c733 100644 --- a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java @@ -135,6 +135,22 @@ public class KafkaAppenderTest { assertEquals(LOG_MESSAGE, new String(item.value(), StandardCharsets.UTF_8)); } + @Test + public void testAppendWithKey() throws Exception { + final Appender appender = ctx.getRequiredAppender("KafkaAppenderWithKey"); + final LogEvent logEvent = createLogEvent(); + appender.append(logEvent); + final List<ProducerRecord<byte[], byte[]>> history = kafka.history(); + assertEquals(1, history.size()); + final ProducerRecord<byte[], byte[]> item = history.get(0); + assertNotNull(item); + assertEquals(TOPIC_NAME, item.topic()); + String msgKey = item.key().toString(); + byte[] keyValue = "key".getBytes(StandardCharsets.UTF_8); + assertArrayEquals(item.key(), keyValue); + assertEquals(LOG_MESSAGE, new String(item.value(), StandardCharsets.UTF_8)); + } + private LogEvent deserializeLogEvent(final byte[] data) throws IOException, ClassNotFoundException { final ByteArrayInputStream bis = new ByteArrayInputStream(data); try (ObjectInput ois = new ObjectInputStream(bis)) { http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f57b356d/log4j-core/src/test/resources/KafkaAppenderTest.xml ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/resources/KafkaAppenderTest.xml b/log4j-core/src/test/resources/KafkaAppenderTest.xml index 2af8586..c6bacfc 100644 --- a/log4j-core/src/test/resources/KafkaAppenderTest.xml +++ b/log4j-core/src/test/resources/KafkaAppenderTest.xml @@ -32,12 +32,18 @@ <Property name="bootstrap.servers">localhost:9092</Property> <Property name="syncSend">false</Property> </Kafka> + <Kafka name="KafkaAppenderWithKey" topic="kafka-topic" key="key"> + <PatternLayout pattern="%m"/> + <Property name="timeout.ms">1000</Property> + <Property name="bootstrap.servers">localhost:9092</Property> + </Kafka> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="KafkaAppenderWithLayout"/> <AppenderRef ref="KafkaAppenderWithSerializedLayout"/> <AppenderRef ref="AsyncKafkaAppender"/> + <AppenderRef ref="KafkaAppenderWithKey"/> </Root> </Loggers> </Configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f57b356d/src/site/xdoc/manual/appenders.xml ---------------------------------------------------------------------- diff --git a/src/site/xdoc/manual/appenders.xml b/src/site/xdoc/manual/appenders.xml index 6fe097a..7ea039f 100644 --- a/src/site/xdoc/manual/appenders.xml +++ b/src/site/xdoc/manual/appenders.xml @@ -1735,6 +1735,11 @@ public class JpaLogEntity extends AbstractLogEventWrapperEntity { <td>The Kafka topic to use. Required.</td> </tr> <tr> + <td>key</td> + <td>String</td> + <td>The key that will be sent to Kafka with every message. Optional value defaulting to <code>null</code>.</td> + </tr> + <tr> <td>filter</td> <td>Filter</td> <td>A Filter to determine if the event should be handled by this Appender. More than one Filter @@ -1777,7 +1782,7 @@ public class JpaLogEntity extends AbstractLogEventWrapperEntity { <td> You can set properties in <a href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka producer properties</a>. You need to set the <code>bootstrap.servers</code> property, there are sensible default values for the others. - Do not set the <code>value.serializer</code> property. + Do not set the <code>value.serializer</code> nor <code>key.serializer</code> properties. </td> </tr> </table>
