Github user srdo commented on the issue: https://github.com/apache/storm/pull/2454

This PR doesn't actually fix the issue reported in STORM-2847. The problem is that we can't commit offsets for partitions the consumer isn't assigned. We've previously assumed that the spout was assigned every partition it had an OffsetManager for, but this isn't true when reactivating a deactivated spout: on reactivation, the consumer has no assigned partitions at the point where onPartitionsRevoked is first called. It doesn't seem ideal that we unconditionally commit offsets for all partitions we have OffsetManagers for, when the onPartitionsRevoked method parameter tells us exactly which partitions were previously assigned.

I'll close this and reopen once the issue is fixed. My immediate ideas would be these:

* Put KafkaConsumer instantiation in open(), and keep the same consumer for the lifetime of the spout. The benefit is that this is easy to "get right". The drawback is that if you deactivate a spout in order to e.g. update a committed offset with the scripts Kafka ships with, the spout will resume from where it was when reactivated, not from the new committed offset. Maybe it is better if deactivate/activate behaves the same as a spout restart (e.g. as it would if the worker crashed). Also, the consumer remains open while the spout isn't running.

* Wipe internal spout state when deactivating, including the OffsetManagers. The benefit is that the spout behaves as if it had been restarted when reactivated; the drawback is that some acked tuples that couldn't be committed at deactivation may be unnecessarily replayed in at-least-once mode. For example, if offsets 0-2 and 4-5 had been acked before deactivating, 4-5 couldn't be committed (because of the gap at offset 3) and would be replayed.

* Do a quick hack and make ManualPartitionSubscription only call onPartitionsRevoked if the previous assignment wasn't empty.

I'd appreciate your thoughts on which of these proposals you prefer (or if you have other ideas).
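To illustrate the filtering idea (committing only for partitions the onPartitionsRevoked parameter reports as previously assigned, rather than for every partition with an OffsetManager), here is a minimal sketch in plain Java. The names and the use of integer partition ids are hypothetical simplifications; the real spout tracks TopicPartition-to-OffsetManager mappings and commits via the KafkaConsumer:

```java
import java.util.*;

// Hypothetical sketch, not the actual storm-kafka-client code: given all
// offsets the spout is tracking and the set of partitions the rebalance
// listener reported as previously assigned, keep only the committable subset.
public class CommitFilterSketch {

    static Map<Integer, Long> committableOffsets(Map<Integer, Long> tracked,
                                                 Set<Integer> previouslyAssigned) {
        Map<Integer, Long> committable = new HashMap<>();
        for (Map.Entry<Integer, Long> e : tracked.entrySet()) {
            // Skip partitions the consumer was not assigned; committing these
            // is what fails when a deactivated spout is reactivated.
            if (previouslyAssigned.contains(e.getKey())) {
                committable.put(e.getKey(), e.getValue());
            }
        }
        return committable;
    }

    public static void main(String[] args) {
        Map<Integer, Long> tracked = new HashMap<>();
        tracked.put(0, 10L); // partition 0, next committable offset 10
        tracked.put(1, 5L);  // partition 1, next committable offset 5

        // On reactivation the consumer has no assignment yet, so nothing is committable.
        System.out.println(committableOffsets(tracked, Collections.emptySet()));
        // With only partition 0 previously assigned, only its offset is committed.
        System.out.println(committableOffsets(tracked, Collections.singleton(0)));
    }
}
```

The same check also covers the "quick hack" variant: an empty previous assignment yields nothing to commit, so the revocation callback becomes a no-op in that case.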
Sorry about the partial PR.
---