[
https://issues.apache.org/jira/browse/STORM-2546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16152400#comment-16152400
]
Prasanna Ranganathan edited comment on STORM-2546 at 9/4/17 10:04 AM:
----------------------------------------------------------------------
{quote}
Auto offset reset is latest
* 0 or more acked offsets when reset, no pending retries when reset happens:
Example: 0-3 are acked, 0-1 are deleted. 10 is the latest offset.
Consumer will seek to latest offset (10). When new tuples are acked after
the reset, the OffsetManager will skip past all the unemitted tuples (4-9). I
don't think we need to do anything special for this case.
* 0 or more acked offsets when reset, earliest pending retry is deleted:
Example: 0-3 are acked, 0-4 are deleted. 4-5 are retriable and 10 is the
latest offset.
Consumer will seek to latest offset (10). The spout will handle acks as
described in the previous case. When the retriable tuples become ready, the
consumer will seek back to the earliest retriable offset (4). Since 4 is
deleted, the consumer seeks to latest. Since the spout asked for retriable
offset 4 and got 10 as the earliest, it will consider 4-9 deleted and mark them
as acked. While offset 5 is technically still possible to retry, I don't think
this is necessarily unexpected or bad behavior. I'd be okay with leaving this
behavior as is.
* 0 or more acked offsets when reset, earliest pending retry is not deleted:
Example: 0-3 are acked, 0-3 are deleted. 4-5 are retriable and 10 is the
latest offset.
Same as above, except when the retriable tuples become ready, the consumer
will seek back to the earliest retriable offset (4). The spout continues
processing from offset 4. This is a little inconsistent with the case above,
but I don't think it's bad behavior, so I'm okay with leaving it like this.
{quote}
This approach sounds reasonable for #1 and #3. It would be good if we could identify
the situations where offsets are being skipped and add appropriate logging. The
Kafka client prints a log message when the consumer seeks to an invalid offset.
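As an aside on the logging point, here is a minimal sketch of what such a check could look like around the spout's poll handling. The class and method names are mine, not existing storm-kafka-client code; it only assumes the spout knows which offset it sought to before polling.
{code:java}
// Sketch only - illustrative names, not existing storm-kafka-client code.
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SkippedOffsetLogger {
    private static final Logger LOG = LoggerFactory.getLogger(SkippedOffsetLogger.class);

    private SkippedOffsetLogger() {}

    /**
     * After the spout has sought to expectedOffset and polled, warn if the broker
     * returned a later first offset, i.e. [expectedOffset, firstReturned) is gone.
     */
    public static void warnIfSkipped(TopicPartition tp, long expectedOffset, ConsumerRecords<?, ?> polled) {
        List<? extends ConsumerRecord<?, ?>> records = polled.records(tp);
        if (!records.isEmpty() && records.get(0).offset() > expectedOffset) {
            LOG.warn("Skipped offsets [{}, {}) on {}: records are no longer available on the broker",
                    expectedOffset, records.get(0).offset(), tp);
        }
    }
}
{code}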
For #2, I am wondering if we need to identify the smallest valid offset larger
than the invalid offset that the spout was trying to seek to. In other words, we
would be 'bridging' the gap, as with the fix built for STORM-2505. The reason is
that a single deleted offset could result in the spout skipping far ahead and
potentially passing over many valid offsets without even a single processing
attempt. I need to think this over a bit more, but it feels like we could handle
this scenario more cleanly if the Kafka Consumer had an API to the effect of
_seekNextValidOffset(Offset currentOffset)_.
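To make that concrete, here is a rough sketch of the kind of helper I mean, built on existing consumer calls. _seekNextValidOffset_ is not an actual Consumer API, and approximating it with _beginningOffsets()_ is my assumption; it only covers offsets deleted from the head of the log by retention.
{code:java}
// Sketch of the kind of helper I have in mind; seekNextValidOffset is NOT an
// existing Consumer API, and using beginningOffsets() to approximate it is an
// assumption that the wanted offset was deleted from the head of the log.
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public final class OffsetBridging {
    private OffsetBridging() {}

    /**
     * Seek to wantedOffset if it is still retained, otherwise to the smallest
     * retained offset after it, and return the offset actually sought to.
     * Only covers offsets deleted from the beginning of the log (retention);
     * offsets removed mid-log by compaction are skipped over by poll() anyway.
     */
    public static long seekNextValidOffset(Consumer<?, ?> consumer, TopicPartition tp, long wantedOffset) {
        Map<TopicPartition, Long> beginning = consumer.beginningOffsets(Collections.singleton(tp));
        long earliestRetained = beginning.get(tp);
        long target = Math.max(wantedOffset, earliestRetained);
        // target > wantedOffset is exactly the "offsets skipped" situation worth logging.
        consumer.seek(tp, target);
        return target;
    }
}
{code}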
> Kafka spout can stall / get stuck due to edge case with failing tuples
> ----------------------------------------------------------------------
>
> Key: STORM-2546
> URL: https://issues.apache.org/jira/browse/STORM-2546
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 2.0.0, 1.x
> Reporter: Prasanna Ranganathan
> Assignee: Stig Rohde Døssing
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The mechanism for replaying a failed tuple involves seeking the kafka
> consumer to the failing offset and then re-emitting it into the topology. A
> tuple, when emitted the first time, will have an entry created in
> OffsetManager. This entry will be removed only after the tuple is
> successfully acknowledged and its offset successfully committed. Till then,
> commits for offsets beyond the failing offset for that TopicPartition will be
> blocked.
> It is possible that when the spout seeks the consumer to the failing offset,
> the corresponding kafka message is not returned in the poll response. This
> can happen due to that offset being deleted or compacted away. In this
> scenario that partition will be blocked from committing and progressing.
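To make the stall described above concrete, here is a small simplified model (my own illustration, not the actual OffsetManager implementation) of the contiguous-commit rule: an offset that is never acked, for example because its message was deleted or compacted away, pins the commit point for the whole partition.
{code:java}
// Simplified model of the commit-blocking behaviour described above; this is
// not the actual OffsetManager code, just an illustration of why an offset
// that can never be acked stalls commits for the whole partition.
import java.util.TreeSet;

public final class ContiguousCommitModel {
    private final TreeSet<Long> acked = new TreeSet<>();
    private long committedOffset; // next offset to commit; everything below it is committed

    public ContiguousCommitModel(long initialCommittedOffset) {
        this.committedOffset = initialCommittedOffset;
    }

    public void ack(long offset) {
        acked.add(offset);
    }

    /** Advance the commit point over the contiguous run of acked offsets. */
    public long findNextCommitOffset() {
        while (acked.contains(committedOffset)) {
            acked.remove(committedOffset);
            committedOffset++;
        }
        return committedOffset;
    }

    public static void main(String[] args) {
        ContiguousCommitModel partition = new ContiguousCommitModel(4);
        // Offset 4 was emitted but its message is gone from Kafka, so it is never acked.
        partition.ack(5);
        partition.ack(6);
        // Commit point stays at 4: offsets 5-6 are acked but cannot be committed.
        System.out.println(partition.findNextCommitOffset()); // prints 4
    }
}
{code}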