[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14632691#comment-14632691 ]
ASF GitHub Bot commented on STORM-643:
--------------------------------------
Github user HeartSaVioR commented on the pull request:
https://github.com/apache/storm/pull/642#issuecomment-122625519
@vesense
It seems your patch removes only one invalid offset per fill() call.
We may be able to apply the same kind of optimization here: when a
TopicOffsetOutOfRangeException occurs, we can remove all invalid offsets
from ExponentialBackoffMsgRetryManager at once.
Although we would have to loop over the records and examine each one to find
them, that would still be better than the current behavior.
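A minimal sketch of that batch removal, assuming "invalid" means any offset below the earliest offset the broker still retains. RetryRecordsSketch, failed() and removeOffsetsBefore() are hypothetical names for illustration only; they are not the actual ExponentialBackoffMsgRetryManager API. The loop examines every record once and drops all stale offsets in a single pass:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-in for a retry manager that keeps one record per failed offset.
 * This is NOT the real ExponentialBackoffMsgRetryManager API.
 */
class RetryRecordsSketch {
    // failed offset -> number of failures seen so far (illustrative payload only)
    private final Map<Long, Integer> records = new HashMap<>();

    void failed(long offset) {
        records.merge(offset, 1, Integer::sum);
    }

    /**
     * Assumption: an offset is invalid when it is below earliestValidOffset.
     * Removes every such record in one pass and returns the removed offsets,
     * instead of dropping only one offset per fill() call.
     */
    List<Long> removeOffsetsBefore(long earliestValidOffset) {
        List<Long> removed = new ArrayList<>();
        Iterator<Map.Entry<Long, Integer>> it = records.entrySet().iterator();
        while (it.hasNext()) {
            long offset = it.next().getKey();
            if (offset < earliestValidOffset) {
                removed.add(offset);
                it.remove(); // safe removal while iterating
            }
        }
        return removed;
    }

    int size() {
        return records.size();
    }

    public static void main(String[] args) {
        RetryRecordsSketch mgr = new RetryRecordsSketch();
        mgr.failed(100L);
        mgr.failed(150L);
        mgr.failed(300L);
        List<Long> removed = mgr.removeOffsetsBefore(200L);
        System.out.println("removed " + removed.size() + ", remaining " + mgr.size());
    }
}
```

If the records were instead kept in a TreeSet or TreeMap keyed by offset, `headSet(earliestValidOffset).clear()` (or `headMap(...).clear()`) would remove the same range without an explicit loop, since those views write through to the backing collection.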
> KafkaUtils repeatedly fetches messages whose offset is out of range
> -------------------------------------------------------------------
>
> Key: STORM-643
> URL: https://issues.apache.org/jira/browse/STORM-643
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka
> Affects Versions: 0.9.2-incubating, 0.9.3, 0.10.0, 0.9.4, 0.9.5
> Reporter: Xin Wang
> Assignee: Xin Wang
> Fix For: 0.9.6
>
>
> KafkaUtils repeatedly fetches messages whose offset is out of range.
> This happens when the failed list (SortedSet<Long> failed) is not empty and
> some offset in it is out of range.
> [worker-log]
> {code}
> 2015-02-01 10:24:27.231+0800 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [20919071816]; retrying with default start offset time
> from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.232+0800 s.k.PartitionManager [WARN] Using new offset:
> 20996130717
> 2015-02-01 10:24:27.333+0800 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [20919071816]; retrying with default start offset time
> from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.334+0800 s.k.PartitionManager [WARN] Using new offset:
> 20996130717
> ...
> {code}
> [FIX]
> {code}
> storm.kafka.PartitionManager.fill():
> ...
> try {
>     msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
> } catch (UpdateOffsetException e) {
>     _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
>     LOG.warn("Using new offset: {}", _emittedToOffset);
>     // fetch failed, so don't update the metrics
>     // fix bug: remove this offset from the failed list when it is out of range
>     if (had_failed) {
>         failed.remove(offset);
>     }
>     return;
> }
> ...
> {code}
> Also, the log message "retrying with default start offset time from
> configuration. configured start offset time: [-2]" is incorrect.
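The failure mode and the proposed fix can be modeled in a few lines. This is a simplified, hypothetical model of the fill() control flow, not the real PartitionManager: FillLoopSketch, fetch(), failuresBeforeSuccess() and the offsets used (stale offsets 100 and 120, earliest available 200, reset offset 250) are made up for illustration, and fetch() simply throws whenever the requested offset is below the earliest available one.

```java
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * Hypothetical, simplified model of the fill() control flow described in STORM-643.
 * No Kafka or Storm classes are used.
 */
class FillLoopSketch {
    static class OffsetOutOfRange extends Exception {}

    private final SortedSet<Long> failed = new TreeSet<>();
    private final long earliestAvailable; // offsets below this were deleted by retention
    private final long resetOffset;       // stands in for the "new offset" after the error
    private final boolean applyFix;
    private long emittedToOffset;

    FillLoopSketch(long earliestAvailable, long resetOffset, boolean applyFix) {
        this.earliestAvailable = earliestAvailable;
        this.resetOffset = resetOffset;
        this.applyFix = applyFix;
        this.emittedToOffset = resetOffset;
    }

    // Assumption: a fetch fails only because the offset is too old.
    private void fetch(long offset) throws OffsetOutOfRange {
        if (offset < earliestAvailable) {
            throw new OffsetOutOfRange();
        }
    }

    /** Returns true if the fetch succeeded, false if it hit an out-of-range offset. */
    boolean fill() {
        boolean hadFailed = !failed.isEmpty();
        long offset = hadFailed ? failed.first() : emittedToOffset;
        try {
            fetch(offset);
            return true;
        } catch (OffsetOutOfRange e) {
            emittedToOffset = resetOffset; // mirrors "Using new offset: ..."
            if (applyFix && hadFailed) {
                failed.remove(offset);     // the proposed fix: drop the stale offset
            }
            return false;
        }
    }

    /** Counts failed fill() calls before the first successful one, capped at maxCalls. */
    static int failuresBeforeSuccess(boolean applyFix, int maxCalls, long... staleOffsets) {
        FillLoopSketch pm = new FillLoopSketch(200L, 250L, applyFix);
        for (long o : staleOffsets) {
            pm.failed.add(o);
        }
        int failures = 0;
        while (failures < maxCalls && !pm.fill()) {
            failures++;
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + failuresBeforeSuccess(false, 10, 100L, 120L));
        System.out.println("with fix: " + failuresBeforeSuccess(true, 10, 100L, 120L));
    }
}
```

Without the fix, every fill() picks the same stale failed.first() and fails again until the cap is reached, which matches the repeated warnings in the worker log. With the fix, each fill() drops exactly one stale offset, so two stale offsets cost two failed calls before fetching resumes from the reset offset; removing all stale offsets at once would cut that to a single failed call.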
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)