Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147050462
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -336,22 +335,25 @@ private void emit() {
         private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
             final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
             final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset());
    +
             if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
                 LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
    -        } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
    +        } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
                 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
             } else {
    -            Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp),
    -                "The spout is about to emit a message that has already been committed."
    -                + " This should never occur, and indicates a bug in the spout");
    +            if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
    --- End diff --
    
    Makes sense, thanks.
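
    For context, a rough sketch of the guard that the new if introduces is
    below. It is an assumption, not the PR's actual code: the diff is
    truncated right after the condition, so the class name
    (CommittedOffsetGuard), the helper name (safeToEmit), and the
    log-and-skip behavior are all illustrative. The consumer calls
    (committed, position) are the standard KafkaConsumer API.

        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.consumer.OffsetAndMetadata;
        import org.apache.kafka.common.TopicPartition;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;

        // Hypothetical helper (not part of the PR): shows the old Validate.isTrue
        // assertion turned into a conditional guard, so a position at or behind the
        // committed offset is logged and skipped instead of throwing.
        public class CommittedOffsetGuard {
            private static final Logger LOG = LoggerFactory.getLogger(CommittedOffsetGuard.class);

            // Returns true when the consumer's current position on tp is ahead of the
            // last committed offset (or nothing has been committed yet), i.e. emitting
            // the record at that position cannot re-emit already committed data.
            static <K, V> boolean safeToEmit(KafkaConsumer<K, V> consumer, TopicPartition tp) {
                final OffsetAndMetadata committed = consumer.committed(tp);
                final long position = consumer.position(tp);
                if (committed != null && committed.offset() >= position) {
                    // The old code threw here; the guard lets the spout recover instead.
                    LOG.warn("Not emitting for {}: position {} is at or behind committed offset {}",
                        tp, position, committed.offset());
                    return false;
                }
                return true;
            }
        }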

