Github user dfdemar commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2790#discussion_r207428218
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
    @@ -129,6 +140,27 @@ public void prepare(Map<String, Object> topoConf, 
TopologyContext context, Outpu
             return new KafkaProducer<>(props);
         }
     
    +    /**
    +     * Make the producer Callback. Using this Callback will also execute 
the user defined Callback, if provided.
    +     */
    +    protected Callback mkProducerCallback(final Tuple input) {
    +        return (ignored, e) -> {
    +            synchronized (collector) {
    +                if (e != null) {
    +                    collector.reportError(e);
    +                    collector.fail(input);
    +                } else {
    +                    collector.ack(input);
    +                }
    +
    +                // User defined Callback
    +                if (providedCallback != null) {
    --- End diff --
    
    The original behavior of the Callback is pretty important so we probably 
don't want a user-defined Callback to completely erase the original behavior. 
If the intent _**is**_ to totally redefine the Callback then one can simply 
subclass `KafkaBolt` and explicitly override `mkProducerCallback()`.


---

Reply via email to