GitHub user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165853046
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
                 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
             } else {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
    -            if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
    +            if (isAtLeastOnceProcessing()
    +                && committedOffset != null
    +                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, offsetManagers)
    --- End diff --
    
    Suggest passing Collections.unmodifiableMap(offsetManagers) here instead of the raw map.
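
A rough sketch of the suggestion above, assuming the intent is to hand the callee a read-only view of the map so it cannot modify the spout's state. The class, the `hasManagerFor` and `tryMutate` helpers, and the `Map<String, Long>` type are hypothetical stand-ins for illustration, not the actual Storm classes or signatures:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableViewSketch {

    // Hypothetical stand-in for a callee such as
    // commitMetadataManager.isOffsetCommittedByThisTopology(...): it only reads the map.
    static boolean hasManagerFor(String partition, Map<String, Long> offsetManagers) {
        return offsetManagers.containsKey(partition);
    }

    // Returns true if the put succeeded, false if the map rejected the write.
    static boolean tryMutate(Map<String, Long> map) {
        try {
            map.put("topic-1", 0L);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: String keys and Long values replace TopicPartition and OffsetManager.
        Map<String, Long> offsetManagers = new HashMap<>();
        offsetManagers.put("topic-0", 42L);

        // Read-only view over the same backing map; no copy is made.
        Map<String, Long> readOnly = Collections.unmodifiableMap(offsetManagers);

        System.out.println(hasManagerFor("topic-0", readOnly)); // true: reads pass through
        System.out.println(tryMutate(readOnly));                // false: writes are rejected

        // The owner can still update the backing map, and the view reflects it.
        offsetManagers.put("topic-1", 7L);
        System.out.println(readOnly.containsKey("topic-1"));    // true
    }
}
```

Since `Collections.unmodifiableMap` returns a view and not a copy, wrapping the map on each call is cheap, and the callee always sees the spout's current entries.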

