tomstepp commented on code in PR #37510:
URL: https://github.com/apache/beam/pull/37510#discussion_r2817706668


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -611,37 +615,61 @@ private void consumerPollLoop() {
 
   private void commitCheckpointMark() {
     KafkaCheckpointMark checkpointMark = finalizedCheckpointMark.getAndSet(null);
-    if (checkpointMark != null) {
-      LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
-      Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
-      Instant now = Instant.now();
+    if (checkpointMark == null) {
+      return;
+    }
 
-      try {
-        consumer.commitSync(
-            checkpointMark.getPartitions().stream()
-                .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
-                .collect(
-                    Collectors.toMap(
-                        p -> new TopicPartition(p.getTopic(), p.getPartition()),
-                        p -> new OffsetAndMetadata(p.getNextOffset()))));
-        nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
-      } catch (Exception e) {
-        // Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
-        // KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
-        if (now.isAfter(nextAllowedCommitFailLogTime)) {
-          LOG.warn(
-              String.format(
-                  "%s: Did not successfully commit finalized checkpoint for > %s. Current checkpoint: %s",
-                  this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
-              e);
-          nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
-        } else {
-          LOG.info(
-              String.format(
-                  "%s: Could not commit finalized checkpoint. Commit will be retried with subsequent reads. Current checkpoint: %s",
-                  this, checkpointMark),
-              e);
+    LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
+    Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
+    Instant now = Instant.now();
+
+    try {
+      // Commit only partitions whose offsets have advanced since the last successful commit
+      // for this reader. This suppresses no-op commits for idle partitions.
+      Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
+      for (KafkaCheckpointMark.PartitionMark p : checkpointMark.getPartitions()) {
+        long next = p.getNextOffset();
+        if (next == UNINITIALIZED_OFFSET) {
+          continue;
         }
+
+        TopicPartition tp = new TopicPartition(p.getTopic(), p.getPartition());
+        Long prev = lastCommittedOffsets.get(tp);
+
+        if (prev == null || next > prev) {
+          toCommit.put(tp, new OffsetAndMetadata(next));
+        }
+      }
+

Review Comment:
   Maybe we could add a debug log for the number of idle partitions?
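   
   A rough sketch of what I have in mind, not tied to the actual classes: 
   count the partitions that are skipped because their offset has not 
   advanced, then log that count at debug level. The class name, the 
   `Selection` holder, the `String` keys (standing in for `TopicPartition`) 
   and the `-1` sentinel value are all assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-alone sketch; String keys stand in for TopicPartition. */
public class IdlePartitionCount {
  // Assumed sentinel value, used here only for illustration.
  static final long UNINITIALIZED_OFFSET = -1L;

  /** Partitions selected for commit, plus how many were skipped as idle. */
  static final class Selection {
    final Map<String, Long> toCommit = new HashMap<>();
    int idle = 0;
  }

  static Selection select(Map<String, Long> nextOffsets, Map<String, Long> lastCommitted) {
    Selection s = new Selection();
    for (Map.Entry<String, Long> e : nextOffsets.entrySet()) {
      long next = e.getValue();
      if (next == UNINITIALIZED_OFFSET) {
        continue; // never read from this partition, so it is neither committed nor idle
      }
      Long prev = lastCommitted.get(e.getKey());
      if (prev == null || next > prev) {
        s.toCommit.put(e.getKey(), next);
      } else {
        s.idle++; // offset unchanged since the last successful commit
      }
    }
    return s;
  }

  public static void main(String[] args) {
    Map<String, Long> next = new HashMap<>();
    next.put("topic-0", 5L);
    next.put("topic-1", 7L);
    Map<String, Long> last = new HashMap<>();
    last.put("topic-0", 5L);

    Selection s = select(next, last);
    // In the reader this would be a LOG.debug call rather than stdout.
    System.out.println("Skipping " + s.idle + " idle partitions; committing " + s.toCommit);
  }
}
```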



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -79,6 +80,9 @@
  */
 class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
 
+  // Track last successfully committed offsets to suppress no-op commits for idle partitions.
+  private final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();

Review Comment:
   Maybe we can also track the last commit time per partition? We could 
still commit an idle partition once it has gone uncommitted for longer than 
some threshold (10 minutes, for example).
   
   I think this could also help in cases where customers use time-lag 
monitoring (tracking the time since the last commit).
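   
   As a minimal sketch of the idea, assuming a second map keyed the same way 
   as `lastCommittedOffsets`: commit when the offset has advanced, or when 
   the partition has not been committed for at least a configured idle 
   threshold. The class and method names below are hypothetical, `String` 
   keys stand in for `TopicPartition`, and `java.time` is used only to keep 
   the example self-contained (the reader itself may use a different time 
   library).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of an idle-aware commit policy; not the PR's implementation. */
public class IdleCommitPolicy {
  private final Duration maxIdle;
  private final Map<String, Long> lastCommittedOffsets = new HashMap<>();
  private final Map<String, Instant> lastCommitTimes = new HashMap<>();

  IdleCommitPolicy(Duration maxIdle) {
    this.maxIdle = maxIdle;
  }

  /** Returns true if the offset advanced or the partition has been idle for at least maxIdle. */
  boolean shouldCommit(String partition, long nextOffset, Instant now) {
    Long prev = lastCommittedOffsets.get(partition);
    if (prev == null || nextOffset > prev) {
      return true; // new data since the last successful commit
    }
    Instant lastCommit = lastCommitTimes.get(partition);
    return lastCommit == null || Duration.between(lastCommit, now).compareTo(maxIdle) >= 0;
  }

  /** Call only after the commit for this partition has succeeded. */
  void recordCommit(String partition, long offset, Instant now) {
    lastCommittedOffsets.put(partition, offset);
    lastCommitTimes.put(partition, now);
  }
}
```

   With a 10 minute threshold, an idle partition would be re-committed at 
   most once every 10 minutes, which keeps the commit timestamp fresh for 
   time-lag monitoring while still avoiding a commit on every checkpoint.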



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########


Review Comment:
   Can we add a unit test to cover this new behavior?
   
   From a quick search, the new test could reuse or follow the pattern of 
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
