Repository: logging-log4j2
Updated Branches:
  refs/heads/master 5c6d9bbfe -> 8b232ce9c


LOG4J-2062 Added Lookup capabilities to Kafka Key


Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/8efe3b7a
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/8efe3b7a
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/8efe3b7a

Branch: refs/heads/master
Commit: 8efe3b7a3443fef4663fe9cf97a657e06ffbe1b0
Parents: 5c6d9bb
Author: Flowcont <[email protected]>
Authored: Sun Oct 15 08:47:32 2017 +0100
Committer: Mikael Ståldal <[email protected]>
Committed: Mon Oct 16 20:51:47 2017 +0200

----------------------------------------------------------------------
 .../core/appender/mom/kafka/KafkaManager.java   | 17 ++++++++++++++---
 .../appender/mom/kafka/KafkaAppenderTest.java   | 20 ++++++++++++++++++++
 .../src/test/resources/KafkaAppenderTest.xml    |  5 +++++
 src/site/xdoc/manual/appenders.xml              |  6 ++++--
 4 files changed, 43 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/8efe3b7a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index 1051561..d98d5b3 100644
--- 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -48,7 +48,7 @@ public class KafkaManager extends AbstractManager {
     private final int timeoutMillis;
 
     private final String topic;
-    private final byte[] key;
+    private final String key;
     private final boolean syncSend;
 
     public KafkaManager(final LoggerContext loggerContext, final String name, 
final String topic, final boolean syncSend,
@@ -62,7 +62,9 @@ public class KafkaManager extends AbstractManager {
         for (final Property property : properties) {
             config.setProperty(property.getName(), property.getValue());
         }
-        this.key = (key != null ) ? key.getBytes(StandardCharsets.UTF_8) : 
null ;
+
+        this.key = key;
+
         this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", 
DEFAULT_TIMEOUT_MILLIS));
     }
 
@@ -100,7 +102,16 @@ public class KafkaManager extends AbstractManager {
 
     public void send(final byte[] msg) throws ExecutionException, 
InterruptedException, TimeoutException {
         if (producer != null) {
-            final ProducerRecord<byte[], byte[]> newRecord = new 
ProducerRecord<>(topic, key, msg);
+
+            byte[] newKey = null;
+
+            if(key != null && key.contains("${")) {
+                newKey = 
getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
+            } else if (key != null) {
+                newKey = key.getBytes(StandardCharsets.UTF_8);
+            }
+
+            final ProducerRecord<byte[], byte[]> newRecord = new 
ProducerRecord<>(topic, newKey, msg);
             if (syncSend) {
                 final Future<RecordMetadata> response = 
producer.send(newRecord);
                 response.get(timeoutMillis, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/8efe3b7a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
index 0e1c733..4932619 100644
--- 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
+++ 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectInputStream;
 import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -151,6 +153,24 @@ public class KafkaAppenderTest {
         assertEquals(LOG_MESSAGE, new String(item.value(), 
StandardCharsets.UTF_8));
     }
 
+    @Test
+    public void testAppendWithKeyLookup() throws Exception {
+        final Appender appender = 
ctx.getRequiredAppender("KafkaAppenderWithKeyLookup");
+        final LogEvent logEvent = createLogEvent();
+        Date date = new Date();
+        SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy");
+        appender.append(logEvent);
+        final List<ProducerRecord<byte[], byte[]>> history = kafka.history();
+        assertEquals(1, history.size());
+        final ProducerRecord<byte[], byte[]> item = history.get(0);
+        assertNotNull(item);
+        assertEquals(TOPIC_NAME, item.topic());
+        byte[] keyValue = format.format(date).getBytes(StandardCharsets.UTF_8);
+        assertArrayEquals(item.key(), keyValue);
+        assertEquals(LOG_MESSAGE, new String(item.value(), 
StandardCharsets.UTF_8));
+    }
+
+
     private LogEvent deserializeLogEvent(final byte[] data) throws 
IOException, ClassNotFoundException {
         final ByteArrayInputStream bis = new ByteArrayInputStream(data);
         try (ObjectInput ois = new ObjectInputStream(bis)) {

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/8efe3b7a/log4j-core/src/test/resources/KafkaAppenderTest.xml
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/resources/KafkaAppenderTest.xml 
b/log4j-core/src/test/resources/KafkaAppenderTest.xml
index c6bacfc..b775f88 100644
--- a/log4j-core/src/test/resources/KafkaAppenderTest.xml
+++ b/log4j-core/src/test/resources/KafkaAppenderTest.xml
@@ -37,6 +37,11 @@
       <Property name="timeout.ms">1000</Property>
       <Property name="bootstrap.servers">localhost:9092</Property>
     </Kafka>
+    <Kafka name="KafkaAppenderWithKeyLookup" topic="kafka-topic" 
key="$${date:dd-MM-yyyy}">
+      <PatternLayout pattern="%m"/>
+      <Property name="timeout.ms">1000</Property>
+      <Property name="bootstrap.servers">localhost:9092</Property>
+    </Kafka>
   </Appenders>
   <Loggers>
     <Root level="info">

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/8efe3b7a/src/site/xdoc/manual/appenders.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/manual/appenders.xml 
b/src/site/xdoc/manual/appenders.xml
index b7671d4..ae86b54 100644
--- a/src/site/xdoc/manual/appenders.xml
+++ b/src/site/xdoc/manual/appenders.xml
@@ -1720,7 +1720,7 @@ public class JpaLogEntity extends 
AbstractLogEventWrapperEntity {
         <subsection name="KafkaAppender">
           <p>
             The KafkaAppender logs events to an <a 
href="https://kafka.apache.org/">Apache Kafka</a> topic.
-            Each log event is sent as a Kafka record with no key.
+            Each log event is sent as a Kafka record.
           </p>
           <table>
             <caption align="top">KafkaAppender Parameters</caption>
@@ -1737,7 +1737,9 @@ public class JpaLogEntity extends 
AbstractLogEventWrapperEntity {
             <tr>
               <td>key</td>
               <td>String</td>
-              <td>The key that will be sent to Kafka with every message. 
Optional value defaulting to <code>null</code> (no key).</td>
+              <td>The key that will be sent to Kafka with every message. 
Optional value defaulting to <code>null</code>.
+                Any of the <a href="./lookups.html">Lookups</a> can be 
included.
+              </td>
             </tr>
             <tr>
               <td>filter</td>

Reply via email to