[ https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044883#comment-16044883 ]

Prasanna Ranganathan commented on STORM-2343:
---------------------------------------------

Thanks a ton for the awesome writeup of the issue and potential solutions. My 
thoughts so far on potential solutions are mostly in line with yours. One thing 
I wanted to confirm, but have not yet gotten around to, is the behaviour of the 
Kafka broker / group coordinator when the client node that paused a partition 
crashes, leaves the group, or suffers a network partition before calling 
resume() for that partition. We need to confirm the behaviour in this scenario 
and handle it accordingly in the spout.
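For reference, my understanding (worth verifying against the broker) is that pause()/resume() state lives entirely in the client: the group coordinator is not told which partitions are paused, so a crashed consumer or a rebalance hands the partition to a new assignee in an unpaused state. A toy model of that client-local bookkeeping (hypothetical names, not the real KafkaConsumer):

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of client-local pause bookkeeping. The pause set is just
// in-memory state on the consumer instance, so it cannot survive a crash,
// and it is cleared for a partition once that partition is revoked in a
// rebalance. Names here are illustrative only.
class PauseStateModel {
    private final Set<Integer> paused = new HashSet<>();

    void pause(int partition)  { paused.add(partition); }
    void resume(int partition) { paused.remove(partition); }
    boolean isPaused(int partition) { return paused.contains(partition); }

    // On rebalance the partition is revoked and reassigned; the new owner
    // (or a restarted consumer) starts with an empty pause set.
    void onPartitionRevoked(int partition) { paused.remove(partition); }
}
```

If that holds, the failure mode for a pause-based solution is a partition silently becoming unpaused after a crash/rebalance, which the spout would need to tolerate.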

About Solution #3:
I am assuming we need NOT pause partition 0 in solution #3 for the scenario 
described. To me, this solution basically extends the current 
maxUncommittedOffsets logic to every partition in the spout. If a spout handles 
only one partition, we would never actually pause it: we simply stop calling 
poll() once the partition reaches maxUncommittedOffsets without any failed 
tuples; otherwise the partition should continue to be polled. The logic then 
simply needs to seek to the appropriate offset depending on whether retriable 
tuples are present.
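The per-partition gating I have in mind could be sketched roughly like this (illustrative names only, not the actual storm-kafka-client API): each partition keeps its own uncommitted count, and a partition is polled while it is under the cap, or unconditionally when it has retriable tuples, since re-emitting those does not grow the uncommitted set.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-partition poll gating. Each partition tracks
// its own number of uncommitted offsets; a partition is eligible for
// polling while it is under maxUncommittedOffsets, or when it has failed
// (retriable) tuples, in which case the spout would seek back and re-poll
// regardless of the cap.
class PartitionPollGate {
    private final int maxUncommittedOffsets;
    private final Map<Integer, Integer> numUncommitted = new HashMap<>();
    private final Map<Integer, Boolean> hasRetriable = new HashMap<>();

    PartitionPollGate(int maxUncommittedOffsets) {
        this.maxUncommittedOffsets = maxUncommittedOffsets;
    }

    // A new tuple was emitted from this partition.
    void emitted(int partition) {
        numUncommitted.merge(partition, 1, Integer::sum);
    }

    // `count` offsets were committed for this partition.
    void committed(int partition, int count) {
        numUncommitted.merge(partition, -count, Integer::sum);
    }

    void markRetriable(int partition, boolean retriable) {
        hasRetriable.put(partition, retriable);
    }

    // Poll if the partition has retriable tuples (already counted, so
    // re-emitting does not grow the uncommitted set), or is under the cap.
    boolean shouldPoll(int partition) {
        if (hasRetriable.getOrDefault(partition, false)) {
            return true;
        }
        return numUncommitted.getOrDefault(partition, 0) < maxUncommittedOffsets;
    }
}
```

With this shape, one partition hitting the cap never blocks polling of its siblings, which is the property I care about below.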

Agree completely that the choice is between #2 and #3. I am leaning toward #3 
for the following reasons:
- A partition is a fundamental building block / concept in Kafka, and this 
solution fits neatly into and extends that model
- For the Storm spout, Kafka partitions enable scaling and isolation, among 
other things. It is not acceptable for a 'healthy' partition to be blocked by 
an 'unhealthy' one
- We already do a fair bit of partition-specific bookkeeping in OffsetManager. 
More bookkeeping is a fair price to pay given the reward on offer... :-)

> New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets 
> tuples fail at once
> -----------------------------------------------------------------------------------------------
>
>                 Key: STORM-2343
>                 URL: https://issues.apache.org/jira/browse/STORM-2343
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.1.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>            Priority: Critical
>             Fix For: 2.0.0, 1.1.1
>
>          Time Spent: 14h 50m
>  Remaining Estimate: 0h
>
> It doesn't look like the spout is respecting maxUncommittedOffsets in all 
> cases. If the underlying consumer returns more records in a call to poll() 
> than maxUncommittedOffsets, they will all be added to waitingToEmit. Since 
> poll may return up to 500 records by default (Kafka 0.10.1.1), this is pretty 
> likely to happen with low maxUncommittedOffsets.
> The spout only checks for tuples to retry if it decides to poll, and it only 
> decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since 
> maxUncommittedOffsets isn't being respected when retrieving or emitting 
> records, numUncommittedOffsets can be much larger than maxUncommittedOffsets. 
> If more than maxUncommittedOffsets messages fail, this can cause the spout to 
> stop polling entirely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
