Repository: logging-log4j2 Updated Branches: refs/heads/master 5c6d9bbfe -> 8b232ce9c
LOG4J-2062 Added Lookup capabilities to Kafka Key Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/8efe3b7a Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/8efe3b7a Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/8efe3b7a Branch: refs/heads/master Commit: 8efe3b7a3443fef4663fe9cf97a657e06ffbe1b0 Parents: 5c6d9bb Author: Flowcont <[email protected]> Authored: Sun Oct 15 08:47:32 2017 +0100 Committer: Mikael Ståldal <[email protected]> Committed: Mon Oct 16 20:51:47 2017 +0200 ---------------------------------------------------------------------- .../core/appender/mom/kafka/KafkaManager.java | 17 ++++++++++++++--- .../appender/mom/kafka/KafkaAppenderTest.java | 20 ++++++++++++++++++++ .../src/test/resources/KafkaAppenderTest.xml | 5 +++++ src/site/xdoc/manual/appenders.xml | 6 ++++-- 4 files changed, 43 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/8efe3b7a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java index 1051561..d98d5b3 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java @@ -48,7 +48,7 @@ public class KafkaManager extends AbstractManager { private final int timeoutMillis; private final String topic; - private final byte[] key; + private final String key; private final boolean syncSend; public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend, @@ -62,7 +62,9 @@ public class KafkaManager extends AbstractManager { for (final Property property : properties) { config.setProperty(property.getName(), property.getValue()); } - this.key = (key != null ) ? key.getBytes(StandardCharsets.UTF_8) : null ; + + this.key = key; + this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS)); } @@ -100,7 +102,16 @@ public class KafkaManager extends AbstractManager { public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException { if (producer != null) { - final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, key, msg); + + byte[] newKey = null; + + if(key != null && key.contains("${")) { + newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8); + } else if (key != null) { + newKey = key.getBytes(StandardCharsets.UTF_8); + } + + final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg); if (syncSend) { final Future<RecordMetadata> response = producer.send(newRecord); response.get(timeoutMillis, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/8efe3b7a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java index 0e1c733..4932619 100644 --- a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectInputStream; import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -151,6 +153,24 @@ public class KafkaAppenderTest { assertEquals(LOG_MESSAGE, new String(item.value(), StandardCharsets.UTF_8)); } + @Test + public void testAppendWithKeyLookup() throws Exception { + final Appender appender = ctx.getRequiredAppender("KafkaAppenderWithKeyLookup"); + final LogEvent logEvent = createLogEvent(); + Date date = new Date(); + SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy"); + appender.append(logEvent); + final List<ProducerRecord<byte[], byte[]>> history = kafka.history(); + assertEquals(1, history.size()); + final ProducerRecord<byte[], byte[]> item = history.get(0); + assertNotNull(item); + assertEquals(TOPIC_NAME, item.topic()); + byte[] keyValue = format.format(date).getBytes(StandardCharsets.UTF_8); + assertArrayEquals(item.key(), keyValue); + assertEquals(LOG_MESSAGE, new String(item.value(), StandardCharsets.UTF_8)); + } + + private LogEvent deserializeLogEvent(final byte[] data) throws IOException, ClassNotFoundException { final ByteArrayInputStream bis = new ByteArrayInputStream(data); try (ObjectInput ois = new ObjectInputStream(bis)) { http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/8efe3b7a/log4j-core/src/test/resources/KafkaAppenderTest.xml ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/resources/KafkaAppenderTest.xml b/log4j-core/src/test/resources/KafkaAppenderTest.xml index c6bacfc..b775f88 100644 --- a/log4j-core/src/test/resources/KafkaAppenderTest.xml +++ b/log4j-core/src/test/resources/KafkaAppenderTest.xml @@ -37,6 +37,11 @@ <Property name="timeout.ms">1000</Property> <Property name="bootstrap.servers">localhost:9092</Property> </Kafka> + <Kafka name="KafkaAppenderWithKeyLookup" topic="kafka-topic" key="$${date:dd-MM-yyyy}"> + <PatternLayout pattern="%m"/> + <Property name="timeout.ms">1000</Property> + <Property name="bootstrap.servers">localhost:9092</Property> + </Kafka> </Appenders> <Loggers> <Root level="info"> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/8efe3b7a/src/site/xdoc/manual/appenders.xml ---------------------------------------------------------------------- diff --git a/src/site/xdoc/manual/appenders.xml b/src/site/xdoc/manual/appenders.xml index b7671d4..ae86b54 100644 --- a/src/site/xdoc/manual/appenders.xml +++ b/src/site/xdoc/manual/appenders.xml @@ -1720,7 +1720,7 @@ public class JpaLogEntity extends AbstractLogEventWrapperEntity { <subsection name="KafkaAppender"> <p> The KafkaAppender logs events to an <a href="https://kafka.apache.org/">Apache Kafka</a> topic. - Each log event is sent as a Kafka record with no key. + Each log event is sent as a Kafka record. </p> <table> <caption align="top">KafkaAppender Parameters</caption> @@ -1737,7 +1737,9 @@ public class JpaLogEntity extends AbstractLogEventWrapperEntity { <tr> <td>key</td> <td>String</td> - <td>The key that will be sent to Kafka with every message. Optional value defaulting to <code>null</code> (no key).</td> + <td>The key that will be sent to Kafka with every message. Optional value defaulting to <code>null</code>. + Any of the <a href="./lookups.html">Lookups</a>) can be included. + </td> </tr> <tr> <td>filter</td>
