GitHub user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2538#discussion_r165852737
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
                 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
             } else {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
    -            if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
    +            if (isAtLeastOnceProcessing()
    +                && committedOffset != null
    +                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, offsetManagers)
                     && committedOffset.offset() > record.offset()) {
                     // Ensures that after a topology with this id is started, the consumer fetch
                     // position never falls behind the committed offset (STORM-2844)
    -                throw new IllegalStateException("Attempting to emit a message that has already been committed.");
    +                throw new IllegalStateException("Attempting to emit a message that has already been committed."
    +                    + " This should never occur in at-least-once mode.");
    --- End diff --
    
    for at-least-once semantics.

