LOG4J2-1733 Report errors from Kafka async send

Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/3390ebe8
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/3390ebe8
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/3390ebe8

Branch: refs/heads/master
Commit: 3390ebe842711e2d01bc07f71db08888a5dce21a
Parents: daf7b47
Author: Mikael Ståldal <[email protected]>
Authored: Mon Jan 23 17:15:49 2017 +0100
Committer: Mikael Ståldal <[email protected]>
Committed: Mon Jan 23 17:15:49 2017 +0100

----------------------------------------------------------------------
 .../log4j/core/appender/mom/kafka/KafkaManager.java     | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3390ebe8/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index cb05769..d74abb4 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -95,9 +96,18 @@ public class KafkaManager extends AbstractManager {
 
     public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
         if (producer != null) {
-            Future<RecordMetadata> response = producer.send(new ProducerRecord<byte[], byte[]>(topic, msg));
+            ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, msg);
             if (syncSend) {
+                Future<RecordMetadata> response = producer.send(newRecord);
                 response.get(timeoutMillis, TimeUnit.MILLISECONDS);
+            } else {
+                producer.send(newRecord, new Callback() {
+                    public void onCompletion(RecordMetadata metadata, Exception e) {
+                        if (e != null) {
+                            LOGGER.error("Unable to write to Kafka [" + getName() + "].", e);
+                        }
+                    }
+                });
             }
         }
     }

Reply via email to