[ 
https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041573#comment-16041573
 ] 

Stig Rohde Døssing commented on STORM-2343:
-------------------------------------------

That isn't quite true. There is a limit to how far past maxUncommittedOffsets 
we can emit, because the KafkaConsumer caps how many messages a single call 
to poll will return (max.poll.records).

I agree that the problem is that we either need to avoid emitting all the 
tuples we receive, in order to respect maxUncommittedOffsets, or we need some 
other way to ensure that we won't poll once we're sufficiently far past the 
maxUncommittedOffsets limit. I don't think we're treating maxUncommittedOffsets 
as a strict cap (it is hard to enforce if we want to emit all the tuples we 
receive in a poll), but we'd like to at least be able to say we won't go more 
than x tuples past it.
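To make that bound concrete, here is a minimal sketch (hypothetical names, not the actual storm-kafka-client code) of the poll gate described above and the worst-case overshoot it implies:

```java
// Sketch only: PollGate is a hypothetical name, not a Storm class.
final class PollGate {
    final int maxUncommittedOffsets; // spout-side cap on in-flight tuples
    final int maxPollRecords;        // consumer-side max.poll.records

    PollGate(int maxUncommittedOffsets, int maxPollRecords) {
        this.maxUncommittedOffsets = maxUncommittedOffsets;
        this.maxPollRecords = maxPollRecords;
    }

    // The spout only decides to poll while it is under the cap...
    boolean shouldPoll(int numUncommittedOffsets) {
        return numUncommittedOffsets < maxUncommittedOffsets;
    }

    // ...so if every polled tuple is emitted, the overshoot is bounded:
    // the last allowed poll happens at maxUncommittedOffsets - 1
    // uncommitted tuples and may return up to maxPollRecords more.
    int worstCaseUncommitted() {
        return maxUncommittedOffsets - 1 + maxPollRecords;
    }
}
```

With maxUncommittedOffsets = 100 and the Kafka default max.poll.records = 500, the spout could end up with 599 uncommitted tuples, well past the cap but still bounded.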

I think you're missing the case where the poll returns messages from an 
unexpected partition. For example, if you have retriable tuples on partition 1 
and are at the cap of how many tuples you're willing to process, you would 
want to allow a poll so you can retry those tuples. Unless you pause the 
partitions that don't have retriable tuples, the consumer might decide to poll 
from those instead; you might be trying to get retries for partition 1, but 
the poll might return tuples for partition 2.
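A minimal sketch of that pause idea (hypothetical helper, not Storm code): before a retry-driven poll, compute the set of assigned partitions that have no retriable tuples, so that with a real consumer the set could be handed to KafkaConsumer.pause and the poll could only return records we actually want retried.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: RetryPollPlanner is illustrative, not a Storm class.
final class RetryPollPlanner {
    // Everything assigned but without retriable tuples should be paused
    // before polling for retries, so the consumer cannot fill the poll
    // with records from partitions we are not trying to retry.
    static Set<String> partitionsToPause(Set<String> assigned,
                                         Set<String> withRetriableTuples) {
        Set<String> toPause = new HashSet<>(assigned);
        toPause.removeAll(withRetriableTuples);
        return toPause;
    }
}
```

After the poll, the paused partitions would need to be resumed so normal consumption can continue.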

> New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets 
> tuples fail at once
> -----------------------------------------------------------------------------------------------
>
>                 Key: STORM-2343
>                 URL: https://issues.apache.org/jira/browse/STORM-2343
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.1.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>            Priority: Critical
>             Fix For: 2.0.0, 1.1.1
>
>          Time Spent: 14h 50m
>  Remaining Estimate: 0h
>
> It doesn't look like the spout is respecting maxUncommittedOffsets in all 
> cases. If the underlying consumer returns more records in a call to poll() 
> than maxUncommittedOffsets, they will all be added to waitingToEmit. Since 
> poll may return up to 500 records by default (Kafka 0.10.1.1), this is pretty 
> likely to happen with low maxUncommittedOffsets.
>
> The spout only checks for tuples to retry if it decides to poll, and it only 
> decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since 
> maxUncommittedOffsets isn't being respected when retrieving or emitting 
> records, numUncommittedOffsets can be much larger than maxUncommittedOffsets. 
> If more than maxUncommittedOffsets messages fail, this can cause the spout to 
> stop polling entirely.
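The stall in the quoted description can be simulated in a few lines (hypothetical names, assuming every polled tuple is emitted): the spout polls only while numUncommitted < maxUncommitted, a single poll can overshoot the cap by up to max.poll.records, and retries are only checked when the spout decides to poll.

```java
// Hypothetical simulation of the deadlock, not actual Storm code.
final class StallDemo {
    static boolean canEverRetry(int maxUncommitted, int maxPollRecords) {
        int numUncommitted = 0;
        if (numUncommitted < maxUncommitted) {
            numUncommitted += maxPollRecords; // one poll may overshoot the cap
        }
        // Suppose more than maxUncommitted of those tuples fail: retrying
        // them requires another poll, but once the count is past the cap
        // the gate below rejects polling forever.
        return numUncommitted < maxUncommitted;
    }
}
```

With maxUncommitted = 100 and maxPollRecords = 500 the gate never reopens; with a cap larger than max.poll.records the overshoot cannot happen in a single poll.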



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)