[ https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041545#comment-16041545 ]

Prasanna Ranganathan commented on STORM-2343:
---------------------------------------------

[~Srdo] I need some time to properly digest your input and reply. Will do so. 
Meanwhile, I was thinking about this issue and have the following to add to 
what I stated earlier:

One issue that I see currently is that the spout can emit more than 
maxUncommittedOffsets tuples into the topology. This happens because we check 
numUncommittedOffsets before calling poll(), but once that check passes and the 
kafka broker call is made, we go ahead and emit ALL the records received from 
the broker. This effectively means that an unbounded number of tuples over and 
beyond maxUncommittedOffsets can be in flight in the topology.

Now, if we capped the total number of tuples in flight (including those ready 
for retry or waiting to be retried), then the logic we have for poll() would 
always ensure that failed tuples are fetched from the brokers. Does this make 
sense? Or am I missing an edge case here?
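The capping idea above could be sketched roughly as follows. This is an illustrative sketch only, not Storm's actual KafkaSpout code; the method and parameter names (capEmits, numUncommittedOffsets) are hypothetical stand-ins for the spout's internal state. The point is that emission is bounded by the remaining room under the cap, rather than emitting the whole polled batch:

```java
import java.util.ArrayList;
import java.util.List;

public class EmitCapSketch {
    // Hypothetical helper: given the records returned by a poll, emit only as
    // many as fit under maxUncommittedOffsets. The remainder would stay queued
    // (e.g. in something like waitingToEmit) until offsets are committed.
    static <T> List<T> capEmits(List<T> polled,
                                int numUncommittedOffsets,
                                int maxUncommittedOffsets) {
        int room = Math.max(0, maxUncommittedOffsets - numUncommittedOffsets);
        int toEmit = Math.min(room, polled.size());
        return new ArrayList<>(polled.subList(0, toEmit));
    }
}
```

With this shape, the in-flight count can never exceed the cap even when the broker returns a large batch, so the poll-time check for retries stays reachable.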

> New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets 
> tuples fail at once
> -----------------------------------------------------------------------------------------------
>
>                 Key: STORM-2343
>                 URL: https://issues.apache.org/jira/browse/STORM-2343
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.1.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>            Priority: Critical
>             Fix For: 2.0.0, 1.1.1
>
>          Time Spent: 14h 50m
>  Remaining Estimate: 0h
>
> It doesn't look like the spout is respecting maxUncommittedOffsets in all 
> cases. If the underlying consumer returns more records in a call to poll() 
> than maxUncommittedOffsets, they will all be added to waitingToEmit. Since 
> poll may return up to 500 records by default (Kafka 0.10.1.1), this is pretty 
> likely to happen with low maxUncommittedOffsets.
>
> The spout only checks for tuples to retry if it decides to poll, and it only 
> decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since 
> maxUncommittedOffsets isn't being respected when retrieving or emitting 
> records, numUncommittedOffsets can be much larger than maxUncommittedOffsets. 
> If more than maxUncommittedOffsets messages fail, this can cause the spout to 
> stop polling entirely.
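The stall described in the quoted issue can be sketched with illustrative numbers (the method name and the simplification that all polled records are emitted at once are assumptions, not Storm code; 500 is Kafka's default max.poll.records):

```java
public class SpoutStallSketch {
    // Hypothetical model of the bug: a poll emits its full batch, so the
    // in-flight count can reach the batch size regardless of the cap. Retries
    // are only scheduled from poll(), and poll() is only attempted while
    // numUncommittedOffsets < maxUncommittedOffsets.
    static boolean spoutStalls(int pollBatchSize,
                               int maxUncommittedOffsets,
                               int failedTuples) {
        int numUncommittedOffsets = pollBatchSize; // whole batch was emitted
        boolean canPoll = numUncommittedOffsets < maxUncommittedOffsets;
        // Failed tuples stay uncommitted, and with polling blocked they are
        // never fetched again: the spout is stuck.
        return failedTuples > 0 && !canPoll;
    }
}
```

For example, with maxUncommittedOffsets = 100 and a default 500-record poll, a batch of failures leaves the spout permanently above the cap and unable to poll.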



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
