GitHub user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165852132

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -519,6 +519,15 @@ private boolean isEmitTuple(List<Object> tuple) {
             return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
         }
     
    +    private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
    +        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    +        for (TopicPartition tp : assignedPartitions) {
    +            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
    +        }
    +        kafkaConsumer.commitAsync(offsetsToCommit, null);
    --- End diff --

    agree
---