Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165852132
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -519,6 +519,15 @@ private boolean isEmitTuple(List<Object> tuple) {
             return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
         }
     
    +    private void commitConsumedOffsets(Set<TopicPartition> 
assignedPartitions) {
    +        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new 
HashMap<>();
    +        for (TopicPartition tp : assignedPartitions) {
    +            offsetsToCommit.put(tp, new 
OffsetAndMetadata(kafkaConsumer.position(tp)));
    +        }
    +        kafkaConsumer.commitAsync(offsetsToCommit, null);
    --- End diff --
    
    agree


---

Reply via email to