[
https://issues.apache.org/jira/browse/STORM-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stig Rohde Døssing updated STORM-2549:
--------------------------------------
Component/s: storm-kafka-client
> The fix for STORM-2343 is incomplete, and the spout can still get stuck on
> failed tuples
> ----------------------------------------------------------------------------------------
>
> Key: STORM-2549
> URL: https://issues.apache.org/jira/browse/STORM-2549
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 2.0.0, 1.1.0
> Reporter: Stig Rohde Døssing
> Assignee: Stig Rohde Døssing
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Example:
> Say maxUncommittedOffsets is 10, maxPollRecords is 5, and the committedOffset
> is 0.
> The spout will initially emit up to offset 10, because it is allowed to poll
> until numNonRetriableTuples is >= maxUncommittedOffsets.
> The spout will be allowed to emit another 5 tuples if offset 10 fails, so if
> that happens, offsets 10-14 will get emitted. If offset 1 fails and 2-14 get
> acked, the spout gets stuck because it will count the "extra tuples" 11-14 in
> numNonRetriableTuples.
> A similar case is the one where maxPollRecords doesn't divide
> maxUncommittedOffsets evenly. If maxPollRecords were 3 in the example above,
> the spout might immediately emit offsets 1-12. If offset 1 fails and 2-12 get
> acked, offset 1 cannot be reemitted, because the 11 tuples 2-12 still count
> toward numNonRetriableTuples.
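The second case can be reproduced with a small model. This is only a sketch of the check as described above, not the actual storm-kafka-client code: the function names may_poll and initial_emit are hypothetical, and the model assumes that acked tuples keep counting as uncommitted until the committed offset can advance past them.

```python
def may_poll(num_uncommitted, num_retriable, max_uncommitted_offsets):
    """Model of the check described above (hypothetical, not the spout's code):
    polling is allowed only while the number of non-retriable uncommitted
    tuples is below maxUncommittedOffsets."""
    num_non_retriable = num_uncommitted - num_retriable
    return num_non_retriable < max_uncommitted_offsets


def initial_emit(max_uncommitted_offsets, max_poll_records, first_offset=1):
    """Emit whole poll batches for as long as the check allows polling."""
    emitted = []
    next_offset = first_offset
    while may_poll(len(emitted), 0, max_uncommitted_offsets):
        emitted.extend(range(next_offset, next_offset + max_poll_records))
        next_offset += max_poll_records
    return emitted


# maxUncommittedOffsets = 10, maxPollRecords = 3: the fourth poll is still
# allowed because only 9 tuples are pending, so 12 tuples end up emitted.
emitted = initial_emit(max_uncommitted_offsets=10, max_poll_records=3)

# Offset 1 fails and becomes retriable. Offsets 2-12 are acked but cannot be
# committed while offset 1 is outstanding, so all 12 tuples stay uncommitted
# (an assumption of this model).
can_poll_for_retry = may_poll(len(emitted), num_retriable=1,
                              max_uncommitted_offsets=10)
```

In this model the 11 acked tuples alone reach the limit, so the poll that would re-emit offset 1 is never permitted.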
> The proposed solution is the following:
> * Enforce maxUncommittedOffsets on a per partition basis (i.e. actual limit
> will be multiplied by the number of partitions) by always allowing poll for
> retriable tuples that are within maxUncommittedOffsets tuples of the
> committed offset. Pause any non-retriable partitions if the partition has
> passed the maxUncommittedOffsets limit, and some other partition is polling
> for retries while also at the maxUncommittedOffsets limit.
> Example of this functionality:
> MaxUncommittedOffsets is 100
> MaxPollRecords is 10
> Committed offset for partition 0 and 1 is 0.
> Partition 0 has emitted 0
> Partition 1 has emitted 0...95, 97, 99, 101, 103 (some offsets compacted away)
> Partition 1, message 99 is retriable
> We check that message 99 is within 100 emitted tuples of offset 0 (it is the
> 97th tuple after offset 0, so it is)
> We do not pause partition 0 because that partition isn't at the
> maxUncommittedOffsets limit.
> Seek to offset 99 on partition 1 and poll
> We get back offset 99, 101, 103 and potentially 7 new tuples. Say the lowest
> of these is at offset 104.
> The spout emits offset 99, filters out 101 and 103 because they were already
> emitted, and emits the 7 new tuples.
> If offset 104 (or a later offset) becomes retriable, it is not retried until
> the committed offset moves, because offset 104 is the 101st tuple emitted
> after offset 0 and therefore falls outside the maxUncommittedOffsets limit.
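The proposed per-partition rule can be sketched as follows. This is one possible reading of the proposal, not the implementation: may_retry and partitions_to_pause are hypothetical names, the count runs over emitted offsets from the committed offset up to and including the retry candidate, "passed the limit" is treated as "at or above the limit", and the 7 new tuples are assumed to sit at offsets 104-110.

```python
from bisect import bisect_right


def may_retry(emitted, committed_offset, retry_offset, max_uncommitted_offsets):
    """Allow a retry only if the offset lies within max_uncommitted_offsets
    emitted tuples of the committed offset. Compacted (never emitted)
    offsets do not count."""
    uncommitted = sorted(o for o in emitted if o >= committed_offset)
    # Number of emitted, uncommitted offsets up to and including retry_offset.
    position = bisect_right(uncommitted, retry_offset)
    return position <= max_uncommitted_offsets


def partitions_to_pause(partitions, max_uncommitted_offsets):
    """partitions maps partition id -> (num_uncommitted, has_retriable).
    A partition without retriable tuples is paused if it is at the limit
    and some other partition is polling for retries while also at the limit."""
    retrying_at_limit = {
        p for p, (count, retriable) in partitions.items()
        if retriable and count >= max_uncommitted_offsets
    }
    return {
        p for p, (count, retriable) in partitions.items()
        if not retriable and count >= max_uncommitted_offsets and retrying_at_limit
    }


# Partition 1 from the example: offsets 96, 98, 100 and 102 were compacted away.
partition1 = list(range(0, 96)) + [97, 99, 101, 103]
retry_99_allowed = may_retry(partition1, 0, 99, 100)

# After the poll, 7 new tuples are emitted (assumed to be offsets 104-110).
partition1_after_poll = partition1 + list(range(104, 111))
retry_104_allowed = may_retry(partition1_after_poll, 0, 104, 100)

# Partition 0 has emitted a single tuple, so it is not paused even though
# partition 1 is retrying at the limit.
paused = partitions_to_pause({0: (1, False), 1: (len(partition1), True)}, 100)
```

Under these assumptions offset 99 may be retried, offset 104 may not be retried until the committed offset moves, and no partition is paused.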
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)