Repository: logging-log4j2
Updated Branches:
  refs/heads/master cfcdffd66 -> 194ffd2da


LOG4J2-2062 Added property to KafkaAppender to send a Key to Kafka

Set key as an attribute, updated documentation and set charset


Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/f57b356d
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f57b356d
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f57b356d

Branch: refs/heads/master
Commit: f57b356ddded447f3e46dd1469ae6f6ac44d3497
Parents: cfcdffd
Author: Flowcont <[email protected]>
Authored: Sat Sep 30 18:18:47 2017 +0100
Committer: Mikael Ståldal <[email protected]>
Committed: Fri Oct 13 21:30:53 2017 +0200

----------------------------------------------------------------------
 .../core/appender/mom/kafka/KafkaAppender.java      | 12 +++++++++---
 .../log4j/core/appender/mom/kafka/KafkaManager.java | 12 ++++++++++--
 .../core/appender/mom/kafka/KafkaAppenderTest.java  | 16 ++++++++++++++++
 log4j-core/src/test/resources/KafkaAppenderTest.xml |  6 ++++++
 src/site/xdoc/manual/appenders.xml                  |  7 ++++++-
 5 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f57b356d/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
index 81ec09b..f7d50af 100644
--- 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
+++ 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
@@ -53,6 +53,9 @@ public final class KafkaAppender extends AbstractAppender {
 
         @PluginAttribute("topic") 
         private String topic;
+
+        @PluginAttribute("key")
+        private String key;
         
         @PluginAttribute(value = "syncSend", defaultBoolean = true)
         private boolean syncSend;
@@ -68,7 +71,8 @@ public final class KafkaAppender extends AbstractAppender {
                 AbstractLifeCycle.LOGGER.error("No layout provided for 
KafkaAppender");
                 return null;
             }
-            final KafkaManager kafkaManager = new 
KafkaManager(getConfiguration().getLoggerContext(), getName(), topic, syncSend, 
properties);
+            final KafkaManager kafkaManager =
+                    new KafkaManager(getConfiguration().getLoggerContext(), 
getName(), topic, syncSend, properties, key);
             return new KafkaAppender(getName(), layout, getFilter(), 
isIgnoreExceptions(), kafkaManager);
         }
 
@@ -108,13 +112,15 @@ public final class KafkaAppender extends AbstractAppender 
{
             final boolean ignoreExceptions,
             final String topic,
             final Property[] properties,
-            final Configuration configuration) {
+            final Configuration configuration,
+            final String key) {
 
         if (layout == null) {
             AbstractLifeCycle.LOGGER.error("No layout provided for 
KafkaAppender");
             return null;
         }
-        final KafkaManager kafkaManager = new 
KafkaManager(configuration.getLoggerContext(), name, topic, true, properties);
+        final KafkaManager kafkaManager =
+                new KafkaManager(configuration.getLoggerContext(), name, 
topic, true, properties, key);
         return new KafkaAppender(name, layout, filter, ignoreExceptions, 
kafkaManager);
     }
 

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f57b356d/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index 2cfd366..7c3f1cd 100644
--- 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.logging.log4j.core.appender.mom.kafka;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -47,9 +48,11 @@ public class KafkaManager extends AbstractManager {
     private final int timeoutMillis;
 
     private final String topic;
+    private final byte[] key;
     private final boolean syncSend;
 
-    public KafkaManager(final LoggerContext loggerContext, final String name, 
final String topic, final boolean syncSend, final Property[] properties) {
+    public KafkaManager(final LoggerContext loggerContext, final String name, 
final String topic, final boolean syncSend,
+                        final Property[] properties, final String key) {
         super(loggerContext, name);
         this.topic = Objects.requireNonNull(topic, "topic");
         this.syncSend = syncSend;
@@ -57,8 +60,12 @@ public class KafkaManager extends AbstractManager {
         config.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
         config.setProperty("batch.size", "0");
         for (final Property property : properties) {
+
             config.setProperty(property.getName(), property.getValue());
         }
+
+        this.key = (key != null ) ? key.getBytes(StandardCharsets.UTF_8) : 
null ;
+
         this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", 
DEFAULT_TIMEOUT_MILLIS));
     }
 
@@ -96,7 +103,8 @@ public class KafkaManager extends AbstractManager {
 
     public void send(final byte[] msg) throws ExecutionException, 
InterruptedException, TimeoutException {
         if (producer != null) {
-            final ProducerRecord<byte[], byte[]> newRecord = new 
ProducerRecord<>(topic, msg);
+
+            final ProducerRecord<byte[], byte[]> newRecord = new 
ProducerRecord<>(topic, key, msg);
             if (syncSend) {
                 final Future<RecordMetadata> response = 
producer.send(newRecord);
                 response.get(timeoutMillis, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f57b356d/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
index 2f60750..0e1c733 100644
--- 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
+++ 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
@@ -135,6 +135,22 @@ public class KafkaAppenderTest {
         assertEquals(LOG_MESSAGE, new String(item.value(), 
StandardCharsets.UTF_8));
     }
 
+    @Test
+    public void testAppendWithKey() throws Exception {
+        final Appender appender = 
ctx.getRequiredAppender("KafkaAppenderWithKey");
+        final LogEvent logEvent = createLogEvent();
+        appender.append(logEvent);
+        final List<ProducerRecord<byte[], byte[]>> history = kafka.history();
+        assertEquals(1, history.size());
+        final ProducerRecord<byte[], byte[]> item = history.get(0);
+        assertNotNull(item);
+        assertEquals(TOPIC_NAME, item.topic());
+        String msgKey = item.key().toString();
+        byte[] keyValue = "key".getBytes(StandardCharsets.UTF_8);
+        assertArrayEquals(item.key(), keyValue);
+        assertEquals(LOG_MESSAGE, new String(item.value(), 
StandardCharsets.UTF_8));
+    }
+
     private LogEvent deserializeLogEvent(final byte[] data) throws 
IOException, ClassNotFoundException {
         final ByteArrayInputStream bis = new ByteArrayInputStream(data);
         try (ObjectInput ois = new ObjectInputStream(bis)) {

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f57b356d/log4j-core/src/test/resources/KafkaAppenderTest.xml
----------------------------------------------------------------------
diff --git a/log4j-core/src/test/resources/KafkaAppenderTest.xml 
b/log4j-core/src/test/resources/KafkaAppenderTest.xml
index 2af8586..c6bacfc 100644
--- a/log4j-core/src/test/resources/KafkaAppenderTest.xml
+++ b/log4j-core/src/test/resources/KafkaAppenderTest.xml
@@ -32,12 +32,18 @@
       <Property name="bootstrap.servers">localhost:9092</Property>
       <Property name="syncSend">false</Property>
     </Kafka>
+    <Kafka name="KafkaAppenderWithKey" topic="kafka-topic" key="key">
+      <PatternLayout pattern="%m"/>
+      <Property name="timeout.ms">1000</Property>
+      <Property name="bootstrap.servers">localhost:9092</Property>
+    </Kafka>
   </Appenders>
   <Loggers>
     <Root level="info">
       <AppenderRef ref="KafkaAppenderWithLayout"/>
       <AppenderRef ref="KafkaAppenderWithSerializedLayout"/>
       <AppenderRef ref="AsyncKafkaAppender"/>
+      <AppenderRef ref="KafkaAppenderWithKey"/>
     </Root>
   </Loggers>
 </Configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f57b356d/src/site/xdoc/manual/appenders.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/manual/appenders.xml 
b/src/site/xdoc/manual/appenders.xml
index 6fe097a..7ea039f 100644
--- a/src/site/xdoc/manual/appenders.xml
+++ b/src/site/xdoc/manual/appenders.xml
@@ -1735,6 +1735,11 @@ public class JpaLogEntity extends 
AbstractLogEventWrapperEntity {
               <td>The Kafka topic to use. Required.</td>
             </tr>
             <tr>
+              <td>key</td>
+              <td>String</td>
+              <td>The key that will be sent to Kafka with every message. 
Optional value defaulting to <code>null</code>.</td>
+            </tr>
+            <tr>
               <td>filter</td>
               <td>Filter</td>
               <td>A Filter to determine if the event should be handled by this 
Appender. More than one Filter
@@ -1777,7 +1782,7 @@ public class JpaLogEntity extends 
AbstractLogEventWrapperEntity {
               <td>
                 You can set properties in <a 
href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka 
producer properties</a>.
                 You need to set the <code>bootstrap.servers</code> property, 
there are sensible default values for the others.
-                Do not set the <code>value.serializer</code> property.
+                Do not set the <code>value.serializer</code> nor 
<code>key.serializer</code> properties.
               </td>
             </tr>
           </table>

Reply via email to