Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

I think it would be good to get some of the assumptions I've made down in writing, so here is a more thorough explanation of the issues I think this PR should fix, and the changes I think it should make.

The intention of this PR is to fix an issue where the spout has emitted maxUncommittedOffsets (or more) tuples, some of them fail, and the spout then hangs, because the `numUncommittedOffsets < maxUncommittedOffsets` check used when deciding whether to poll doesn't account for retriable tuples. Blindly polling whenever there are retriable tuples fixes the hang, but means we can no longer bound how far past the last committed offset the consumer reads.

For an example of a case where we can't put a bound on uncommitted offsets, say that partition 0 has maxUncommittedOffsets + maxPollRecords emitted tuples past the commit offset, and partition 1 has no emitted tuples (or any number below maxUncommittedOffsets, it doesn't matter). When tuples fail and become retriable on partition 0, the spout would blindly poll for more tuples. If it gets tuples from partition 0 that's okay, since it seeks back to the committed offset for that partition, so it won't go beyond committedOffset + maxPollRecords (since the retry logic seeks back to the committed offset on every poll, the spout can't go more than maxPollRecords beyond that offset). If it gets tuples from partition 1, we're effectively just emitting new tuples while ignoring maxUncommittedOffsets. Since we aren't controlling which partitions the polled tuples may come from, we can't say anything meaningful about a cap on uncommitted offsets. While it would probably work out to be capped anyway, because the consumer consumes partitions round robin (AFAIK), I'd prefer that the spout implementation not make assumptions beyond those guaranteed by the consumer API (in which case we should assume any call to `KafkaConsumer.poll` could return messages for any assigned non-paused partition).

So the way to fix that issue is to ensure that when we're polling for retriable tuples, we only poll the partitions that have retriable tuples, i.e. we pause the others when doing `doSeekRetriableTopicPartitions`, then resume all assigned partitions after calling `KafkaConsumer.poll` (see the sketch below). Pausing the other partitions should only affect that poll cycle, since once retriable tuples get emitted they're no longer retriable, and we won't hit `doSeekRetriableTopicPartitions` for those tuples again right away. In most polls (no failed tuples, or failed tuples that aren't ready for retry yet), we won't hit the pausing case.

If we pause partitions with no retriable tuples when polling for partitions with retriable tuples, we should be able to guarantee that no partition ever gets more than maxUncommittedOffsets + maxPollRecords - 1 past the last committed offset. In the case where there are no failed tuples, we can reach the limit by having maxUncommittedOffsets - 1 emitted offsets and polling once, getting up to maxPollRecords more. If there are retriable tuples, pausing stops us from ignoring the maxUncommittedOffsets cap on partitions with no retriable tuples, and the partitions with retriable tuples won't get more than maxPollRecords beyond the last committed offset, since the consumer seeks back to that offset when polling for retriable offsets.
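To make the pause/resume idea concrete, here's a minimal sketch of what a retry-aware poll could look like. This is not the actual KafkaSpout code: the method name, the `committedOffsets` map, and the way the retriable partitions are obtained are assumptions for illustration; only `seek`, `assignment`, `pause`, `resume` and `poll` are the real `KafkaConsumer` API.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RetryAwarePollSketch {

    // Sketch only: if there are retriable tuples, seek their partitions back to the
    // committed offset (as doSeekRetriableTopicPartitions does today), pause every
    // other assigned partition for this one poll, and resume them afterwards so the
    // next poll cycle behaves normally.
    static <K, V> ConsumerRecords<K, V> pollWithRetries(KafkaConsumer<K, V> consumer,
                                                        Set<TopicPartition> retriablePartitions,
                                                        Map<TopicPartition, Long> committedOffsets,
                                                        long pollTimeoutMs) {
        if (retriablePartitions.isEmpty()) {
            // Normal case: no retriable tuples, poll all assigned partitions
            return consumer.poll(pollTimeoutMs);
        }
        for (TopicPartition tp : retriablePartitions) {
            consumer.seek(tp, committedOffsets.get(tp));
        }
        Set<TopicPartition> paused = new HashSet<>(consumer.assignment());
        paused.removeAll(retriablePartitions);
        consumer.pause(paused);
        try {
            return consumer.poll(pollTimeoutMs);
        } finally {
            // Pausing should only affect this poll cycle
            consumer.resume(paused);
        }
    }
}
```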
There's a second minor issue I'd like this PR to address: if maxPollRecords isn't exactly equal to maxUncommittedOffsets, the spout can behave in some undesirable ways.

* If maxPollRecords is greater than maxUncommittedOffsets, the maxUncommittedOffsets limit may be exceeded by any single poll. In this case there's no reason to have two separate variables, since the net effect is the same as setting maxUncommittedOffsets equal to maxPollRecords.
* If maxPollRecords is less than maxUncommittedOffsets, there's a risk of the spout getting stuck on some tuples for a while when it is retrying tuples. Say there are 10 retriable tuples following the last committed offset, and maxUncommittedOffsets is 10. If maxPollRecords is 5 and the first 5 retriable tuples are re-emitted in the first batch, the next 5 tuples can't be emitted until (some of) the first 5 are acked. This is because the spout seeks the consumer back to the last committed offset any time there are failed tuples, so it keeps getting the first 5 tuples out of the consumer, sees that they are already emitted, and skips them. This repeats until the last committed offset moves. If there are other partitions with tuples available, those tuples may get emitted, but the "blocked" partition won't progress until some of its tuples are acked.

I think it might make sense to remove maxUncommittedOffsets entirely and have the spout use maxPollRecords instead?

The tl;dr is that I think we should allow polling if there are retriable offsets, even if `numUncommittedOffsets >= maxUncommittedOffsets`; we should pause non-retriable partitions when polling for retries; and we should maybe merge maxUncommittedOffsets and maxPollRecords. A sketch of the adjusted poll check is at the end of this comment.

@hmcl Could you take a look at this explanation and let me know if I got something wrong, or if you'd prefer to solve the issues in another way? I think it got a bit hypothetical.
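For reference, a rough sketch of the poll check described in the tl;dr. The method and parameter names are just for illustration; I'm assuming the spout already tracks the number of uncommitted offsets and can ask the retry service whether any partition has retriable tuples.

```java
// Sketch only: allow a poll when retriable offsets exist, even if the
// uncommitted-offset cap has been reached. Combined with the pausing above,
// the extra poll can't advance partitions that have nothing to retry.
static boolean shouldPoll(long numUncommittedOffsets, int maxUncommittedOffsets, boolean hasRetriableTuples) {
    return numUncommittedOffsets < maxUncommittedOffsets || hasRetriableTuples;
}
```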