Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2537#discussion_r165830128
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -307,8 +307,12 @@ public void nextTuple() {
kafkaSpoutConfig.getSubscription().refreshAssignment();
}
- if (shouldCommit()) {
- commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+ if (commitTimer != null && commitTimer.isExpiredResetOnTrue())
{ //commit timer is null for AT_MOST_ONCE mode
+ if (isAtLeastOnceProcessing()) {
+
commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+ } else if (kafkaSpoutConfig.getProcessingGuarantee() ==
ProcessingGuarantee.NONE) {
--- End diff --
we can delete the if condition and leave only else because if timer!=null
the processing guarantee is either at-least-once or no-guarantee. If we want to
make it clear we can put a line comment.
---