Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2454#discussion_r156224126
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -395,7 +387,7 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
             } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
                 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
             } else {
    -            if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
    +            if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() > kafkaConsumer.position(tp))) {
    --- End diff --
    
    OK, I agree. I initially thought that this could be a potential fix, but then found out that it wouldn't work, so I was wondering if I had missed anything. Thanks.


---
