[ https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16044979#comment-16044979 ]

Stig Rohde Døssing commented on STORM-2343:
-------------------------------------------

{quote}
We need to confirm the behaviour in this scenario and handle it accordingly in 
the spout.
{quote}
As far as I know, pausing/resuming is a purely local operation for the 
KafkaConsumer: it just causes the consumer not to fetch records for the paused 
partitions. The paused state is not preserved if the client crashes (because 
the local state is then lost) or if the consumers rebalance (see 
https://github.com/apache/kafka/blob/2af4dd8653dd6717cca1630a57b2835a2698a1bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L49).
I don't think we need to worry about this. Also, I'm pushing for us to drop 
support for Kafka-managed subscriptions here: 
https://github.com/apache/storm/pull/2151, so I'm hoping this ends up being 
irrelevant.
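
A toy Java model can make the "purely local" point concrete. It is not Kafka's 
SubscriptionState: the class ToySubscriptionState and the method 
assignFromRebalance are hypothetical, and the sketch assumes, as the linked 
source describes, that a new assignment starts every partition with a fresh, 
unpaused state.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only (hypothetical, NOT Kafka's SubscriptionState). It mimics the
// behaviour that pause flags are client-local and reset when the assignment changes.
class ToySubscriptionState {
    // Per-partition client-side state; only the pause flag is modelled here.
    private final Map<Integer, Boolean> pausedByPartition = new HashMap<>();

    // Hypothetical stand-in for a rebalance handing this consumer an assignment:
    // every assigned partition starts with a fresh, unpaused state.
    void assignFromRebalance(Set<Integer> partitions) {
        pausedByPartition.clear();
        for (int p : partitions) {
            pausedByPartition.put(p, false);
        }
    }

    void pause(int partition) {
        if (!pausedByPartition.containsKey(partition)) {
            throw new IllegalStateException("Partition " + partition + " is not assigned");
        }
        pausedByPartition.put(partition, true);
    }

    boolean isPaused(int partition) {
        return pausedByPartition.getOrDefault(partition, false);
    }

    public static void main(String[] args) {
        ToySubscriptionState state = new ToySubscriptionState();
        Set<Integer> assignment = new HashSet<>(Arrays.asList(0, 1));
        state.assignFromRebalance(assignment);
        state.pause(0);
        System.out.println("paused before rebalance: " + state.isPaused(0)); // true
        state.assignFromRebalance(assignment); // same partitions, new assignment
        System.out.println("paused after rebalance: " + state.isPaused(0));  // false
    }
}
```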

{quote}
I am assuming we need NOT pause partition 0 in solution #3 for the scenario 
described
{quote}
The reason we want to pause is that when the spout is at (or past) the 
maxUncommittedOffsets limit, it should only emit retries or a very limited 
number of new tuples. In the example I gave above, if we don't pause partition 
0, then the poll triggered to fetch offset 99 on partition 1 might just return 
a full batch of messages from partition 0. There is no guarantee that the poll 
will even contain the retriable tuple, so this might happen several times in a 
row. If there were 10 additional partitions, we might get full polls from any 
of those as well before we get the retriable tuple.
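
A toy simulation of that scenario, as a sketch only. ToyPoll is hypothetical 
and assumes a poll that drains unpaused partitions in a fixed order up to a 
maxPollRecords cap (500 here, the default mentioned in the issue description). 
The real KafkaConsumer does not promise this ordering; the sketch only shows 
that nothing forces the retriable record (partition 1, offset 99) into the 
batch unless partition 0 is paused.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy simulation (hypothetical, NOT KafkaConsumer): poll() takes up to
// maxPollRecords records, draining unpaused partitions in insertion order.
class ToyPoll {
    // Each returned record is encoded as "partition-offset".
    static List<String> poll(Map<Integer, List<Long>> available, Set<Integer> paused, int maxPollRecords) {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<Integer, List<Long>> e : available.entrySet()) {
            if (paused.contains(e.getKey())) {
                continue; // paused partitions contribute nothing to the poll
            }
            for (long offset : e.getValue()) {
                if (batch.size() == maxPollRecords) {
                    return batch; // batch is full, later records are not fetched
                }
                batch.add(e.getKey() + "-" + offset);
            }
        }
        return batch;
    }

    // Partition 0 has plenty of new records; partition 1 only needs offset 99 re-fetched.
    static Map<Integer, List<Long>> scenario() {
        Map<Integer, List<Long>> available = new LinkedHashMap<>();
        List<Long> partition0 = new ArrayList<>();
        for (long o = 0; o < 1000; o++) {
            partition0.add(o);
        }
        available.put(0, partition0);
        available.put(1, Arrays.asList(99L));
        return available;
    }

    public static void main(String[] args) {
        Map<Integer, List<Long>> available = scenario();
        List<String> unpaused = poll(available, new HashSet<>(), 500);
        List<String> paused = poll(available, new HashSet<>(Arrays.asList(0)), 500);
        System.out.println("without pausing, contains 1-99: " + unpaused.contains("1-99"));
        System.out.println("with partition 0 paused: " + paused);
    }
}
```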

As far as I can tell, if we don't pause, we can't really enforce 
maxUncommittedOffsets.

I agree that if there's only one partition, it should never be paused. The rest 
of your outline seems right to me as well.
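
One way the pausing rule discussed here could be written down, as a sketch 
only: the helper partitionsToPause and its parameters are hypothetical and are 
not the storm-kafka-client code. At or past the limit it pauses every assigned 
partition that has no retry ready, and it never pauses anything when only one 
partition is assigned.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper illustrating the pausing rule; not storm-kafka-client code.
class PauseDecision {
    static Set<Integer> partitionsToPause(Set<Integer> assigned,
                                          Set<Integer> partitionsWithReadyRetries,
                                          int numUncommittedOffsets,
                                          int maxUncommittedOffsets) {
        // Below the limit, or with a single assigned partition: never pause.
        if (numUncommittedOffsets < maxUncommittedOffsets || assigned.size() <= 1) {
            return Collections.emptySet();
        }
        // At or past the limit: pause every partition without a retry to emit,
        // so the next poll can only return records for retriable partitions.
        Set<Integer> toPause = new HashSet<>(assigned);
        toPause.removeAll(partitionsWithReadyRetries);
        return toPause;
    }
}
```

For example, with partitions 0, 1 and 2 assigned, a ready retry on partition 1, 
and 10 of 10 uncommitted offsets in use, the helper pauses partitions 0 and 2.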

{quote}
For Storm spout, Kafka Partitions enable scaling and isolation among other 
things. It is not acceptable for a 'healthy' partition to be blocked by an 
'unhealthy' one
{quote}
I don't think the healthy partitions will be blocked for very long. Each poll 
where we pause will re-emit (or, pending the fix for STORM-2546, discard) some 
retriable tuples. The only way the spout should be completely blocked due to 
retries is if the user hasn't configured a retry limit and the tuples fail 
consistently.
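
A rough model of why the blocking should be bounded, under loudly simplifying 
assumptions: RetryDrain is hypothetical, every paused poll re-emits each 
pending retry once (retry backoff is ignored), every retry fails again, and a 
tuple is dropped after maxRetries attempts. With a retry limit the paused 
polls stop after a bounded number of rounds; with effectively no limit they 
do not.

```java
// Toy model only (hypothetical, not storm-kafka-client code).
class RetryDrain {
    // Assumptions: each paused poll re-emits every pending retriable tuple once,
    // each re-emitted tuple fails again, and a tuple is discarded once it has been
    // retried maxRetries times. Returns the number of paused polls before no
    // retries remain, or -1 if retries are still pending after pollBudget polls
    // (meaning the healthy partitions would still be paused).
    static int pausedPollsUntilDrained(int failingTuples, int maxRetries, int pollBudget) {
        int[] retriesSoFar = new int[failingTuples];
        int pending = maxRetries > 0 ? failingTuples : 0;
        int polls = 0;
        while (pending > 0) {
            if (polls == pollBudget) {
                return -1;
            }
            polls++;
            for (int i = 0; i < failingTuples; i++) {
                if (retriesSoFar[i] < maxRetries) {
                    retriesSoFar[i]++; // re-emitted on this poll, fails again
                    if (retriesSoFar[i] == maxRetries) {
                        pending--; // retry limit reached: give up on this tuple
                    }
                }
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        System.out.println(pausedPollsUntilDrained(3, 5, 1_000));                 // 5
        System.out.println(pausedPollsUntilDrained(3, Integer.MAX_VALUE, 1_000)); // -1
    }
}
```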

I agree that it isn't ideal, but I don't see a way to have a limit like 
maxUncommittedOffsets be properly enforced without pausing (and thus blocking) 
the healthy partitions when we get into this state where maxUncommittedOffsets 
is violated.

> New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets 
> tuples fail at once
> -----------------------------------------------------------------------------------------------
>
>                 Key: STORM-2343
>                 URL: https://issues.apache.org/jira/browse/STORM-2343
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.1.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>            Priority: Critical
>             Fix For: 2.0.0, 1.1.1
>
>          Time Spent: 14h 50m
>  Remaining Estimate: 0h
>
> It doesn't look like the spout is respecting maxUncommittedOffsets in all 
> cases. If the underlying consumer returns more records in a call to poll() 
> than maxUncommittedOffsets, they will all be added to waitingToEmit. Since 
> poll may return up to 500 records by default (Kafka 0.10.1.1), this is pretty 
> likely to happen with low maxUncommittedOffsets.
> The spout only checks for tuples to retry if it decides to poll, and it only 
> decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since 
> maxUncommittedOffsets isn't being respected when retrieving or emitting 
> records, numUncommittedOffsets can be much larger than maxUncommittedOffsets. 
> If more than maxUncommittedOffsets messages fail, this can cause the spout to 
> stop polling entirely.
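
The stall described in the issue can be reproduced in a toy model. StallModel 
is hypothetical, not the real KafkaSpout; it assumes every poll returns a full 
500-record batch and that every emitted tuple fails, so no offset is ever 
committed.

```java
// Toy model of the behaviour described in STORM-2343 (hypothetical, not KafkaSpout).
class StallModel {
    static final int MAX_POLL_RECORDS = 500; // default max poll size cited in the issue

    // Returns how many times the spout polls over `steps` calls to nextTuple,
    // assuming every poll returns a full batch and every emitted tuple fails.
    static int pollsWhenEverythingFails(int maxUncommittedOffsets, int steps) {
        int numUncommittedOffsets = 0;
        int polls = 0;
        for (int i = 0; i < steps; i++) {
            // The spout only polls (and only looks for retries) below the limit.
            if (numUncommittedOffsets < maxUncommittedOffsets) {
                polls++;
                // The limit is not applied to the returned records: all of them go
                // to waitingToEmit and become uncommitted offsets once emitted.
                numUncommittedOffsets += MAX_POLL_RECORDS;
            }
            // Failed tuples are never acked, so nothing is committed and
            // numUncommittedOffsets never drops back below the limit.
        }
        return polls;
    }

    public static void main(String[] args) {
        System.out.println(pollsWhenEverythingFails(10, 100)); // 1
    }
}
```

With maxUncommittedOffsets = 10, the model polls once, emits 500 tuples, and 
then never polls again, so the failed tuples are never retried.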



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
