Github user pershyn commented on the pull request:
https://github.com/apache/storm/pull/405#issuecomment-75567762
Hi Xin,
Thank you for the good find and the patch.
I have applied the proposed patch on a clean storm-0.9.3 branch, deployed and tested it, and it does indeed seem to fix the issue by skipping the failed offsets.
However, when there are lots of failed messages with outdated offsets, the spout will slowly ask kafka for each of them and then discard the failed messages one at a time.
In the logs this looks like the following (some warning log records were added for debugging purposes).
Note that the "new offset" doesn't change.
```
2015-02-23T14:19:30.080+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094410182]
2015-02-23T14:19:30.083+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
2015-02-23T14:19:30.083+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094410182
2015-02-23T14:19:30.186+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094410183]
2015-02-23T14:19:30.189+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
2015-02-23T14:19:30.189+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094410183
2015-02-23T14:19:30.291+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094411596]
2015-02-23T14:19:30.293+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
2015-02-23T14:19:30.293+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094411596
2015-02-23T14:19:30.396+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094411837]
```
So I have added some logic to skip all the outdated failed offsets in one step; see the code snippet below:
```
// fix bug [STORM-643]: remove this offset from the failed list when it is OutOfRange
if (had_failed) {
    // For the case of EarliestTime it would be better to discard
    // all the failed offsets that are earlier than the actual EarliestTime
    // offset, since they are not there anymore anyway.
    // Those calls to the broker API are then saved.
    // In the case of LatestTime it is a question whether we still need to try
    // to reach the failed ones (they may still be available).
    // But by moving to LatestTime we are already discarding messages in the kafka queue.
    // Since it is configured that way, assume it is ok for the user to lose information
    // and that the user cares about the newest messages first.
    // So it makes sense not to make an exception for the failed ones and to discard them as well.
    SortedSet<Long> omitted = failed.headSet(_emittedToOffset);
    // Use the tail, since a SortedSet maintains its elements in ascending order.
    // Calling tailSet() sets a 'range' on the original implementation,
    // so we could not add objects that are out of that range afterwards.
    // For that reason we copy the tail into a new set, where no range is set.
    failed = new TreeSet<Long>(failed.tailSet(_emittedToOffset));
    LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
}
```
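For clarity, here is a small standalone sketch (hypothetical class and offset values, not part of the patch) showing the `SortedSet` behaviour the comments refer to: a `tailSet()` view is range-restricted and rejects additions below the boundary, which is why the tail is copied into a fresh `TreeSet`:
```
import java.util.SortedSet;
import java.util.TreeSet;

// Standalone illustration only; the boundary value plays the role of _emittedToOffset.
public class TailSetCopyDemo {
    public static void main(String[] args) {
        TreeSet<Long> failed = new TreeSet<Long>();
        failed.add(100L);
        failed.add(200L);
        failed.add(300L);

        long emittedToOffset = 250L; // hypothetical boundary

        // View of the elements below the boundary: these are the omitted offsets.
        SortedSet<Long> omitted = failed.headSet(emittedToOffset); // [100, 200]

        // A tailSet() view keeps a range restriction: adding an offset below
        // the boundary throws IllegalArgumentException.
        SortedSet<Long> tailView = failed.tailSet(emittedToOffset); // [300]
        try {
            tailView.add(150L);
        } catch (IllegalArgumentException e) {
            System.out.println("tailSet view rejects an out-of-range offset");
        }

        // Copying the tail into a new TreeSet removes the range restriction,
        // so later failed offsets of any value can still be added.
        TreeSet<Long> newFailed = new TreeSet<Long>(tailView);
        newFailed.add(150L); // fine on the copy

        System.out.println("omitted = " + omitted + ", failed = " + newFailed);
    }
}
```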
With this, the outdated offsets are skipped all at once, and we save several hundred calls to the kafka API:
```
2015-02-23T15:07:21.913+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [8786892024]
2015-02-23T15:07:21.915+0100 s.k.PartitionManager [WARN] Using new offset: 8789372723
2015-02-23T15:07:21.916+0100 s.k.PartitionManager [WARN] Removing the failed offsets that are out of range: [8786892024, 8786892114, 8786892125, 8786892127, 8786892170, 8786892207, 8786892217, 8786892317, 8786892405, 8786892444, 8786892453, 8786892469, 8786892478, 8786892549, 8786892614, 8786892667, 8786892918, /* ... some omitted ... */ 8786977572, 8786977944, 8786986501, 8786991794, 8786991797, 8786994497, 8787001536]
2015-02-23T15:07:32.759+0100 s.k.ZkCoordinator [INFO] Task [7/8] Refreshing partition manager connections
```
If you agree with this improvement, I would be happy if you could extend the pull request with this logic.
Best regards and thanks,
Michael