[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14502768#comment-14502768 ]
ASF GitHub Bot commented on STORM-643:
--------------------------------------
Github user tpiscitell commented on the pull request:
https://github.com/apache/storm/pull/405#issuecomment-94448772
@miguno I don't think either of those JIRAs fixed this issue. The problem
here is that PartitionManager.fill() will always attempt to fetch any failed
tuples first:
```
// Are there failed tuples? If so, fetch those first.
if (had_failed) {
    offset = failed.first();
} else {
    offset = _emittedToOffset;
}
```
However, even with the patches for those two JIRAs, the `failed` list is
never pruned. Instead, PartitionManager.fill() just updates `_emittedToOffset` and
returns:
```
} catch (TopicOffsetOutOfRangeException e) {
    _emittedToOffset = KafkaUtils.getOffset(_consumer,
            _spoutConfig.topic, _partition.partition, _spoutConfig);
    LOG.warn("Using new offset: {}", _emittedToOffset);
    // fetch failed, so don't update the metrics
    return;
}
```
It is then simply called again from `next()`, and the process repeats:
```
public EmitState next(SpoutOutputCollector collector) {
    if (_waitingToEmit.isEmpty()) {
        fill();
    }
```
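One way to break the loop would be to prune the `failed` set whenever the starting offset is re-resolved, since every failed offset below the new starting point can never be fetched again. The snippet below is only a sketch of that idea against the catch block quoted above (names `_emittedToOffset`, `failed`, `had_failed` taken from that excerpt), not the actual patch:
```
} catch (TopicOffsetOutOfRangeException e) {
    // Re-resolve the starting offset as before.
    _emittedToOffset = KafkaUtils.getOffset(_consumer,
            _spoutConfig.topic, _partition.partition, _spoutConfig);
    LOG.warn("Using new offset: {}", _emittedToOffset);

    // Sketch: drop failed offsets that are now out of range so the next
    // fill() does not retry them forever. headSet(toElement) is the
    // strictly-smaller view of the SortedSet, so clearing it removes
    // every offset below the new starting point.
    if (had_failed) {
        failed.headSet(_emittedToOffset).clear();
    }

    // fetch failed, so don't update the metrics
    return;
}
```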
> KafkaUtils repeatedly fetches messages whose offset is out of range
> -------------------------------------------------------------------
>
> Key: STORM-643
> URL: https://issues.apache.org/jira/browse/STORM-643
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka
> Affects Versions: 0.9.2-incubating, 0.9.3
> Reporter: Xin Wang
> Assignee: Xin Wang
> Priority: Minor
>
> KafkaUtils repeatedly fetches messages whose offset is out of range.
> This happens when the failed list (SortedSet<Long> failed) is not empty and
> some offset in it is out of range.
> {code}
> [worker-log]
> 2015-02-01 10:24:27.231+0800 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [20919071816]; retrying with default start offset time
> from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.232+0800 s.k.PartitionManager [WARN] Using new offset:
> 20996130717
> 2015-02-01 10:24:27.333+0800 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [20919071816]; retrying with default start offset time
> from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.334+0800 s.k.PartitionManager [WARN] Using new offset:
> 20996130717
> ...
> {code}
> [FIX]
> {code}
> storm.kafka.PartitionManager.fill():
> ...
> try {
>     msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition,
>             offset);
> } catch (UpdateOffsetException e) {
>     _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic,
>             _partition.partition, _spoutConfig);
>     LOG.warn("Using new offset: {}", _emittedToOffset);
>     // fetch failed, so don't update the metrics
>     // fix bug: remove this offset from the failed list when it is out of range
>     if (had_failed) {
>         failed.remove(offset);
>     }
>     return;
> }
> ...
> {code}
> Also: the log message "retrying with default start offset time from
> configuration. configured start offset time: [-2]" is incorrect.