an2x commented on issue #36284: URL: https://github.com/apache/beam/issues/36284#issuecomment-3339765728
KafkaIO tracks the last processed message separately, in the runner's checkpoint data (see [here](https://beam.apache.org/releases/javadoc/2.68.0/org/apache/beam/sdk/io/kafka/KafkaIO.html#partition-assignment-and-checkpointing-heading)). The last committed offset is used only when the pipeline starts. After that, KafkaIO relies on the checkpoint information, especially if DoFn creation is failing and the runner is retrying some of the bundles. So I think there is no guarantee that the last processed offset you're committing manually will match the minimum offset in the next bundle assigned to this DoFn instance.

I would at least throw the exception only if `minOffsets.get(partition) > expectedOffsets.get(partition).offset()` (i.e. `>` instead of `!=`). It's entirely possible the runner will decide to re-process some earlier messages, and then the min offset will be lower than the one you committed. That is a replay, not a gap; only a min offset higher than expected would suggest skipped messages. But even `>` is not guaranteed to always work in your pipeline, because there could be other race conditions.

If you're manually committing offsets just to track what was processed, you don't have to. Just enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize()) and it should do the same thing more reliably.
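To make the `>` suggestion concrete, here is a minimal sketch in plain Java with no Beam or Kafka dependencies. It assumes, hypothetically, that `minOffsets` maps a partition name to the smallest offset seen in the current bundle and that `expectedOffsets` maps it to the offset you last committed, read as "the next offset I expect" (the usual Kafka convention). `CommittedOffset` is a hypothetical stand-in for Kafka's `OffsetAndMetadata`, and `checkNoGap` is a made-up helper name, not part of KafkaIO.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetGapCheck {

    // Hypothetical stand-in for Kafka's OffsetAndMetadata; only the offset is modeled.
    static final class CommittedOffset {
        private final long offset;

        CommittedOffset(long offset) {
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    // Throws only when a bundle starts AFTER the expected offset (a possible gap).
    // A smaller minimum offset means the runner replayed earlier records from its
    // checkpoint, which is allowed, so it is not treated as an error.
    static void checkNoGap(Map<String, Long> minOffsets,
                           Map<String, CommittedOffset> expectedOffsets) {
        for (Map.Entry<String, Long> e : minOffsets.entrySet()) {
            String partition = e.getKey();
            CommittedOffset expected = expectedOffsets.get(partition);
            if (expected == null) {
                continue; // nothing committed yet for this partition
            }
            if (e.getValue() > expected.offset()) { // '>' rather than '!='
                throw new IllegalStateException(
                        "Possible gap in " + partition + ": bundle starts at "
                                + e.getValue() + " but expected " + expected.offset());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, CommittedOffset> expected = new HashMap<>();
        expected.put("topic-0", new CommittedOffset(100L));

        // Replay from an earlier checkpoint: passes with '>' but would fail with '!='.
        checkNoGap(Map.of("topic-0", 95L), expected);

        // Bundle starting past the expected offset: flagged as a possible gap.
        try {
            checkNoGap(Map.of("topic-0", 105L), expected);
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

Running `main` prints a message only for the second call. Even so, this check can still trip on the other race conditions mentioned above, so it is better treated as a diagnostic than as a correctness guarantee.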
