Xin Wang created STORM-645:
------------------------------
Summary: KafkaUtils repeat fetch messages which offset is out of range
Key: STORM-645
URL: https://issues.apache.org/jira/browse/STORM-645
Project: Apache Storm
Issue Type: Bug
Components: storm-kafka
Affects Versions: 0.9.2-incubating, 0.9.3
Reporter: Xin Wang
Assignee: Xin Wang
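KafkaUtils repeatedly fetches messages whose offset is out of range.
This happens when the failed list (SortedSet<Long> failed) is not empty and
some offset in it is out of range. That offset is never removed from the
list, so each later call to fill() tries to fetch it again.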
[FIX]
storm.kafka.PartitionManager.fill():
...
try {
    msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
} catch (UpdateOffsetException e) {
    _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic,
            _partition.partition, _spoutConfig);
    LOG.warn("Using new offset: {}", _emittedToOffset);
    // fetch failed, so don't update the metrics
    // fix bug: remove this offset from failed list when it is OutOfRange
    if (had_failed) {
        failed.remove(offset);
    }
    return;
}
...
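
To illustrate why the removal matters, here is a minimal standalone sketch.
It does not use Storm or Kafka. The class FailedOffsetRetryDemo, the
OffsetOutOfRange exception (a stand-in for UpdateOffsetException), the
EARLIEST/LATEST bounds and the fetch() helper are all hypothetical, and the
loop only approximates repeated calls to fill() under the assumption that the
first entry of the failed list is retried first.

```java
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

class FailedOffsetRetryDemo {
    // Hypothetical valid offset range on the broker: [EARLIEST, LATEST).
    static final long EARLIEST = 100L;
    static final long LATEST = 200L;

    /** Hypothetical stand-in for UpdateOffsetException. */
    static class OffsetOutOfRange extends RuntimeException {}

    /** Hypothetical fetch: throws if the offset is outside the valid range. */
    static void fetch(long offset) {
        if (offset < EARLIEST || offset >= LATEST) {
            throw new OffsetOutOfRange();
        }
    }

    /**
     * Simulates up to `attempts` fill()-style calls that retry the first
     * failed offset. Returns how many attempts hit an out-of-range offset.
     */
    static int countOutOfRangeFetches(SortedSet<Long> failed, boolean applyFix, int attempts) {
        int outOfRange = 0;
        for (int i = 0; i < attempts; i++) {
            boolean hadFailed = !failed.isEmpty();
            if (!hadFailed) {
                break; // nothing left to retry
            }
            long offset = failed.first();
            try {
                fetch(offset);
                failed.remove(offset); // simplification: a successful fetch clears the entry
            } catch (OffsetOutOfRange e) {
                outOfRange++;
                if (applyFix) {
                    failed.remove(offset); // the proposed fix: drop the unreachable offset
                }
            }
        }
        return outOfRange;
    }

    public static void main(String[] args) {
        SortedSet<Long> withoutFix = new TreeSet<>(Arrays.asList(50L, 150L));
        SortedSet<Long> withFix = new TreeSet<>(Arrays.asList(50L, 150L));
        System.out.println("without fix: " + countOutOfRangeFetches(withoutFix, false, 5));
        System.out.println("with fix:    " + countOutOfRangeFetches(withFix, true, 5));
    }
}
```

In this sketch, offset 50 stays at the head of the failed list when the fix
is not applied, so every attempt fails on it and offset 150 is never reached.
With the fix, the stale offset is dropped after one failed attempt.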