Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830128
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -307,8 +307,12 @@ public void nextTuple() {
                     kafkaSpoutConfig.getSubscription().refreshAssignment();
                 }
     
    -            if (shouldCommit()) {
    -                commitOffsetsForAckedTuples(kafkaConsumer.assignment());
    +            if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) 
{ //commit timer is null for AT_MOST_ONCE mode
    +                if (isAtLeastOnceProcessing()) {
    +                    
commitOffsetsForAckedTuples(kafkaConsumer.assignment());
    +                } else if (kafkaSpoutConfig.getProcessingGuarantee() == 
ProcessingGuarantee.NONE) {
    --- End diff --
    
    we can delete the if condition and leave only else because if timer!=null 
the processing guarantee is either at-least-once or no-guarantee. If we want to 
make it clear we can put a line comment.


---

Reply via email to