[
https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041522#comment-16041522
]
Stig Rohde Døssing commented on STORM-2343:
-------------------------------------------
[~ranganp] You are right. I missed some cases.
Just to illustrate:
Say maxUncommittedOffsets is 10, maxPollRecords is 5, and the committedOffset
is 0.
The spout will initially emit up to offset 10, because it is allowed to poll
until numNonRetriableTuples is >= maxUncommittedOffsets
The spout will be allowed to emit another 5 tuples if offset 10 fails, so if
that happens, offsets 10-14 will get emitted. If offset 1 fails and 2-14 get
acked, the spout gets stuck because it will count the "extra tuples" 11-14 in
numNonRetriableTuples.
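To make the stuck state concrete, here is a rough model (not the actual Storm code; the variable names are illustrative) of the poll gate in that scenario:

```python
# Scenario from above: maxUncommittedOffsets = 10, offsets 1-14 emitted,
# offset 1 failed, offsets 2-14 acked. The commit cannot advance past the
# failed offset 1, so all 13 acked tuples stay uncommitted.
max_uncommitted_offsets = 10
acked_but_uncommitted = set(range(2, 15))  # offsets 2..14

# The spout counts the "extra tuples" 11-14 as non-retriable, so the gate
# that permits polling (and therefore retrying) never opens again.
num_non_retriable = len(acked_but_uncommitted)  # 13
may_poll = num_non_retriable < max_uncommitted_offsets

print(num_non_retriable, may_poll)  # 13 False -> offset 1 is never re-emitted
```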
A similar case is the one where maxPollRecords doesn't divide
maxUncommittedOffsets evenly. If it were 3 in the example above, the spout
might immediately emit offsets 1-12. If 2-12 get acked, offset 1 can never be
re-emitted.
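The overshoot in the uneven-division case falls out of the gate checking the count against the cap before each poll, as this small sketch (illustrative, not the real implementation) shows:

```python
# maxPollRecords (3) doesn't divide maxUncommittedOffsets (10): the last poll
# that passes the gate still emits a full batch, overshooting the cap.
max_uncommitted_offsets = 10
max_poll_records = 3

emitted = 0
while emitted < max_uncommitted_offsets:  # gate: count still below the cap
    emitted += max_poll_records           # each poll emits a full batch

print(emitted)  # 12: offsets 1-12 emitted, two past the cap
```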
Your suggestion would solve this, but also means that we can't meaningfully cap
numUncommittedOffsets. If we allow a poll whenever there's a failed tuple, the
spout might emit an arbitrary number of tuples if tuples fail in an unlucky
order.
For example, take the case where maxUncommittedOffsets is 10, maxPollRecords is
5, and the committedOffset is 0, and 10 tuples have been emitted.
If offset 10 fails and is retried, 10-14 may be emitted. If 14 fails, 14-18 get
emitted. This can be repeated as many times as needed. The issue is more likely
to happen when multiple partitions are involved, because then you might have a
failed tuple on partition 0 cause the spout to emit fresh tuples on partition
1, regardless of how far past the limit partition 1 already is.
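The runaway growth can be sketched like this (again illustrative names, not the actual spout code): a failure at the newest offset drags in maxPollRecords - 1 fresh offsets each time, so the highest emitted offset climbs without bound.

```python
# maxUncommittedOffsets = 10, maxPollRecords = 5, offsets 1-10 emitted,
# so the cap is already reached.
max_poll_records = 5
highest_emitted = 10

for _ in range(3):  # the newest tuple fails three times in a row
    failed = highest_emitted
    # the retry poll seeks to the failed offset and returns a full batch,
    # re-emitting the failed tuple plus max_poll_records - 1 fresh ones
    highest_emitted = failed + max_poll_records - 1

print(highest_emitted)  # 22, and it keeps climbing with each repetition
```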
I'm torn on whether this kind of limit violation is really serious enough to
matter. It's possible that we overshoot maxUncommittedOffsets by a lot, but it
doesn't seem likely that the bad tuple failure pattern keeps repeating. That's
just a gut feeling though, which is why I made an attempt to put a real cap on
numUncommittedOffsets in the linked PRs.
If it is serious enough that we want to fix it I think there are two decent
approaches.
One is to do what you mention and filter out the fresh tuples we happen to also
receive when we poll for failed tuples. The downside is that in order to avoid
losing tuples, we have to seek the consumer back to the first tuple we filtered
out for each partition, or we have to be able to emit only the retriable tuples
while keeping the rest in memory for later.
The other option would be to allow retries (per partition) only for those
tuples that are within maxUncommittedOffsets tuples of the committed offset.
For example, if the maxUncommittedOffsets is 10, maxPollRecords is 5, and the
committedOffset is 0, and 14 tuples have been emitted, we only allow retries
for the partition if the earliest retriable tuple is one of the first 10. The
point would be that we never fail to retry tuples within maxUncommittedOffsets
of the committed offset, but we don't retry tuples that are beyond that
boundary until the committed offset moves, so we can cap numUncommittedOffsets.
The downsides to this are that we have to do this evaluation per partition, and
that we might have to pause non-retriable partitions when we're beyond
maxUncommittedOffsets to ensure that we actually emit the retriable tuples and
not just new tuples on unrelated partitions.
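A minimal sketch of that per-partition check might look as follows (retry_allowed is a hypothetical helper, and whether the bound is inclusive is an assumption, not something the proposal pins down):

```python
def retry_allowed(earliest_retriable, committed_offset, max_uncommitted):
    """Allow retries for a partition only while its earliest retriable
    offset lies within max_uncommitted tuples of the committed offset."""
    return earliest_retriable <= committed_offset + max_uncommitted

# committedOffset 0, maxUncommittedOffsets 10, offsets 1-14 emitted:
print(retry_allowed(10, 0, 10))  # True: among the first 10, retry now
print(retry_allowed(11, 0, 10))  # False: wait until the committed offset moves
```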
I don't really have a particular preference here. What do you think?
> New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets
> tuples fail at once
> -----------------------------------------------------------------------------------------------
>
> Key: STORM-2343
> URL: https://issues.apache.org/jira/browse/STORM-2343
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 2.0.0, 1.1.0
> Reporter: Stig Rohde Døssing
> Assignee: Stig Rohde Døssing
> Priority: Critical
> Fix For: 2.0.0, 1.1.1
>
> Time Spent: 14h 50m
> Remaining Estimate: 0h
>
> It doesn't look like the spout is respecting maxUncommittedOffsets in all
> cases. If the underlying consumer returns more records in a call to poll()
> than maxUncommittedOffsets, they will all be added to waitingToEmit. Since
> poll may return up to 500 records by default (Kafka 0.10.1.1), this is pretty
> likely to happen with low maxUncommittedOffsets.
> The spout only checks for tuples to retry if it decides to poll, and it only
> decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since
> maxUncommittedOffsets isn't being respected when retrieving or emitting
> records, numUncommittedOffsets can be much larger than maxUncommittedOffsets.
> If more than maxUncommittedOffsets messages fail, this can cause the spout to
> stop polling entirely.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)