Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    I think it would be good to get the assumptions I've made down in writing, so here is a more thorough explanation of the issues I think this PR should fix, and the changes I think it should make:
    
    The intention of this PR is to fix an issue where the spout hangs after it has emitted maxUncommittedOffsets (or more) tuples and some of them fail: the `numUncommittedOffsets < maxUncommittedOffsets` check used to decide whether to poll doesn't account for retriable tuples, so the failed tuples never get re-fetched and retried. Blindly polling whenever there are retriable tuples fixes the hang, but it means we can no longer bound the number of offsets the consumer has read past the last committed offset.
    
    For an example of a case where we can't put a bound on uncommitted offsets, say that partition 0 has maxUncommittedOffsets + maxPollRecords tuples emitted past the last committed offset, and partition 1 has no emitted tuples (or any number below maxUncommittedOffsets; it doesn't matter).
    
    When tuples fail and become retriable on partition 0, the spout would blindly poll for more tuples. If it gets tuples from partition 0 that's okay: the retry logic seeks back to the committed offset for that partition on every poll, so the spout can't go more than maxPollRecords beyond that offset. If it gets tuples from partition 1, we're effectively just emitting new tuples while ignoring maxUncommittedOffsets. Since we aren't controlling which partitions the polled tuples may come from, we can't say anything meaningful about a cap on uncommitted offsets. While it would probably work out to being capped anyway, because the consumer consumes partitions round robin (AFAIK), I'd prefer that the spout implementation not make assumptions beyond those guaranteed by the consumer API, in which case we should assume any call to `KafkaConsumer.poll` could return messages for any assigned, non-paused partition.
    
    So the way to fix that issue is to ensure that when we're polling for retriable tuples, we only poll the partitions that have retriable tuples (i.e. we pause the others when doing `doSeekRetriableTopicPartitions`, then resume all assigned partitions after calling `KafkaConsumer.poll`). Pausing the other partitions should only affect that poll cycle: once retriable tuples get emitted they're no longer retriable, so we won't hit `doSeekRetriableTopicPartitions` for those tuples again right away. Most polls (no failed tuples, or failed tuples not yet ready for retry) won't hit the pausing case at all.
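    
    A sketch of that poll path (the `pause`/`resume`/`assignment` calls are the plain `KafkaConsumer` API, assuming a kafka-clients version with the Collection-based overloads; everything else, including `doSeekRetriableTopicPartitions` returning the retriable partitions, is an assumption about how the spout code could be arranged):
    
    ```java
    // Sketch only: poll just the partitions with retriable tuples by pausing the
    // rest for this one poll cycle, then resume everything so the pause doesn't
    // leak into later polls.
    private ConsumerRecords<K, V> pollKafkaBroker() {
        final Set<TopicPartition> retriable = doSeekRetriableTopicPartitions();
        if (!retriable.isEmpty()) {
            final Set<TopicPartition> toPause = new HashSet<>(kafkaConsumer.assignment());
            toPause.removeAll(retriable);
            kafkaConsumer.pause(toPause);
        }
        try {
            return kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
        } finally {
            // Resume all assigned partitions; resuming a partition that wasn't
            // paused is a no-op, so this just clears the temporary pause.
            kafkaConsumer.resume(kafkaConsumer.assignment());
        }
    }
    ```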
    
    If we pause the partitions with no retriable tuples while polling the partitions that do have them, we should be able to guarantee that no partition ever gets more than maxUncommittedOffsets + maxPollRecords - 1 offsets past the last committed offset. In the case where there are no failed tuples, we can reach that limit by having maxUncommittedOffsets - 1 emitted offsets and polling once, getting up to maxPollRecords more. If there are retriable tuples, pausing stops us from ignoring the maxUncommittedOffsets cap on partitions with no retriable tuples, and the partitions with retriable tuples won't get more than maxPollRecords beyond the last committed offset, since the consumer seeks back to that offset when polling for retriable tuples.
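    
    To put concrete (made up) numbers on that bound: with maxUncommittedOffsets = 100 and maxPollRecords = 50, a partition can have 99 emitted-but-uncommitted offsets and still pass the poll check, and the next poll can return up to 50 more records, for a worst case of 99 + 50 = 149 = maxUncommittedOffsets + maxPollRecords - 1 offsets past the last committed offset.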
    
    There's a second minor issue I'd like this PR to address:
    If maxPollRecords isn't exactly equal to maxUncommittedOffsets, the spout 
can behave in some undesirable ways.
    * If maxPollRecords is greater than maxUncommittedOffsets, the maxUncommittedOffsets limit may be exceeded by a single poll. In that case there's no reason to have two separate variables, since the net effect is the same as setting maxUncommittedOffsets equal to maxPollRecords.
    * If maxPollRecords is less than maxUncommittedOffsets, there's a risk of the spout getting stuck on some tuples for a while when it is retrying tuples.
      Say there are 10 retriable tuples following the last committed offset, and maxUncommittedOffsets is 10. If maxPollRecords is 5 and the first 5 retriable tuples are reemitted in the first batch, the next 5 tuples can't be emitted until (some of) the first 5 are acked. This is because the spout seeks the consumer back to the last committed offset whenever there are failed tuples, so each poll returns the first 5 tuples, which the spout sees are already emitted and skips. This repeats until the last committed offset moves. If there are other partitions with tuples available, those tuples may get emitted, but the "blocked" partition won't make progress until some of its tuples are acked.
      
    I think it might make sense to remove maxUncommittedOffsets entirely, and 
have the spout use maxPollRecords instead?
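    
    If we end up keeping both settings, it would at least be worth enforcing the relationship between them when the spout is configured. A minimal sketch (where such a check would live, and the exact exception, are just assumptions):
    
    ```java
    // Sketch only: reject configurations where max.poll.records is larger than
    // maxUncommittedOffsets, since in that case the smaller limit has no effect.
    private static void validateOffsetLimits(int maxUncommittedOffsets, int maxPollRecords) {
        if (maxPollRecords > maxUncommittedOffsets) {
            throw new IllegalArgumentException("max.poll.records (" + maxPollRecords
                + ") should not be larger than maxUncommittedOffsets ("
                + maxUncommittedOffsets + ")");
        }
    }
    ```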
    
    The tl;dr is that I think we should allow polling when there are retriable offsets even if `numUncommittedOffsets >= maxUncommittedOffsets`, that we should pause non-retriable partitions when polling for retries, and that we should maybe merge maxUncommittedOffsets and maxPollRecords.
    
    @hmcl Could you take a look at this explanation and let me know if I got 
something wrong, or if you'd prefer to solve the issues in another way? I think 
it got a bit hypothetical.

