This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1175038  [GOBBLIN-886] add callback to kafka apis
1175038 is described below

commit 117503833eceaf4bb3a7ac2fa0c5457982d7b8ad
Author: Arjun <[email protected]>
AuthorDate: Wed Sep 25 05:28:41 2019 -0700

    [GOBBLIN-886] add callback to kafka apis
    
    Closes #2740 from arjun4084346/kafkaErrorLogs
---
 .../gobblin/service/SimpleKafkaSpecProducer.java   | 25 ++++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)

diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
index c56593c..9085d3a 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -89,7 +89,7 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
 
     log.info("Adding Spec: " + addedSpec + " using Kafka.");
 
-    return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
+    return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }
 
   @Override
@@ -98,7 +98,7 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
 
     log.info("Updating Spec: " + updatedSpec + " using Kafka.");
 
-    return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
+    return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }
 
   @Override
@@ -110,7 +110,7 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
 
     log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
 
-    return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY);
+    return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }
 
   @Override
@@ -131,7 +131,6 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
             ConfigUtils.configToProperties(_config));
       } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
         log.error("Failed to instantiate Kafka consumer from class " + _kafkaProducerClassName, e);
-
         throw new RuntimeException("Failed to instantiate Kafka consumer", e);
       }
     }
@@ -156,4 +155,22 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
       throw new RuntimeException("Unsupported spec type " + spec.getClass());
     }
   }
+
+  static class KafkaWriteCallback implements WriteCallback {
+    AvroJobSpec avroJobSpec;
+
+    KafkaWriteCallback(AvroJobSpec avroJobSpec) {
+      this.avroJobSpec = avroJobSpec;
+    }
+
+    @Override
+    public void onSuccess(Object result) {
+
+    }
+
+    @Override
+    public void onFailure(Throwable throwable) {
+      log.error("Error while writing the following record to Kafka {}", avroJobSpec.toString(), throwable);
+    }
+  }
 }
\ No newline at end of file

Reply via email to