This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1175038 [GOBBLIN-886] add callback to kafka apis
1175038 is described below
commit 117503833eceaf4bb3a7ac2fa0c5457982d7b8ad
Author: Arjun <[email protected]>
AuthorDate: Wed Sep 25 05:28:41 2019 -0700
[GOBBLIN-886] add callback to kafka apis
Closes #2740 from arjun4084346/kafkaErrorLogs
---
.../gobblin/service/SimpleKafkaSpecProducer.java | 25 ++++++++++++++++++----
1 file changed, 21 insertions(+), 4 deletions(-)
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
index c56593c..9085d3a 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -89,7 +89,7 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
log.info("Adding Spec: " + addedSpec + " using Kafka.");
- return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec),
WriteCallback.EMPTY);
+ return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec),
new KafkaWriteCallback(avroJobSpec));
}
@Override
@@ -98,7 +98,7 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
log.info("Updating Spec: " + updatedSpec + " using Kafka.");
- return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec),
WriteCallback.EMPTY);
+ return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec),
new KafkaWriteCallback(avroJobSpec));
}
@Override
@@ -110,7 +110,7 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
- return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec),
WriteCallback.EMPTY);
+ return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec),
new KafkaWriteCallback(avroJobSpec));
}
@Override
@@ -131,7 +131,6 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
ConfigUtils.configToProperties(_config));
} catch (ClassNotFoundException | NoSuchMethodException |
IllegalAccessException | InstantiationException | InvocationTargetException e) {
log.error("Failed to instantiate Kafka consumer from class " +
_kafkaProducerClassName, e);
-
throw new RuntimeException("Failed to instantiate Kafka consumer", e);
}
}
@@ -156,4 +155,22 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
throw new RuntimeException("Unsupported spec type " + spec.getClass());
}
}
+
+ static class KafkaWriteCallback implements WriteCallback {
+ AvroJobSpec avroJobSpec;
+
+ KafkaWriteCallback(AvroJobSpec avroJobSpec) {
+ this.avroJobSpec = avroJobSpec;
+ }
+
+ @Override
+ public void onSuccess(Object result) {
+
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ log.error("Error while writing the following record to Kafka {}",
avroJobSpec.toString(), throwable);
+ }
+ }
}
\ No newline at end of file