[ https://issues.apache.org/jira/browse/STORM-2546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151172#comment-16151172 ]

Stig Rohde Døssing edited comment on STORM-2546 at 9/2/17 6:10 PM:
-------------------------------------------------------------------

Here's my suggestion for a fix, which depends on the changes in 
https://issues.apache.org/jira/browse/STORM-2549:
The only way to know that a tuple has been deleted from Kafka is to try polling 
for it. We can know for sure that a failed tuple has been deleted if we seek to 
the failed tuple's offset (or earlier) on the relevant partition and poll, and 
the first tuple the poll returns for that partition has a higher offset than 
the failed tuple.

For instance:
Offsets 0...5 have failed and have also been compacted away. Offset 6 has 
failed and is still present; offset 7 has failed and is not present.
We seek to offset 0 for the partition.
If the first message in the poll result is then offset 6, we can be sure that 
offsets 0...5 are deleted, because otherwise they would have been returned in 
the poll. Offset 7 cannot be removed from the spout, because we can't be sure 
it was deleted; the consumer may simply have returned too few messages to 
reach it.
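
A rough sketch of the check against a plain KafkaConsumer, outside the spout's 
internals; DeletedOffsetCheck, failedOffsets and the poll timeout are 
illustrative names and values, not actual spout code:
{code:java}
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class DeletedOffsetCheck {
    /**
     * Seeks to the earliest failed offset on the partition, polls, and
     * returns the failed offsets that must have been deleted: everything
     * strictly below the first offset the broker actually returned.
     * Assumes failedOffsets is non-empty.
     */
    static <K, V> NavigableSet<Long> deletedOffsets(KafkaConsumer<K, V> consumer,
                                                    TopicPartition tp,
                                                    NavigableSet<Long> failedOffsets) {
        consumer.seek(tp, failedOffsets.first());
        ConsumerRecords<K, V> result = consumer.poll(200);
        List<ConsumerRecord<K, V>> records = result.records(tp);
        if (records.isEmpty()) {
            // An empty poll proves nothing; the broker may simply have had
            // nothing ready within the timeout.
            return new TreeSet<>();
        }
        // Records come back in offset order, so the first is the earliest.
        long earliestReturned = records.get(0).offset();
        // Offsets the broker skipped past can no longer exist on the partition.
        return new TreeSet<>(failedOffsets.headSet(earliestReturned, false));
    }
}
{code}
In the example above, deletedOffsets would return 0...5 when the first 
returned message is offset 6, and would leave 6 and 7 untouched.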

I think we can use this method to remove failed, deleted tuples from the 
offset manager: when we poll, we examine the retriable offsets for each 
partition. For each partition where we received messages, we compare the 
offset of the earliest received message to the retriable offsets for that 
partition. Any retriable offset lower than the earliest received message's 
offset must have been deleted.
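
As a sketch of how this could slot into the poll-handling path; RetriablePruner 
and retriableByPartition are names I made up for illustration, not the spout's 
actual fields:
{code:java}
import java.util.Map;
import java.util.NavigableSet;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

class RetriablePruner {
    /**
     * After a poll, drops retriable offsets the broker skipped past. Only
     * valid if the consumer was seeked to (or below) the earliest retriable
     * offset of each polled partition before the poll.
     */
    static <K, V> void pruneDeleted(ConsumerRecords<K, V> result,
                                    Map<TopicPartition, NavigableSet<Long>> retriableByPartition) {
        for (TopicPartition tp : result.partitions()) {
            NavigableSet<Long> retriable = retriableByPartition.get(tp);
            if (retriable == null || retriable.isEmpty()) {
                continue;
            }
            // Earliest message returned for this partition in this poll.
            long earliestReturned = result.records(tp).get(0).offset();
            // headSet is a live view, so clearing it removes the deleted
            // offsets from the underlying retriable set.
            retriable.headSet(earliestReturned, false).clear();
        }
    }
}
{code}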


> Kafka spout can stall / get stuck due to edge case with failing tuples
> ----------------------------------------------------------------------
>
>                 Key: STORM-2546
>                 URL: https://issues.apache.org/jira/browse/STORM-2546
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.x
>            Reporter: Prasanna Ranganathan
>            Assignee: Stig Rohde Døssing
>
> The mechanism for replaying a failed tuple involves seeking the Kafka 
> consumer to the failing offset and then re-emitting the message into the 
> topology. A tuple, when emitted for the first time, has an entry created for 
> it in OffsetManager. This entry is removed only after the tuple is 
> successfully acknowledged and its offset successfully committed. Until then, 
> commits for offsets beyond the failing offset on that TopicPartition are 
> blocked.
> It is possible that, when the spout seeks the consumer to the failing offset, 
> the corresponding Kafka message is not returned in the poll response. This 
> can happen because that offset has been deleted or compacted away. In this 
> scenario the partition is blocked from committing and making progress.
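
For context, the replay mechanism described above reduces to a seek before the 
next poll. A minimal illustration (ReplayIllustration is my own naming, not 
the spout's code):
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class ReplayIllustration {
    /**
     * Replaying a failed tuple: rewind the consumer so the next poll
     * re-fetches the failed record. If that offset has been deleted or
     * compacted away, the record never comes back, and the pending
     * OffsetManager entry blocks commits for the partition indefinitely.
     */
    static <K, V> ConsumerRecords<K, V> replay(KafkaConsumer<K, V> consumer,
                                               TopicPartition tp,
                                               long failedOffset) {
        consumer.seek(tp, failedOffset);
        return consumer.poll(200);
    }
}
{code}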


