nbali opened a new issue, #22631:
URL: https://github.com/apache/beam/issues/22631

   ### What happened?
   
   I have been reading from Kafka and trying to figure out which offset 
management approach would be best for my use case. While doing so, I noticed 
something odd.
   
   
https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2359-L2362
   
       private boolean configuredKafkaCommit() {
         return getConsumerConfig().get("isolation.level") == "read_committed"
             || Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
       }
   
https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2292-L2298
   
https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2321-L2334
   
   The name of the method, and how it's used in the code, certainly 
suggest that using the read_committed isolation level handles and commits Kafka 
offsets. That seemed strange, but I'm not a Kafka pro, so I tested it. It does 
not.
   
   - using ONLY ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG does commit it
   - using ONLY commitOffsetsInFinalize() does commit it
   
   - using ONLY withReadCommitted() does NOT commit it
   
   Dataflow, 2.40.0 Java SDK, without explicitly enabling SDF-read
   
   So is it a bug, or what am I missing here?
   
   If it is indeed a bug, then where is it? Either read_committed should 
commit offsets (although I found no explicit documentation saying so anywhere), 
or having that isolation level shouldn't take precedence over the commit in 
finalize, and configuredKafkaCommit() is wrong.
   
   
   ------------
   @johnjcasey: 
   withReadCommitted() doesn't commit messages when they are read; instead it 
specifies that the Kafka consumer should only read messages that have themselves 
been committed to Kafka.
   
   It is meant for exactly-once applications.
   ------------
   @johnjcasey 
   Which, looking at your message again, would imply that the 
configuredKafkaCommit() method shouldn't inspect isolation.level.
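
To make that suggestion concrete, here is a minimal standalone sketch, not the actual KafkaIO code. It assumes the consumer config is a plain `Map<String, Object>` passed in as a parameter, uses the literal key `enable.auto.commit` in place of `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`, and compares strings with `equals()` rather than `==`. The class and method names (`CommitCheckSketch`, `currentCheck`, `suggestedCheck`) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitCheckSketch {

  // Mirrors the logic of the quoted configuredKafkaCommit(): treats either
  // read_committed isolation or auto-commit as "Kafka commits the offsets".
  public static boolean currentCheck(Map<String, Object> consumerConfig) {
    return "read_committed".equals(consumerConfig.get("isolation.level"))
        || Boolean.TRUE.equals(consumerConfig.get("enable.auto.commit"));
  }

  // Hypothetical variant following the suggestion in this thread:
  // isolation.level is ignored, only auto-commit counts.
  public static boolean suggestedCheck(Map<String, Object> consumerConfig) {
    return Boolean.TRUE.equals(consumerConfig.get("enable.auto.commit"));
  }

  public static void main(String[] args) {
    // Config equivalent to using ONLY withReadCommitted().
    Map<String, Object> readCommittedOnly = new HashMap<>();
    readCommittedOnly.put("isolation.level", "read_committed");

    // Config equivalent to using ONLY enable.auto.commit=true.
    Map<String, Object> autoCommitOnly = new HashMap<>();
    autoCommitOnly.put("enable.auto.commit", true);

    System.out.println("read_committed only, current:   " + currentCheck(readCommittedOnly));
    System.out.println("read_committed only, suggested: " + suggestedCheck(readCommittedOnly));
    System.out.println("auto-commit only, current:      " + currentCheck(autoCommitOnly));
    System.out.println("auto-commit only, suggested:    " + suggestedCheck(autoCommitOnly));
  }
}
```

With only read_committed set, the current-style check reports that Kafka handles the commit, even though the test above showed no offsets being committed; the variant reports that it does not. Whether dropping the isolation.level check is the right fix for KafkaIO is for the maintainers to confirm.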
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: io-java-kafka


