[ 
https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040880#comment-16040880
 ] 

Prasanna Ranganathan commented on STORM-2343:
---------------------------------------------

[~Srdo] For a tuple to be retried, it needs to be emitted again which can only 
happen if a kafka consumer poll is called. Even with this fix I feel it is 
possible for the spout to get stuck if a specific tuple fails repeatedly and a 
lot of subsequent ones succeed, get acked but are blocked on commit inside 
OffsetManager.

My suggestion is that as long as there is a tuple ready to be retried 
(retryService.readyMessageCount() > 0), poll should proceed independent of the 
numUncommittedOffsets. This will allow for the possibility that that tuple is 
successfully processed and the topology is not stalled. 

We can optionally add logic in the emitTupleIfNotEmitted(ConsumerRecord<K, V> 
record) method to restrict fresh tuples from being emitted if the 
numUncommittedOffsets threshold has been breached while still allowing kafka 
consumer poll to happen as long as retryService.readyMessageCount() is non-zero.

> New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets 
> tuples fail at once
> -----------------------------------------------------------------------------------------------
>
>                 Key: STORM-2343
>                 URL: https://issues.apache.org/jira/browse/STORM-2343
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.1.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>            Priority: Critical
>             Fix For: 2.0.0, 1.1.1
>
>          Time Spent: 14h 50m
>  Remaining Estimate: 0h
>
> It doesn't look like the spout is respecting maxUncommittedOffsets in all 
> cases. If the underlying consumer returns more records in a call to poll() 
> than maxUncommittedOffsets, they will all be added to waitingToEmit. Since 
> poll may return up to 500 records by default (Kafka 0.10.1.1), this is pretty 
> likely to happen with low maxUncommittedOffsets.
> The spout only checks for tuples to retry if it decides to poll, and it only 
> decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since 
> maxUncommittedOffsets isn't being respected when retrieving or emitting 
> records, numUncommittedOffsets can be much larger than maxUncommittedOffsets. 
> If more than maxUncommittedOffsets messages fail, this can cause the spout to 
> stop polling entirely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to