Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2537#discussion_r165830128 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -307,8 +307,12 @@ public void nextTuple() { kafkaSpoutConfig.getSubscription().refreshAssignment(); } - if (shouldCommit()) { - commitOffsetsForAckedTuples(kafkaConsumer.assignment()); + if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) { //commit timer is null for AT_MOST_ONCE mode + if (isAtLeastOnceProcessing()) { + commitOffsetsForAckedTuples(kafkaConsumer.assignment()); + } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NONE) { --- End diff -- we can delete the if condition and leave only else because if timer!=null the processing guarantee is either at-least-once or no-guarantee. If we want to make it clear we can put a line comment.
---