Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2538#discussion_r165853240
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -311,7 +273,10 @@ public void nextTuple() {
if (isAtLeastOnceProcessing()) {
commitOffsetsForAckedTuples(kafkaConsumer.assignment());
} else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NONE) {
- commitConsumedOffsets(kafkaConsumer.assignment());
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+ createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+ kafkaConsumer.commitAsync(offsetsToCommit, null);
--- End diff --
createFetchedOffsetsMetadata
---