[
https://issues.apache.org/jira/browse/STORM-2546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151172#comment-16151172
]
Stig Rohde Døssing edited comment on STORM-2546 at 9/2/17 6:10 PM:
-------------------------------------------------------------------
Here's my suggestion for a fix, which depends on the changes in
https://issues.apache.org/jira/browse/STORM-2549:
The only way to know that a tuple has been deleted from Kafka is to try polling
for it. We can know for sure that a failed tuple has been deleted if we seek to
the failed tuple's offset (or earlier) on the relevant partition, poll, and
then find that the first tuple returned for that partition has a higher offset
than the failed tuple.
For instance:
Offsets 0...5 have failed and have also been compacted away. Offset 6 has
failed and is still present; offset 7 has failed and is not present.
We seek to offset 0 for the partition.
If the first message in the poll result is then offset 6, we can be sure that
offsets 0...5 have been deleted, because otherwise they would have been
returned in the poll. Offset 7 cannot be removed from the spout, because we
can't be sure that it was deleted; the consumer may just have returned too few
messages.
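The decision rule above can be sketched as follows. This is a minimal illustration only; the class and method names are hypothetical and not part of storm-kafka-client:

```java
public class CompactionCheck {
    // A failed offset is provably deleted iff we seeked to it (or earlier)
    // and the first record the poll returned for that partition has a
    // strictly higher offset. If the failed offset is at or beyond the first
    // returned record, nothing can be concluded: the poll may simply have
    // returned too few messages.
    static boolean provablyDeleted(long failedOffset, long seekOffset,
                                   long firstReturnedOffset) {
        return seekOffset <= failedOffset && firstReturnedOffset > failedOffset;
    }

    public static void main(String[] args) {
        // Scenario from the comment: offsets 0..5 failed and were compacted
        // away, 6 failed but exists, 7 failed and is absent. We seek to 0 and
        // the poll's first record for the partition is offset 6.
        long seek = 0, first = 6;
        for (long o = 0; o <= 7; o++) {
            System.out.println(o + " provably deleted: "
                    + provablyDeleted(o, seek, first));
        }
    }
}
```

Note that offset 7 correctly comes out as undetermined: the rule only ever proves deletion for offsets below the first returned record.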
I think we can use this method to remove failed, deleted tuples from the offset
manager. When we do a poll, we examine the retriable offsets for each
partition. For each partition where we received messages, we compare the
offset of the earliest received message to that partition's retriable offsets.
Any retriable offset lower than the earliest received message's offset must
have been deleted.
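That pruning step could look roughly like this, with plain collections standing in for the consumer's poll result. All names here are hypothetical sketch code, not the actual spout implementation:

```java
import java.util.*;

public class RetriablePruner {
    /**
     * For each partition that returned records, removes from retriableOffsets
     * every offset strictly below the earliest offset the poll returned for
     * that partition — those offsets provably no longer exist in Kafka.
     * Returns the removed offsets so the caller can also drop them from the
     * offset manager.
     */
    static Map<Integer, SortedSet<Long>> pruneDeleted(
            Map<Integer, TreeSet<Long>> retriableOffsets,
            Map<Integer, Long> earliestReturnedOffset) {
        Map<Integer, SortedSet<Long>> removed = new HashMap<>();
        for (Map.Entry<Integer, Long> e : earliestReturnedOffset.entrySet()) {
            TreeSet<Long> retriable = retriableOffsets.get(e.getKey());
            if (retriable == null) {
                continue; // no failed tuples pending on this partition
            }
            // headSet is exclusive: offsets strictly below the earliest
            // returned record are the ones proven deleted.
            SortedSet<Long> deleted = new TreeSet<>(retriable.headSet(e.getValue()));
            retriable.removeAll(deleted);
            removed.put(e.getKey(), deleted);
        }
        return removed;
    }
}
```

Partitions that returned no records are deliberately left untouched, matching the observation above that an empty (or short) poll proves nothing on its own.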
was (Author: srdo):
Here's my suggestion for a fix, which depends on the changes in
https://issues.apache.org/jira/browse/STORM-2549:
The only way to know that a tuple has been deleted from Kafka is to try polling
for it. We can know for sure that a failed tuple has been deleted if we seek to
the failed tuple's offset (or earlier) on the relevant partition, poll, and
then find that the first tuple returned for that partition has a higher offset
than the failed tuple.
For instance:
Offsets 0...5 have failed and have also been compacted away. Offset 6 has
failed and is still present; offset 7 has failed and is not present.
We seek to offset 0 for the partition.
If the first message in the poll result is then offset 6, we can be sure that
offsets 0...5 have been deleted, because otherwise they would have been
returned in the poll. Offset 7 cannot be removed from the spout, because we
can't be sure that it was deleted; the consumer may just have returned too few
messages.
I believe we can also conclude that offsets have been removed if we seek to
them, poll, and receive an empty result. I'm not entirely sure about this, but
I don't think the consumer will return empty polls if there are more messages
to consume.
I think we can use this method to remove failed, deleted tuples from the offset
manager. When we do a poll, we examine the retriable offsets for each
partition. For each partition where we received messages, we compare the
offset of the earliest received message to that partition's retriable offsets.
Any retriable offset lower than the earliest received message's offset must
have been deleted.
> Kafka spout can stall / get stuck due to edge case with failing tuples
> ----------------------------------------------------------------------
>
> Key: STORM-2546
> URL: https://issues.apache.org/jira/browse/STORM-2546
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 2.0.0, 1.x
> Reporter: Prasanna Ranganathan
> Assignee: Stig Rohde Døssing
>
> The mechanism for replaying a failed tuple involves seeking the Kafka
> consumer to the failing offset and then re-emitting the tuple into the
> topology. A tuple, when emitted the first time, has an entry created in
> OffsetManager. This entry is removed only after the tuple has been
> successfully acknowledged and its offset successfully committed. Until then,
> commits for offsets beyond the failing offset for that TopicPartition are
> blocked.
> It is possible that when the spout seeks the consumer to the failing offset,
> the corresponding Kafka message is not returned in the poll response. This
> can happen because that offset has been deleted or compacted away. In that
> scenario the partition is blocked from committing and cannot progress.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)