Yair Weinberger created STORM-579:
-------------------------------------

             Summary: Trident KafkaSpout throws RunTimeException when trying to 
re-emit a batch that is no longer in Kafka
                 Key: STORM-579
                 URL: https://issues.apache.org/jira/browse/STORM-579
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 0.9.2-incubating, 0.9.3
            Reporter: Yair Weinberger


There is some faulty code that causes a batch to be retransmitted indefinitely.
The Kafka Spout re-emits the batch and, as intended, there is no limit on how many times it will be re-emitted (which is fine).
At some point in the future, the offset of this batch no longer exists on Kafka.
Then the real problem kicks in (code snippets are taken from the v0.9.2 tag). The Kafka Spout uses KafkaUtils.fetchMessages to get the batch from Kafka.
Let us have a look at the relevant code from fetchMessages:

if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
    long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
    LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
            "retrying with default start offset time from configuration. " +
            "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
    offset = startOffset;
}

So if the offset no longer exists on Kafka, we will fetch from a different offset instead (it is not clear why this is a good idea). In practice, this will be a much larger offset than the one originally requested.
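
For context, startOffsetTime and useStartOffsetTimeIfOffsetOutOfRange are ordinary fields on the spout configuration, so any topology can hit this code path. A minimal sketch of a typical setup (the ZooKeeper host and topic name are hypothetical, and the explicit assignments are only there to show which settings are involved):

BrokerHosts hosts = new ZkHosts("zkhost:2181");
TridentKafkaConfig spoutConfig = new TridentKafkaConfig(hosts, "my-topic");
// If a requested offset has already been deleted by Kafka retention,
// fetchMessages falls back to the offset resolved from this time value
// instead of failing the fetch.
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;

With EarliestTime(), the fallback resolves to the oldest offset still present on the broker, which by definition is ahead of the deleted offset the spout asked for.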
Now let us go back to the Kafka Spout code. Given that it received messages with a much larger offset than what it originally requested, the behaviour is really interesting:
for (MessageAndOffset msg : msgs) {
    if (offset == nextOffset) {
        break;
    }
    if (offset > nextOffset) {
        throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
    }
    emit(collector, msg.message());
    offset = msg.nextOffset();
}
As you can see, on the first iteration nothing touches the offset, so some unrelated message from a completely different offset *is emitted*.
Then offset is updated with the nextOffset of the current message, which is of course very large, so the next iteration of the loop throws the "overshot the end offset" RuntimeException.


