johnjcasey commented on code in PR #37510:
URL: https://github.com/apache/beam/pull/37510#discussion_r2817886294


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -611,37 +615,61 @@ private void consumerPollLoop() {
 
   private void commitCheckpointMark() {
     KafkaCheckpointMark checkpointMark = 
finalizedCheckpointMark.getAndSet(null);
-    if (checkpointMark != null) {
-      LOG.debug("{}: Committing finalized checkpoint {}", this, 
checkpointMark);
-      Consumer<byte[], byte[]> consumer = 
Preconditions.checkStateNotNull(this.consumer);
-      Instant now = Instant.now();
+    if (checkpointMark == null) {
+      return;
+    }
 
-      try {
-        consumer.commitSync(
-            checkpointMark.getPartitions().stream()
-                .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
-                .collect(
-                    Collectors.toMap(
-                        p -> new TopicPartition(p.getTopic(), 
p.getPartition()),
-                        p -> new OffsetAndMetadata(p.getNextOffset()))));
-        nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
-      } catch (Exception e) {
-        // Log but ignore the exception. Committing consumer offsets to Kafka 
is not critical for
-        // KafkaIO because it relies on the offsets stored in 
KafkaCheckpointMark.
-        if (now.isAfter(nextAllowedCommitFailLogTime)) {
-          LOG.warn(
-              String.format(
-                  "%s: Did not successfully commit finalized checkpoint for > 
%s. Current checkpoint: %s",
-                  this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
-              e);
-          nextAllowedCommitFailLogTime = 
now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
-        } else {
-          LOG.info(
-              String.format(
-                  "%s: Could not commit finalized checkpoint. Commit will be 
retried with subsequent reads. Current checkpoint: %s",
-                  this, checkpointMark),
-              e);
+    LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
+    Consumer<byte[], byte[]> consumer = 
Preconditions.checkStateNotNull(this.consumer);
+    Instant now = Instant.now();
+
+    try {
+      // Commit only partitions whose offsets have advanced since the last 
successful commit
+      // for this reader. This suppresses no-op commits for idle partitions.
+      Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
+      for (KafkaCheckpointMark.PartitionMark p : 
checkpointMark.getPartitions()) {
+        long next = p.getNextOffset();
+        if (next == UNINITIALIZED_OFFSET) {
+          continue;
         }
+
+        TopicPartition tp = new TopicPartition(p.getTopic(), p.getPartition());
+        Long prev = lastCommittedOffsets.get(tp);
+
+        if (prev == null || next > prev) {
+          toCommit.put(tp, new OffsetAndMetadata(next));

Review Comment:
   I like the idea of this change. Could we keep the existing Java Streams logic
and simply add a new filter step, rather than rewriting it as an imperative loop?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to