If you use @PluginBuilderAttribute, you can omit the attribute names when
they match the field names. Default values can then be specified in plain
Java code as field initializers. Less verbose, IMO.
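
To illustrate the idea without pulling in log4j-core, here is a minimal,
self-contained sketch. The BuilderAttribute annotation and the describe()
helper are hypothetical stand-ins written for this example; they are not
log4j's real @PluginBuilderAttribute or its plugin machinery. They only
mimic the two points above: an empty annotation value falls back to the
field name, and defaults come from ordinary field initializers.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

public class BuilderAttributeSketch {

    // Hypothetical stand-in for @PluginBuilderAttribute (NOT the real log4j class).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    public @interface BuilderAttribute {
        String value() default ""; // empty means "use the field name"
    }

    // Builder-style holder mirroring the KafkaAppender fields from the diff.
    public static class KafkaBuilderSketch {
        @BuilderAttribute
        String topic;

        @BuilderAttribute
        String key; // no initializer, so the default stays null

        @BuilderAttribute
        boolean syncSend = true; // default expressed as a plain field initializer
    }

    // Maps each annotated field to (resolved attribute name -> current value).
    public static Map<String, Object> describe(Object builder) {
        Map<String, Object> attributes = new LinkedHashMap<>();
        for (Field field : builder.getClass().getDeclaredFields()) {
            BuilderAttribute annotation = field.getAnnotation(BuilderAttribute.class);
            if (annotation == null) {
                continue; // skip fields that are not configuration attributes
            }
            field.setAccessible(true);
            // Fall back to the field name when no explicit name is given.
            String name = annotation.value().isEmpty() ? field.getName() : annotation.value();
            try {
                attributes.put(name, field.get(builder));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return attributes;
    }

    public static void main(String[] args) {
        // Prints the resolved attribute names with their default values.
        System.out.println(describe(new KafkaBuilderSketch()));
    }
}
```

With the real annotation, the builder fields in KafkaAppender would carry
a bare @PluginBuilderAttribute, and syncSend would be initialized to true
in the field declaration instead of via defaultBoolean.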

On 13 October 2017 at 14:38, <mi...@apache.org> wrote:

> Repository: logging-log4j2
> Updated Branches:
>   refs/heads/master cfcdffd66 -> 194ffd2da
>
>
> LOG4J2-2062 Added property to KafkaAppender to send a Key to Kafka
>
> Set key as an attribute, updated documentation and set charset
>
>
> Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
> Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/
> commit/f57b356d
> Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f57b356d
> Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f57b356d
>
> Branch: refs/heads/master
> Commit: f57b356ddded447f3e46dd1469ae6f6ac44d3497
> Parents: cfcdffd
> Author: Flowcont <jorgesg1...@gmail.com>
> Authored: Sat Sep 30 18:18:47 2017 +0100
> Committer: Mikael Ståldal <mik...@staldal.nu>
> Committed: Fri Oct 13 21:30:53 2017 +0200
>
> ----------------------------------------------------------------------
>  .../core/appender/mom/kafka/KafkaAppender.java      | 12 +++++++++---
>  .../log4j/core/appender/mom/kafka/KafkaManager.java | 12 ++++++++++--
>  .../core/appender/mom/kafka/KafkaAppenderTest.java  | 16 ++++++++++++++++
>  log4j-core/src/test/resources/KafkaAppenderTest.xml |  6 ++++++
>  src/site/xdoc/manual/appenders.xml                  |  7 ++++++-
>  5 files changed, 47 insertions(+), 6 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/
> f57b356d/log4j-core/src/main/java/org/apache/logging/log4j/
> core/appender/mom/kafka/KafkaAppender.java
> ----------------------------------------------------------------------
> diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/
> appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/
> org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
> index 81ec09b..f7d50af 100644
> --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/
> appender/mom/kafka/KafkaAppender.java
> +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/
> appender/mom/kafka/KafkaAppender.java
> @@ -53,6 +53,9 @@ public final class KafkaAppender extends
> AbstractAppender {
>
>          @PluginAttribute("topic")
>          private String topic;
> +
> +        @PluginAttribute("key")
> +        private String key;
>
>          @PluginAttribute(value = "syncSend", defaultBoolean = true)
>          private boolean syncSend;
> @@ -68,7 +71,8 @@ public final class KafkaAppender extends
> AbstractAppender {
>                  AbstractLifeCycle.LOGGER.error("No layout provided for
> KafkaAppender");
>                  return null;
>              }
> -            final KafkaManager kafkaManager = new
> KafkaManager(getConfiguration().getLoggerContext(), getName(), topic,
> syncSend, properties);
> +            final KafkaManager kafkaManager =
> +                    new KafkaManager(getConfiguration().getLoggerContext(),
> getName(), topic, syncSend, properties, key);
>              return new KafkaAppender(getName(), layout, getFilter(),
> isIgnoreExceptions(), kafkaManager);
>          }
>
> @@ -108,13 +112,15 @@ public final class KafkaAppender extends
> AbstractAppender {
>              final boolean ignoreExceptions,
>              final String topic,
>              final Property[] properties,
> -            final Configuration configuration) {
> +            final Configuration configuration,
> +            final String key) {
>
>          if (layout == null) {
>              AbstractLifeCycle.LOGGER.error("No layout provided for
> KafkaAppender");
>              return null;
>          }
> -        final KafkaManager kafkaManager = new 
> KafkaManager(configuration.getLoggerContext(),
> name, topic, true, properties);
> +        final KafkaManager kafkaManager =
> +                new KafkaManager(configuration.getLoggerContext(), name,
> topic, true, properties, key);
>          return new KafkaAppender(name, layout, filter, ignoreExceptions,
> kafkaManager);
>      }
>
>
> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/
> f57b356d/log4j-core/src/main/java/org/apache/logging/log4j/
> core/appender/mom/kafka/KafkaManager.java
> ----------------------------------------------------------------------
> diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/
> appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/
> org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
> index 2cfd366..7c3f1cd 100644
> --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/
> appender/mom/kafka/KafkaManager.java
> +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/
> appender/mom/kafka/KafkaManager.java
> @@ -17,6 +17,7 @@
>
>  package org.apache.logging.log4j.core.appender.mom.kafka;
>
> +import java.nio.charset.StandardCharsets;
>  import java.util.Objects;
>  import java.util.Properties;
>  import java.util.concurrent.ExecutionException;
> @@ -47,9 +48,11 @@ public class KafkaManager extends AbstractManager {
>      private final int timeoutMillis;
>
>      private final String topic;
> +    private final byte[] key;
>      private final boolean syncSend;
>
> -    public KafkaManager(final LoggerContext loggerContext, final String
> name, final String topic, final boolean syncSend, final Property[]
> properties) {
> +    public KafkaManager(final LoggerContext loggerContext, final String
> name, final String topic, final boolean syncSend,
> +                        final Property[] properties, final String key) {
>          super(loggerContext, name);
>          this.topic = Objects.requireNonNull(topic, "topic");
>          this.syncSend = syncSend;
> @@ -57,8 +60,12 @@ public class KafkaManager extends AbstractManager {
>          config.setProperty("value.serializer", "org.apache.kafka.common.
> serialization.ByteArraySerializer");
>          config.setProperty("batch.size", "0");
>          for (final Property property : properties) {
> +
>              config.setProperty(property.getName(), property.getValue());
>          }
> +
> +        this.key = (key != null ) ? key.getBytes(StandardCharsets.UTF_8)
> : null ;
> +
>          this.timeoutMillis = Integer.parseInt(config.getProperty("
> timeout.ms", DEFAULT_TIMEOUT_MILLIS));
>      }
>
> @@ -96,7 +103,8 @@ public class KafkaManager extends AbstractManager {
>
>      public void send(final byte[] msg) throws ExecutionException,
> InterruptedException, TimeoutException {
>          if (producer != null) {
> -            final ProducerRecord<byte[], byte[]> newRecord = new
> ProducerRecord<>(topic, msg);
> +
> +            final ProducerRecord<byte[], byte[]> newRecord = new
> ProducerRecord<>(topic, key, msg);
>              if (syncSend) {
>                  final Future<RecordMetadata> response =
> producer.send(newRecord);
>                  response.get(timeoutMillis, TimeUnit.MILLISECONDS);
>
> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/
> f57b356d/log4j-core/src/test/java/org/apache/logging/log4j/
> core/appender/mom/kafka/KafkaAppenderTest.java
> ----------------------------------------------------------------------
> diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/
> appender/mom/kafka/KafkaAppenderTest.java b/log4j-core/src/test/java/
> org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
> index 2f60750..0e1c733 100644
> --- a/log4j-core/src/test/java/org/apache/logging/log4j/core/
> appender/mom/kafka/KafkaAppenderTest.java
> +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/
> appender/mom/kafka/KafkaAppenderTest.java
> @@ -135,6 +135,22 @@ public class KafkaAppenderTest {
>          assertEquals(LOG_MESSAGE, new String(item.value(),
> StandardCharsets.UTF_8));
>      }
>
> +    @Test
> +    public void testAppendWithKey() throws Exception {
> +        final Appender appender = ctx.getRequiredAppender("
> KafkaAppenderWithKey");
> +        final LogEvent logEvent = createLogEvent();
> +        appender.append(logEvent);
> +        final List<ProducerRecord<byte[], byte[]>> history =
> kafka.history();
> +        assertEquals(1, history.size());
> +        final ProducerRecord<byte[], byte[]> item = history.get(0);
> +        assertNotNull(item);
> +        assertEquals(TOPIC_NAME, item.topic());
> +        String msgKey = item.key().toString();
> +        byte[] keyValue = "key".getBytes(StandardCharsets.UTF_8);
> +        assertArrayEquals(item.key(), keyValue);
> +        assertEquals(LOG_MESSAGE, new String(item.value(),
> StandardCharsets.UTF_8));
> +    }
> +
>      private LogEvent deserializeLogEvent(final byte[] data) throws
> IOException, ClassNotFoundException {
>          final ByteArrayInputStream bis = new ByteArrayInputStream(data);
>          try (ObjectInput ois = new ObjectInputStream(bis)) {
>
> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/
> f57b356d/log4j-core/src/test/resources/KafkaAppenderTest.xml
> ----------------------------------------------------------------------
> diff --git a/log4j-core/src/test/resources/KafkaAppenderTest.xml
> b/log4j-core/src/test/resources/KafkaAppenderTest.xml
> index 2af8586..c6bacfc 100644
> --- a/log4j-core/src/test/resources/KafkaAppenderTest.xml
> +++ b/log4j-core/src/test/resources/KafkaAppenderTest.xml
> @@ -32,12 +32,18 @@
>        <Property name="bootstrap.servers">localhost:9092</Property>
>        <Property name="syncSend">false</Property>
>      </Kafka>
> +    <Kafka name="KafkaAppenderWithKey" topic="kafka-topic" key="key">
> +      <PatternLayout pattern="%m"/>
> +      <Property name="timeout.ms">1000</Property>
> +      <Property name="bootstrap.servers">localhost:9092</Property>
> +    </Kafka>
>    </Appenders>
>    <Loggers>
>      <Root level="info">
>        <AppenderRef ref="KafkaAppenderWithLayout"/>
>        <AppenderRef ref="KafkaAppenderWithSerializedLayout"/>
>        <AppenderRef ref="AsyncKafkaAppender"/>
> +      <AppenderRef ref="KafkaAppenderWithKey"/>
>      </Root>
>    </Loggers>
>  </Configuration>
> \ No newline at end of file
>
> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/
> f57b356d/src/site/xdoc/manual/appenders.xml
> ----------------------------------------------------------------------
> diff --git a/src/site/xdoc/manual/appenders.xml b/src/site/xdoc/manual/
> appenders.xml
> index 6fe097a..7ea039f 100644
> --- a/src/site/xdoc/manual/appenders.xml
> +++ b/src/site/xdoc/manual/appenders.xml
> @@ -1735,6 +1735,11 @@ public class JpaLogEntity extends
> AbstractLogEventWrapperEntity {
>                <td>The Kafka topic to use. Required.</td>
>              </tr>
>              <tr>
> +              <td>key</td>
> +              <td>String</td>
> +              <td>The key that will be sent to Kafka with every message.
> Optional value defaulting to <code>null</code>.</td>
> +            </tr>
> +            <tr>
>                <td>filter</td>
>                <td>Filter</td>
>                <td>A Filter to determine if the event should be handled by
> this Appender. More than one Filter
> @@ -1777,7 +1782,7 @@ public class JpaLogEntity extends
> AbstractLogEventWrapperEntity {
>                <td>
>                  You can set properties in <a href="
> http://kafka.apache.org/documentation.html#producerconfigs">Kafka
> producer properties</a>.
>                  You need to set the <code>bootstrap.servers</code>
> property, there are sensible default values for the others.
> -                Do not set the <code>value.serializer</code> property.
> +                Do not set the <code>value.serializer</code> nor
> <code>key.serializer</code> properties.
>                </td>
>              </tr>
>            </table>
>
>


-- 
Matt Sicker <boa...@gmail.com>
