LOG4J2-1733 Report errors from Kafka async send
Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/3390ebe8
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/3390ebe8
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/3390ebe8

Branch: refs/heads/master
Commit: 3390ebe842711e2d01bc07f71db08888a5dce21a
Parents: daf7b47
Author: Mikael Ståldal <[email protected]>
Authored: Mon Jan 23 17:15:49 2017 +0100
Committer: Mikael Ståldal <[email protected]>
Committed: Mon Jan 23 17:15:49 2017 +0100

----------------------------------------------------------------------
 .../log4j/core/appender/mom/kafka/KafkaManager.java | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3390ebe8/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index cb05769..d74abb4 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -95,9 +96,18 @@ public class KafkaManager extends AbstractManager {
     public void send(final byte[] msg) throws ExecutionException, InterruptedException,
             TimeoutException {
         if (producer != null) {
-            Future<RecordMetadata> response = producer.send(new ProducerRecord<byte[], byte[]>(topic, msg));
+            ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, msg);
             if (syncSend) {
+                Future<RecordMetadata> response = producer.send(newRecord);
                 response.get(timeoutMillis, TimeUnit.MILLISECONDS);
+            } else {
+                producer.send(newRecord, new Callback() {
+                    public void onCompletion(RecordMetadata metadata, Exception e) {
+                        if (e != null) {
+                            LOGGER.error("Unable to write to Kafka [" + getName() + "].", e);
+                        }
+                    }
+                });
             }
         }
     }
