Yair Weinberger created STORM-579:
-------------------------------------
Summary: Trident KafkaSpout throws RunTimeException when trying to
re-emit a batch that is no longer in Kafka
Key: STORM-579
URL: https://issues.apache.org/jira/browse/STORM-579
Project: Apache Storm
Issue Type: Bug
Components: storm-kafka
Affects Versions: 0.9.2-incubating, 0.9.3
Reporter: Yair Weinberger
Faulty code in the spout causes a batch to be re-emitted indefinitely.
Kafka Spout re-emits the batch and, by design, places no limit on how many
times it will be re-emitted (which is OK).
At some point in the future, the offset of this batch no longer exists on Kafka.
This is where the problem starts (code snippets are taken from the v0.9.2 tag).
Kafka Spout uses KafkaUtils.fetchMessages to get the batch from Kafka.
Here is the relevant code from fetchMessages:
    if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) &&
            config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
        long startOffset = getOffset(consumer, topic, partitionId,
                config.startOffsetTime);
        LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                "retrying with default start offset time from configuration. " +
                "configured start offset time: [" + config.startOffsetTime + "] offset: [" +
                startOffset + "]");
        offset = startOffset;
    }
So if the offset no longer exists on Kafka, we fetch from a different offset
instead (not sure why this is a good idea). In practice, this will be a much
larger offset than the one we originally tried to retrieve.
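
To make the fallback concrete, here is a minimal sketch of the behaviour described above. It is a toy model, not the storm-kafka code: the class, the EARLIEST/LATEST constants and resolveFetchOffset are hypothetical names, and it assumes config.startOffsetTime resolves to the earliest offset still retained on the partition.

```java
class OffsetFallbackSketch {
    // Toy partition: everything below EARLIEST has already been deleted.
    // Both bounds are made-up values for illustration.
    static final long EARLIEST = 5000;
    static final long LATEST = 5010;

    // Hypothetical stand-in for the out-of-range handling in fetchMessages.
    // Assumption: the configured start offset time maps to EARLIEST.
    static long resolveFetchOffset(long requested, boolean useStartOffsetIfOutOfRange) {
        boolean outOfRange = requested < EARLIEST || requested > LATEST;
        if (!outOfRange) {
            return requested;
        }
        if (useStartOffsetIfOutOfRange) {
            // The caller asked for 'requested' but silently gets another offset.
            return EARLIEST;
        }
        throw new IllegalStateException("offset out of range: " + requested);
    }

    public static void main(String[] args) {
        // A batch that started at offset 100 is re-emitted after its data is gone.
        long resolved = resolveFetchOffset(100, true);
        System.out.println("requested 100, fetching from " + resolved);
    }
}
```

The point of the sketch is that the caller has no signal that the returned messages do not start at the offset it asked for.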
Now let us go back to the Kafka Spout code. Once it has received messages with
a much larger offset than it originally requested, the behaviour is really
interesting:
    for (MessageAndOffset msg : msgs) {
        if (offset == nextOffset) {
            break;
        }
        if (offset > nextOffset) {
            throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
        }
        emit(collector, msg.message());
        offset = msg.nextOffset();
    }
As you can see, on the first iteration nothing compares the fetched message's
offset with the requested one, so a message from an unrelated offset
*is emitted*.
Then offset is updated with the nextOffset of that message, which is of course
very large, so the next iteration of the loop throws the
"overshot the end offset" error.
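
The failure can be reproduced outside Storm with a small simulation. This is only a sketch: Msg is a hypothetical stand-in for Kafka's MessageAndOffset, a list stands in for the collector, and the offsets (batch 100 to 105, fetched messages starting at 5000) are made-up values. reEmitGuarded shows one possible check, not the fix adopted by the project; whether an empty re-emit is acceptable for Trident is left open.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ReEmitSketch {
    // Hypothetical stand-in for MessageAndOffset: a payload plus its offset.
    static final class Msg {
        final long offset;
        final String payload;
        Msg(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }
        long nextOffset() {
            return offset + 1;
        }
    }

    // Mirrors the loop quoted above; 'emitted' stands in for the collector.
    static void reEmitAsQuoted(long offset, long nextOffset, List<Msg> msgs, List<String> emitted) {
        for (Msg msg : msgs) {
            if (offset == nextOffset) {
                break;
            }
            if (offset > nextOffset) {
                throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
            }
            emitted.add(msg.payload);  // emitted without looking at msg.offset
            offset = msg.nextOffset();
        }
    }

    // Hypothetical guard: emit only messages inside [offset, nextOffset).
    static void reEmitGuarded(long offset, long nextOffset, List<Msg> msgs, List<String> emitted) {
        for (Msg msg : msgs) {
            if (msg.offset < offset) {
                continue;  // before the batch start
            }
            if (msg.offset >= nextOffset) {
                break;     // past the batch end
            }
            emitted.add(msg.payload);
        }
    }

    public static void main(String[] args) {
        // The original batch covered offsets 100..104, but the fetch fell back to 5000.
        List<Msg> fetched = Arrays.asList(new Msg(5000, "m5000"), new Msg(5001, "m5001"));

        List<String> emitted = new ArrayList<>();
        try {
            reEmitAsQuoted(100, 105, fetched, emitted);
        } catch (RuntimeException e) {
            System.out.println("threw after emitting " + emitted);
        }

        List<String> guarded = new ArrayList<>();
        reEmitGuarded(100, 105, fetched, guarded);
        System.out.println("guarded emitted " + guarded);
    }
}
```

With these inputs the quoted loop emits one unrelated message before throwing, while the guarded variant emits nothing.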