[ 
https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044785#comment-16044785
 ] 

Stig Rohde Døssing commented on STORM-2343:
-------------------------------------------

[~ranganp] I spent a while thinking about this, and it seems to me there are a 
lot of corner cases to consider. Here's my best effort.
Regarding fixing STORM-2546:
The only way to know that a tuple has been deleted from Kafka is to try polling 
for it. We can know for sure that a failed tuple has been deleted if we seek to 
the failed tuple's offset (or earlier) on the relevant partition and poll, and 
the result set then contains a tuple from that partition with a higher offset 
than the failed tuple, without containing the failed tuple itself.

For instance:
Offset 0...5 have failed and also been compacted away. Offset 6 has failed and 
is present, offset 7 has failed and is not present.
We seek to offset 0 for the partition.
If we then see that the first message in the poll result is offset 6, we can be 
sure that offset 0...5 are deleted, because otherwise they would have been 
returned in the poll. Offset 7 cannot be removed from the spout because we 
can't be sure that it was deleted, the consumer may just have received too few 
messages.

I believe we can also conclude that offsets have been removed if we seek to 
their offsets, poll and receive an empty result. I'm not entirely sure about 
this, but I don't think the consumer will return empty polls if there are more 
messages to consume.

I think we can use this method to remove failed, deleted tuples from the offset 
manager. When we do a poll, we examine the retriable tuples for each partition. 
For each partition where we received tuples, we compare the earliest received 
tuple to the retriable tuples for that partition. If the offset of a given 
retriable tuple is lower than the offset of the earliest received tuple, then 
the retriable tuple must have been deleted. 
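To make the rule concrete, here's a rough sketch of that check in isolation (a hypothetical helper with made-up names, using plain java.util sorted sets standing in for the consumer and OffsetManager state):

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Sketch only: given the retriable offsets for one partition, and the
// earliest offset the poll returned for that partition after seeking to
// (or below) the lowest retriable offset, every retriable offset strictly
// below the earliest returned offset must have been deleted/compacted,
// because the poll would otherwise have returned it.
public class DeletedOffsetCheck {
    public static NavigableSet<Long> confirmedDeleted(
            NavigableSet<Long> retriableOffsets, long earliestReceivedOffset) {
        // headSet(x, false): all retriable offsets strictly below x
        return new TreeSet<>(retriableOffsets.headSet(earliestReceivedOffset, false));
    }
}
```

With the example above (retriable offsets 0...7, first polled message at offset 6) this confirms 0...5 as deleted and keeps 6 and 7, since the consumer may simply have returned too few messages to reach 7.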

About this issue:
The fact that failed tuples can be removed from Kafka before they can be 
retried is something I overlooked in what I wrote earlier. I think either 
solution can deal with it though.

By the way, one correction to what I wrote earlier regarding 
emitTupleIfNotEmitted filtering: we should also pause partitions in that 
solution, IMO. Otherwise it is possible (even likely if there are few retriable 
partitions) to allow a poll due to retriable tuples and get no retriable tuples 
back from it, in which case we'd discard all the messages and try again later. 
That would make the solution unacceptably wasteful (we'd risk multiple useless 
polls for unrelated partitions every time we have to retry a tuple while at the 
maxUncommittedOffsets limit), so we should pause the non-retriable partitions.
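The pausing step itself is cheap to express; something like this hypothetical helper (partition ids as plain ints here, where the real code would use TopicPartition):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: when polling only to retry failed tuples while at the
// maxUncommittedOffsets limit, pause every assigned partition that has
// no retriable tuples, so the poll can't return unrelated fresh tuples
// that would just be discarded.
public class PartitionPauser {
    public static Set<Integer> partitionsToPause(
            Set<Integer> assigned, Set<Integer> partitionsWithRetriable) {
        Set<Integer> toPause = new HashSet<>(assigned);
        toPause.removeAll(partitionsWithRetriable);
        return toPause;
    }
}
```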

The solutions I see to this issue right now are:

* Don't enforce maxUncommittedOffsets at all if there are retriable tuples. 
This is simple to implement, but I don't have a good feel for how likely it is 
that maxUncommittedOffsets will be exceeded by "too much".

Example of this functionality:
MaxUncommittedOffsets is 100
MaxPollRecords is 10
Committed offset for partition 0 and 1 is 0.
Partition 0 has emitted 0
Partition 1 has emitted 0...95, 97, 99, 101, 103 (some offsets compacted away)
Partition 1, message 97 is retriable
The spout seeks to message 97 and polls
It gets back offsets 99, 101, 103 and potentially 7 new tuples. Say the new 
tuples are in the range 104-110.
If any of 104-110 become retriable, the spout may emit another set of 9 
(maxPollRecords - 1) tuples.
This can repeat for each newly emitted set. The likelihood of this happening in 
real life is unclear to me.
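The poll decision for this option reduces to something like the following (illustrative names, not the actual spout fields):

```java
// Sketch of the first option's poll gate: maxUncommittedOffsets is simply
// ignored whenever any tuple is ready for retry.
public class PollGate {
    public static boolean shouldPoll(long numUncommittedOffsets,
                                     long maxUncommittedOffsets,
                                     boolean hasRetriableTuples) {
        return hasRetriableTuples || numUncommittedOffsets < maxUncommittedOffsets;
    }
}
```

In the example this is what lets the spout poll while past 100 uncommitted offsets, and also what allows the effective limit to creep upward by up to maxPollRecords - 1 per retry.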
----
* Enforce maxUncommittedOffsets globally by always allowing poll if there are 
retriable tuples, pause any non-retriable partitions if the spout has passed 
the maxUncommittedOffsets limit, and filter out fresh tuples from the poll 
result. This should work to enforce maxUncommittedOffsets. In order to avoid 
dropping messages, the consumer has to seek back to the earliest offset on each 
partition that was filtered out by this new check. As far as I can tell we 
won't increase the number of discarded tuples by an unreasonable amount as long 
as we pause non-retriable partitions, because the spout already discards any 
acked or already-emitted offset it receives in a poll. 
This solution will additionally discard those that are entirely new, which 
means they must have a higher offset than the newest currently emitted tuple 
on the retried partition. Assuming tuple failures are evenly distributed in the 
emitted set, it seems more likely to me that most retries will happen somewhere 
"in the middle" of the currently emitted tuples. 

Example of this functionality:
MaxUncommittedOffsets is 100
MaxPollRecords is 10
Committed offset for partition 0 and 1 is 0.
Partition 0 has emitted 0
Partition 1 has emitted 0...95, 97, 99, 101, 103 (some offsets compacted away)
Partition 1, message 99 is retriable
We pause partition 0, seek to offset 99 on partition 1 and poll.
We get back offsets 99, 101, 103 and potentially 7 new tuples. Say the lowest 
of these is at offset 104.
We prefilter the offset list to remove acked, emitted and new tuples, leaving 
99. The 7 new tuples are filtered out.
The consumer seeks to offset 104 to pick up there on next poll.
The spout emits offset 99.

I'd like to highlight that the filtering solution only discards tuples when it 
gets new tuples back in a poll, so if the retriable tuple in the example had 
been e.g. offset 50, nothing would have been discarded unnecessarily. 
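In code, the filtering step might look roughly like this (plain offsets standing in for ConsumerRecords; RetryFilter and its names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Sketch of the filtering described above. Only offsets that are currently
// retriable are kept; acked, already-emitted and entirely new offsets are
// dropped. The lowest dropped "new" offset is remembered so the consumer
// can seek back to it and pick the fresh tuples up on a later poll.
public class RetryFilter {
    public static class Result {
        public final List<Long> toEmit = new ArrayList<>();
        public Long seekBackTo = null; // lowest filtered-out fresh offset, if any
    }

    public static Result filter(List<Long> polled, Set<Long> retriable,
                                long highestEmittedOffset) {
        Result r = new Result();
        for (long offset : polled) {
            if (retriable.contains(offset)) {
                r.toEmit.add(offset);      // retry this one
            } else if (offset > highestEmittedOffset
                       && (r.seekBackTo == null || offset < r.seekBackTo)) {
                r.seekBackTo = offset;     // fresh tuple, remember for seek-back
            }
            // acked or already-emitted offsets are discarded, as today
        }
        return r;
    }
}
```

Running this on the example (polled 99, 101, 103, 104...110; only 99 retriable; highest emitted 103) emits just 99 and records 104 as the offset to seek back to.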
----
* Enforce maxUncommittedOffsets on a per partition basis (i.e. actual limit 
will be multiplied by the number of partitions) by always allowing poll for 
retriable tuples that are within maxUncommittedOffsets tuples of the committed 
offset. Pause any non-retriable partitions if the spout has passed the 
maxUncommittedOffsets limit. There is some additional bookkeeping in this 
solution, because we have to know for each partition whether the 
maxUncommittedOffsets limit has been reached, and if so what the offset of the 
tuple at the limit is (e.g. if the limit is 10, we want to know the offset of 
the 10th tuple emitted after the current committed offset). I believe we should 
be able to get that information out of the acked and emitted sets in 
OffsetManager. 

Example of this functionality:
MaxUncommittedOffsets is 100
MaxPollRecords is 10
Committed offset for partition 0 and 1 is 0.
Partition 0 has emitted 0
Partition 1 has emitted 0...95, 97, 99, 101, 103 (some offsets compacted away)
Partition 1, message 99 is retriable
We check that message 99 is within 100 emitted tuples of offset 0 (it is the 
97th tuple after offset 0, so it is)
We pause partition 0, seek to offset 99 on partition 1 and poll
We get back offset 99, 101, 103 and potentially 7 new tuples. Say the lowest of 
these is at offset 104.
The spout emits offset 99, filters out 101 and 103 because they were already 
emitted, and emits the 7 new tuples.
If offset 104 (or any later offset) becomes retriable, it is not retried until 
the committed offset moves, because offset 104 is the 101st tuple emitted after 
offset 0 and is therefore beyond the limit.
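The per-partition check could be derived from the emitted set roughly like this (a sketch, assuming the partition's emitted offsets are available as a sorted set, as in OffsetManager):

```java
import java.util.NavigableSet;

// Sketch: a retriable offset may be retried only if it is within the first
// maxUncommittedOffsets tuples emitted at or after the committed offset.
// Gaps from compaction don't count, only actually-emitted offsets do.
public class PerPartitionLimit {
    public static boolean mayRetry(NavigableSet<Long> emitted, long committedOffset,
                                   long retriableOffset, int maxUncommittedOffsets) {
        // Count emitted tuples from the committed offset up to and
        // including the retriable offset.
        int position = emitted.subSet(committedOffset, true, retriableOffset, true).size();
        return position <= maxUncommittedOffsets;
    }
}
```

With the example's emitted set, offset 99 is the 98th emitted tuple at or after the committed offset 0, so it may be retried; once 104...110 are emitted, offset 104 is the 101st and is blocked until the commit moves.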

I think either of the last two solutions should work, and we should be able to 
implement the fix for STORM-2546 on top of either. I can't say offhand whether 
one is better than the other; it depends on the cost of discarding a few extra 
messages versus doing the extra bookkeeping. 

I'd be happy if you would consider whether either of these solutions seem 
workable to you :)

> New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets 
> tuples fail at once
> -----------------------------------------------------------------------------------------------
>
>                 Key: STORM-2343
>                 URL: https://issues.apache.org/jira/browse/STORM-2343
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.1.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>            Priority: Critical
>             Fix For: 2.0.0, 1.1.1
>
>          Time Spent: 14h 50m
>  Remaining Estimate: 0h
>
> It doesn't look like the spout is respecting maxUncommittedOffsets in all 
> cases. If the underlying consumer returns more records in a call to poll() 
> than maxUncommittedOffsets, they will all be added to waitingToEmit. Since 
> poll may return up to 500 records by default (Kafka 0.10.1.1), this is pretty 
> likely to happen with low maxUncommittedOffsets.
> The spout only checks for tuples to retry if it decides to poll, and it only 
> decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since 
> maxUncommittedOffsets isn't being respected when retrieving or emitting 
> records, numUncommittedOffsets can be much larger than maxUncommittedOffsets. 
> If more than maxUncommittedOffsets messages fail, this can cause the spout to 
> stop polling entirely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
