Repository: logging-log4j2 Updated Branches: refs/heads/master ef8e9e09a -> 9ce722cdf
Improve error handling in KafkaAppender Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/9ce722cd Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/9ce722cd Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/9ce722cd Branch: refs/heads/master Commit: 9ce722cdf7bbdc9e80824603405705388330258f Parents: ef8e9e0 Author: Mikael Ståldal <[email protected]> Authored: Thu May 4 14:48:46 2017 +0200 Committer: Mikael Ståldal <[email protected]> Committed: Thu May 4 14:48:46 2017 +0200 ---------------------------------------------------------------------- .../core/appender/mom/kafka/KafkaAppender.java | 42 +++++++++++--------- .../core/appender/mom/kafka/KafkaManager.java | 2 +- 2 files changed, 24 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/9ce722cd/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java index 8d37f35..2c495ae 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java @@ -20,14 +20,15 @@ package org.apache.logging.log4j.core.appender.mom.kafka; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.Objects; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.logging.log4j.core.Appender; import org.apache.logging.log4j.core.Filter; import org.apache.logging.log4j.core.Layout; import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.appender.AbstractAppender; -import org.apache.logging.log4j.core.appender.AppenderLoggingException; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.Node; import org.apache.logging.log4j.core.config.Property; @@ -133,29 +134,32 @@ public final class KafkaAppender extends AbstractAppender { LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName()); } else { try { - final Layout<? extends Serializable> layout = getLayout(); - byte[] data; - if (layout != null) { - if (layout instanceof SerializedLayout) { - final byte[] header = layout.getHeader(); - final byte[] body = layout.toByteArray(event); - data = new byte[header.length + body.length]; - System.arraycopy(header, 0, data, 0, header.length); - System.arraycopy(body, 0, data, header.length, body.length); - } else { - data = layout.toByteArray(event); - } - } else { - data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8); - } - manager.send(data); + tryAppend(event); } catch (final Exception e) { - LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e); - throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e); + error("Unable to write to Kafka in appender [" + getName() + "]", event, e); } } } + private void tryAppend(LogEvent event) throws ExecutionException, InterruptedException, TimeoutException { + final Layout<? extends Serializable> layout = getLayout(); + byte[] data; + if (layout != null) { + if (layout instanceof SerializedLayout) { + final byte[] header = layout.getHeader(); + final byte[] body = layout.toByteArray(event); + data = new byte[header.length + body.length]; + System.arraycopy(header, 0, data, 0, header.length); + System.arraycopy(body, 0, data, header.length, body.length); + } else { + data = layout.toByteArray(event); + } + } else { + data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8); + } + manager.send(data); + } + @Override public void start() { super.start(); http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/9ce722cd/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java index d74abb4..52d21f4 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java @@ -104,7 +104,7 @@ public class KafkaManager extends AbstractManager { producer.send(newRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { - LOGGER.error("Unable to write to Kafka [" + getName() + "].", e); + LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e); } } });
