an2x commented on issue #36284:
URL: https://github.com/apache/beam/issues/36284#issuecomment-3339765728

   KafkaIO tracks the last processed message separately, in the runner's checkpoint data (see [here](https://beam.apache.org/releases/javadoc/2.68.0/org/apache/beam/sdk/io/kafka/KafkaIO.html#partition-assignment-and-checkpointing-heading)). The last committed offset is used only when the pipeline starts; after that, KafkaIO relies on the checkpoint information, especially if DoFn creation is failing and the runner is retrying some of the bundles. So I think there is no guarantee that the last processed offset you're committing manually will match the minimum offset in the next bundle assigned to this DoFn instance. At the very least, I would throw the exception only if `minOffsets.get(partition) > expectedOffsets.get(partition).offset()` (i.e. `>` instead of `!=`), because it's entirely possible that the runner will decide to re-process some earlier messages, in which case the min offset won't match exactly. But I think even `>` is not guaranteed to always work in your pipeline, because there could be other race conditions.
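A minimal sketch of the relaxed check, assuming the per-partition offsets are plain `Map<String, Long>` values keyed by a partition name. `OffsetGapCheck`, `hasGap`, and the map shapes are hypothetical stand-ins for the `minOffsets`/`expectedOffsets` structures in your pipeline (where `expectedOffsets` apparently holds objects with an `offset()` accessor). It only flags a bundle that starts *after* the expected offset, and treats a replay of earlier offsets as normal. As said above, even this is not a hard guarantee.

```java
import java.util.Map;

// Hypothetical helper: relaxed consistency check between the smallest offset seen in a
// bundle and the offset recorded by a manual commit. A bundle that starts at or before the
// expected offset (e.g. a replay from the runner's checkpoint) is accepted; only a bundle
// that starts after it (a possible gap) is reported.
public class OffsetGapCheck {

    public static boolean hasGap(Map<String, Long> minOffsets,
                                 Map<String, Long> expectedOffsets,
                                 String partition) {
        Long min = minOffsets.get(partition);
        Long expected = expectedOffsets.get(partition);
        if (min == null || expected == null) {
            return false; // nothing to compare for this partition
        }
        // '>' instead of '!=': re-processing earlier offsets (min < expected) is allowed.
        return min > expected;
    }

    public static void main(String[] args) {
        Map<String, Long> expected = Map.of("topic-0", 100L);
        System.out.println(hasGap(Map.of("topic-0", 95L), expected, "topic-0"));  // false (replay)
        System.out.println(hasGap(Map.of("topic-0", 100L), expected, "topic-0")); // false (exact match)
        System.out.println(hasGap(Map.of("topic-0", 120L), expected, "topic-0")); // true (skipped ahead)
    }
}
```

The caller would throw only when `hasGap` returns `true`, rather than on any mismatch.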
   
   If you're manually committing offsets just to track what was processed, you don't have to. Just enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize()) and it should do the same thing more reliably.
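For reference, a sketch of enabling it on the read transform. The broker address, topic, group id, and `Long`/`String` key/value types are placeholders, and the builder calls should be checked against the KafkaIO version you're on. Note that `commitOffsetsInFinalize()` needs a `group.id` in the consumer config. As I understand the javadoc, this reduces gaps and duplicates when restarting but is not a hard processing guarantee.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommitInFinalizeExample {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Placeholder broker, topic and group id: replace with your own values.
        PCollection<KV<Long, String>> records = p.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker-1:9092")
                .withTopic("my-topic")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // commitOffsetsInFinalize() requires a consumer group id.
                .withConsumerConfigUpdates(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "my-group"))
                // Commit offsets to Kafka when the runner finalizes a checkpoint,
                // instead of committing them manually from a DoFn.
                .commitOffsetsInFinalize()
                .withoutMetadata());

        // ... downstream processing of `records` ...

        p.run();
    }
}
```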

