Github user dfdemar commented on a diff in the pull request:
https://github.com/apache/storm/pull/2790#discussion_r207428218
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
---
@@ -129,6 +140,27 @@ public void prepare(Map<String, Object> topoConf,
TopologyContext context, Outpu
return new KafkaProducer<>(props);
}
+ /**
+ * Make the producer Callback. Using this Callback will also execute
the user defined Callback, if provided.
+ */
+ protected Callback mkProducerCallback(final Tuple input) {
+ return (ignored, e) -> {
+ synchronized (collector) {
+ if (e != null) {
+ collector.reportError(e);
+ collector.fail(input);
+ } else {
+ collector.ack(input);
+ }
+
+ // User defined Callback
+ if (providedCallback != null) {
--- End diff --
The original behavior of the Callback is pretty important so we probably
don't want a user-defined Callback to completely erase the original behavior.
If the intent _**is**_ to totally redefine the Callback then one can simply
subclass `KafkaBolt` and explicitly override `mkProducerCallback()`.
---