[ 
https://issues.apache.org/jira/browse/STORM-2546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16152400#comment-16152400
 ] 

Prasanna Ranganathan edited comment on STORM-2546 at 9/4/17 10:02 AM:
----------------------------------------------------------------------

{quote}
Auto offset reset is latest
* 0 or more acked offsets when reset, no pending retries when reset happens:
    Example: 0-3 are acked, 0-1 are deleted. 10 is the latest offset.
   Consumer will seek to latest offset (10). When new tuples are acked after 
the reset, the OffsetManager will skip past all the unemitted tuples (4-9). I 
don't think we need to do anything special for this case.

* 0 or more acked offsets when reset, earliest pending retry is deleted:
   Example: 0-3 are acked, 0-4 are deleted. 4-5 are retriable and 10 is the 
latest offset.
   Consumer will seek to latest offset (10). The spout will handle acks as 
described in the previous case. When the retriable tuples become ready, the 
consumer will seek back to the earliest retriable offset (4). Since 4 is 
deleted, the consumer seeks to latest. Since the spout asked for retriable 
offset 4 and got 10 as the earliest, it will consider 4-9 deleted and mark them 
as acked. While offset 5 is technically still possible to retry, I don't think 
this is necessarily unexpected or bad behavior. I'd be okay with leaving this 
behavior as is.

*  0 or more acked offsets when reset, earliest pending retry is not deleted:
   Example: 0-3 are acked, 0-3 are deleted. 4-5 are retriable and 10 is the 
latest offset.
   Same as above, except when the retriable tuples become ready, the consumer 
will seek back to the earliest retriable offset (4). The spout continues 
processing from offset 4. This is a little inconsistent with the case above, 
but I don't think it's bad behavior, so I'm okay with leaving it like this.
{quote}
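
The only difference between the second and third cases is whether the earliest retriable offset is still in the log. Below is a minimal Java sketch of that net effect, assuming a toy partition whose retained offsets form one contiguous range. The class and method names are hypothetical and are not part of storm-kafka-client or the Kafka client. In the real consumer the reset happens on the poll that follows the seek, not inside the seek itself.

```java
/** Toy model of one partition with auto.offset.reset=latest. */
class ResetLatestModel {

    /**
     * Offset the consumer ends up at after seeking to {@code target}.
     * {@code logStart} is the first offset still in the log and
     * {@code latest} is the log end offset. An out-of-range seek
     * falls through to {@code latest}.
     */
    static long positionAfterSeek(long target, long logStart, long latest) {
        boolean inRange = target >= logStart && target <= latest;
        return inRange ? target : latest;
    }

    public static void main(String[] args) {
        // Second case: 0-4 deleted, so the log starts at 5; retry of 4 is lost.
        System.out.println(positionAfterSeek(4, 5, 10));
        // Third case: 0-3 deleted, so the log starts at 4; retry of 4 succeeds.
        System.out.println(positionAfterSeek(4, 4, 10));
    }
}
```

In the second case the consumer lands on 10 even though offset 5 is still retriable, which is the inconsistency the quote points out.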

The approach in the quote above sounds reasonable for cases #1 and #3. It would 
be good to identify the situations where offsets are skipped and add 
appropriate logging. The Kafka client logs a message when the consumer's seek 
offset is invalid.
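
One way the spout could detect skipped offsets is to compare the offset it asked for with the offset of the first record it gets back. Below is a minimal sketch, assuming those two numbers are available at the call site. The class, method names and log wording are hypothetical and not taken from storm-kafka-client.

```java
import java.util.logging.Logger;

/** Hypothetical helper for spotting and logging skipped offsets. */
class SkipDetector {
    private static final Logger LOG =
            Logger.getLogger(SkipDetector.class.getName());

    /** Number of offsets between the requested and the first returned offset. */
    static long skippedCount(long requested, long firstReturned) {
        return Math.max(0L, firstReturned - requested);
    }

    /** Logs a warning if the fetch started later than the requested offset. */
    static void logIfSkipped(String topicPartition, long requested, long firstReturned) {
        long skipped = skippedCount(requested, firstReturned);
        if (skipped > 0) {
            LOG.warning(String.format(
                    "%s: sought to offset %d but first fetched offset was %d; "
                            + "skipping %d offset(s) in [%d, %d]",
                    topicPartition, requested, firstReturned,
                    skipped, requested, firstReturned - 1));
        }
    }

    public static void main(String[] args) {
        // Second case from the quote: asked for 4, got 10, so 4-9 are skipped.
        logIfSkipped("my-topic-0", 4, 10);
        // Third case: asked for 4 and got 4, so nothing is logged.
        logIfSkipped("my-topic-0", 4, 4);
    }
}
```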

For #2, I am wondering whether we need to identify the smallest valid offset 
larger than the invalid offset that the spout was trying to seek to. In other 
words, we would be 'bridging' the gap, as with the fix built for STORM-2505. 
The reason is that a single deleted offset could cause the spout to skip far 
ahead and pass over many valid offsets without even a single processing 
attempt. I need to think this over a bit more, but it feels like we could 
handle this scenario more cleanly if the Kafka consumer had an API to the 
effect of seekNextValidOffset().
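
To make the 'bridging' idea concrete, here is a minimal sketch of the lookup such an API would perform. It assumes the set of retained offsets is known and passed in, which the spout does not have today, and that is exactly the information a seekNextValidOffset() style call would need to supply. All names here are hypothetical.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.TreeSet;

/** Hypothetical lookup of the smallest retained offset at or after a request. */
class GapBridging {

    /**
     * Returns the smallest offset in {@code retained} that is greater than or
     * equal to {@code requested}, or empty if no such offset exists.
     */
    static OptionalLong nextValidOffset(NavigableSet<Long> retained, long requested) {
        Long next = retained.ceiling(requested);
        return next == null ? OptionalLong.empty() : OptionalLong.of(next);
    }

    public static void main(String[] args) {
        // Second case from the quote: 0-4 are deleted, 5-9 are still in the log.
        NavigableSet<Long> retained =
                new TreeSet<>(Arrays.asList(5L, 6L, 7L, 8L, 9L));
        // Retrying 4 would resume at 5 instead of jumping to latest (10).
        System.out.println(nextValidOffset(retained, 4L));
    }
}
```

With this lookup only offset 4 would be treated as deleted, and 5-9 would still get a processing attempt.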


was (Author: ranganp):

This approach sounds reasonable. It would be good to identify the situations 
where offsets are skipped and add appropriate logging. The Kafka client logs a 
message when the consumer's seek offset is invalid.

> Kafka spout can stall / get stuck due to edge case with failing tuples
> ----------------------------------------------------------------------
>
>                 Key: STORM-2546
>                 URL: https://issues.apache.org/jira/browse/STORM-2546
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.x
>            Reporter: Prasanna Ranganathan
>            Assignee: Stig Rohde Døssing
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The mechanism for replaying a failed tuple involves seeking the Kafka 
> consumer to the failing offset and then re-emitting the tuple into the 
> topology. A tuple, when emitted for the first time, has an entry created in 
> OffsetManager. This entry is removed only after the tuple is successfully 
> acknowledged and its offset successfully committed. Until then, commits for 
> offsets beyond the failing offset for that TopicPartition are blocked.
> It is possible that when the spout seeks the consumer to the failing offset, 
> the corresponding Kafka message is not returned in the poll response. This 
> can happen when that offset has been deleted or compacted away. In this 
> scenario, that partition is blocked from committing and progressing.
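
The stall described above can be reproduced with a toy model of contiguous commits. This is only a sketch under simplifying assumptions and is not Storm's OffsetManager. The class and method names are hypothetical, and the committed offset follows the Kafka convention of pointing at the next offset to process.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Toy model: commits can only advance over a contiguous run of acked offsets. */
class CommitModel {

    /**
     * Returns the next committable offset: the first offset at or after
     * {@code committed} that has not been acked.
     */
    static long nextCommitOffset(long committed, NavigableSet<Long> acked) {
        long next = committed;
        while (acked.contains(next)) {
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        // Offsets 0-3 are committed. Offset 4 failed and was then deleted,
        // so it is never re-emitted and never acked; 5-7 were acked.
        NavigableSet<Long> acked = new TreeSet<>(Arrays.asList(5L, 6L, 7L));
        System.out.println(nextCommitOffset(4L, acked)); // stays at 4: blocked

        // Treating the deleted offset as acked lets the commit advance.
        acked.add(4L);
        System.out.println(nextCommitOffset(4L, acked)); // advances to 8
    }
}
```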



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
