This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 78061b82a24 If the partition count or kafka IO size is large, then 
skip committin… (#37510)
78061b82a24 is described below

commit 78061b82a2443854e3f57e329a4d380745e69aa8
Author: kishorepola <[email protected]>
AuthorDate: Mon Mar 9 07:28:50 2026 -0700

    If the partition count or kafka IO size is large, then skip committin… 
(#37510)
    
    * If the partition count or Kafka IO size is large, skip committing
      offsets that have not changed, to reduce Kafka commit load
    
    * Address PR review feedback for idle partition optimization
    
    - Refactor commitCheckpointMark to use Java streams (per @johnjcasey)
      Changed from explicit for-loop to streams-based filtering for better
      code consistency with existing patterns
    
    - Add debug logging for idle partitions (per @tomstepp)
      Log the count of idle partitions skipped during each commit to aid
      in monitoring and debugging the optimization
    
    - Implement time-based periodic commits (per @tomstepp)
      Track last commit time per partition and ensure commits happen at
      least every 10 minutes even for idle partitions. This supports time
      lag monitoring use cases where customers track time since last commit.
    
    - Add unit test for idle partition behavior (per @tomstepp)
      New test KafkaUnboundedReaderIdlePartitionTest uses a mock consumer
      to track commit calls and verifies that:
      * Idle partitions are not committed repeatedly
      * Active partitions trigger commits correctly
    
    All changes maintain backward compatibility and follow Apache Beam
    coding standards (spotless formatting applied).
    
    * Fix test to follow Beam patterns for MockConsumer initialization
    
    Rewrote KafkaUnboundedReaderIdlePartitionTest to follow the exact
    pattern used in KafkaIOTest.java:
    - Proper MockConsumer initialization with partition metadata
    - Correct setup of beginning/end offsets
    - Consumer records with proper offsets and timestamps
    - schedulePollTask for record enqueueing based on position
    - Override commitSync to track commit calls
    - Use reader.start() before reader.advance()
    
    This ensures the test properly initializes the Kafka consumer and
    doesn't fail with IllegalStateException during source.split().
    
    ---------
    
    Co-authored-by: Kishore Pola <[email protected]>
---
 .../beam/sdk/io/kafka/KafkaUnboundedReader.java    | 117 ++++++---
 .../KafkaUnboundedReaderIdlePartitionTest.java     | 273 +++++++++++++++++++++
 2 files changed, 361 insertions(+), 29 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 866dfd48710..286c2cd5d8e 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -79,6 +80,11 @@ import org.slf4j.LoggerFactory;
  */
 class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
 
+  // Track last successfully committed offsets to suppress no-op commits for 
idle partitions.
+  private final Map<TopicPartition, Long> lastCommittedOffsets = new 
HashMap<>();
+  // Track last commit time per partition to ensure periodic commits for time 
lag monitoring.
+  private final Map<TopicPartition, Instant> lastCommitTimes = new HashMap<>();
+
   ///////////////////// Reader API 
////////////////////////////////////////////////////////////
   @SuppressWarnings("FutureReturnValueIgnored")
   @Override
@@ -375,6 +381,8 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
   private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MAX = 
Duration.millis(20);
   private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = 
Duration.millis(100);
   private static final Duration MIN_COMMIT_FAIL_LOG_INTERVAL = 
Duration.standardMinutes(10);
+  // Maximum time between commits for idle partitions (for time lag 
monitoring).
+  private static final Duration MAX_IDLE_COMMIT_INTERVAL = 
Duration.standardMinutes(10);
 
   // Use a separate thread to read Kafka messages. Kafka Consumer does all its 
work including
   // network I/O inside poll(). Polling only inside #advance(), especially 
with a small timeout
@@ -611,37 +619,88 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
 
   private void commitCheckpointMark() {
     KafkaCheckpointMark checkpointMark = 
finalizedCheckpointMark.getAndSet(null);
-    if (checkpointMark != null) {
-      LOG.debug("{}: Committing finalized checkpoint {}", this, 
checkpointMark);
-      Consumer<byte[], byte[]> consumer = 
Preconditions.checkStateNotNull(this.consumer);
-      Instant now = Instant.now();
+    if (checkpointMark == null) {
+      return;
+    }
 
-      try {
-        consumer.commitSync(
-            checkpointMark.getPartitions().stream()
-                .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
-                .collect(
-                    Collectors.toMap(
-                        p -> new TopicPartition(p.getTopic(), 
p.getPartition()),
-                        p -> new OffsetAndMetadata(p.getNextOffset()))));
+    LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
+    Consumer<byte[], byte[]> consumer = 
Preconditions.checkStateNotNull(this.consumer);
+    Instant now = Instant.now();
+
+    try {
+      // Commit only partitions whose offsets have advanced since the last 
successful commit
+      // for this reader, or partitions that haven't been committed within 
MAX_IDLE_COMMIT_INTERVAL.
+      // This suppresses no-op commits for idle partitions while ensuring 
periodic commits
+      // for time lag monitoring.
+      Map<TopicPartition, OffsetAndMetadata> toCommit =
+          checkpointMark.getPartitions().stream()
+              .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
+              .filter(
+                  p -> {
+                    TopicPartition tp = new TopicPartition(p.getTopic(), 
p.getPartition());
+                    Long prev = lastCommittedOffsets.get(tp);
+                    long next = p.getNextOffset();
+                    Instant lastCommitTime = lastCommitTimes.get(tp);
+
+                    // Commit if offset has advanced
+                    if (prev == null || next > prev) {
+                      return true;
+                    }
+
+                    // Also commit if partition hasn't been committed within 
max idle interval
+                    if (lastCommitTime == null
+                        || 
now.isAfter(lastCommitTime.plus(MAX_IDLE_COMMIT_INTERVAL))) {
+                      return true;
+                    }
+
+                    return false;
+                  })
+              .collect(
+                  Collectors.toMap(
+                      p -> new TopicPartition(p.getTopic(), p.getPartition()),
+                      p -> new OffsetAndMetadata(p.getNextOffset())));
+
+      int totalPartitions = checkpointMark.getPartitions().size();
+      int idlePartitions = totalPartitions - toCommit.size();
+      if (idlePartitions > 0) {
+        LOG.debug(
+            "{}: Skipping commit for {} idle partitions ({} of {} partitions 
active)",
+            this,
+            idlePartitions,
+            toCommit.size(),
+            totalPartitions);
+      }
+
+      if (toCommit.isEmpty()) {
+        // Nothing advanced since last successful commit; avoid noisy 
commitSync().
+        return;
+      }
+
+      consumer.commitSync(toCommit);
+
+      // Only update after a successful commit.
+      for (Map.Entry<TopicPartition, OffsetAndMetadata> e : 
toCommit.entrySet()) {
+        lastCommittedOffsets.put(e.getKey(), e.getValue().offset());
+        lastCommitTimes.put(e.getKey(), now);
+      }
+
+      nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
+    } catch (Exception e) {
+      // Log but ignore the exception. Committing consumer offsets to Kafka is 
not critical for
+      // KafkaIO because it relies on the offsets stored in 
KafkaCheckpointMark.
+      if (now.isAfter(nextAllowedCommitFailLogTime)) {
+        LOG.warn(
+            String.format(
+                "%s: Did not successfully commit finalized checkpoint for > 
%s. Current checkpoint: %s",
+                this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
+            e);
         nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
-      } catch (Exception e) {
-        // Log but ignore the exception. Committing consumer offsets to Kafka 
is not critical for
-        // KafkaIO because it relies on the offsets stored in 
KafkaCheckpointMark.
-        if (now.isAfter(nextAllowedCommitFailLogTime)) {
-          LOG.warn(
-              String.format(
-                  "%s: Did not successfully commit finalized checkpoint for > 
%s. Current checkpoint: %s",
-                  this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
-              e);
-          nextAllowedCommitFailLogTime = 
now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
-        } else {
-          LOG.info(
-              String.format(
-                  "%s: Could not commit finalized checkpoint. Commit will be 
retried with subsequent reads. Current checkpoint: %s",
-                  this, checkpointMark),
-              e);
-        }
+      } else {
+        LOG.info(
+            String.format(
+                "%s: Could not commit finalized checkpoint. Commit will be 
retried with subsequent reads. Current checkpoint: %s",
+                this, checkpointMark),
+            e);
       }
     }
   }
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReaderIdlePartitionTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReaderIdlePartitionTest.java
new file mode 100644
index 00000000000..005cd6fa378
--- /dev/null
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReaderIdlePartitionTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for idle partition optimization in {@link 
KafkaUnboundedReader}. */
+@RunWith(JUnit4.class)
+public class KafkaUnboundedReaderIdlePartitionTest {
+
+  private static final Instant LOG_APPEND_START_TIME = new Instant(100000);
+
+  /**
+   * Verifies that idle partitions (partitions with no offset changes) are not 
committed repeatedly,
+   * reducing load on Kafka brokers.
+   */
+  @Test
+  public void testIdlePartitionsNotCommittedRepeatedly() throws Exception {
+    int numElements = 50;
+    int numPartitions = 10;
+    List<String> topics = ImmutableList.of("test_topic");
+
+    // Create a tracking consumer factory
+    TrackingConsumerFactory consumerFactory =
+        new TrackingConsumerFactory(topics, numPartitions, numElements);
+
+    // Create a Kafka source with commit offsets enabled
+    UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source =
+        KafkaIO.<Integer, Long>read()
+            .withBootstrapServers("test_server")
+            .withTopics(topics)
+            .withConsumerFactoryFn(consumerFactory)
+            .withKeyDeserializer(IntegerDeserializer.class)
+            .withValueDeserializer(LongDeserializer.class)
+            .withConsumerConfigUpdates(
+                ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, 
"test_group_id"))
+            .commitOffsetsInFinalize()
+            .makeSource()
+            .split(1, PipelineOptionsFactory.create())
+            .get(0);
+
+    UnboundedReader<KafkaRecord<Integer, Long>> reader = 
source.createReader(null, null);
+
+    // Read some elements
+    assertTrue("Reader should start", reader.start());
+    for (int i = 0; i < 10; i++) {
+      assertTrue("Reader should have more elements", reader.advance());
+    }
+
+    // Get first checkpoint and finalize it
+    KafkaCheckpointMark mark1 = (KafkaCheckpointMark) 
reader.getCheckpointMark();
+    mark1.finalizeCheckpoint();
+
+    // Allow commit to happen (it's async)
+    Thread.sleep(2000);
+
+    int initialCommitCount = consumerFactory.commitCounter.get();
+    assertTrue("Should have committed at least once", initialCommitCount > 0);
+
+    // Create another checkpoint without reading more data (all partitions 
idle)
+    KafkaCheckpointMark mark2 = (KafkaCheckpointMark) 
reader.getCheckpointMark();
+    mark2.finalizeCheckpoint();
+
+    // Allow commit attempt to happen
+    Thread.sleep(2000);
+
+    int secondCommitCount = consumerFactory.commitCounter.get();
+
+    // Verify that no new commits happened for idle partitions
+    assertEquals(
+        "Idle partitions should not trigger additional commits",
+        initialCommitCount,
+        secondCommitCount);
+
+    // Read more elements to activate partitions again
+    for (int i = 0; i < 10; i++) {
+      reader.advance();
+    }
+
+    // Create another checkpoint after reading more data
+    KafkaCheckpointMark mark3 = (KafkaCheckpointMark) 
reader.getCheckpointMark();
+    mark3.finalizeCheckpoint();
+
+    // Allow commit to happen
+    Thread.sleep(2000);
+
+    int thirdCommitCount = consumerFactory.commitCounter.get();
+
+    // Verify that commits happened for partitions with new data
+    assertTrue("Active partitions should trigger commits", thirdCommitCount > 
secondCommitCount);
+
+    reader.close();
+  }
+
+  /** Consumer factory that creates a mock consumer with commit tracking. */
+  private static class TrackingConsumerFactory
+      implements SerializableFunction<Map<String, Object>, Consumer<byte[], 
byte[]>> {
+
+    private final List<String> topics;
+    private final int partitionsPerTopic;
+    private final int numElements;
+    final AtomicInteger commitCounter = new AtomicInteger(0);
+
+    TrackingConsumerFactory(List<String> topics, int partitionsPerTopic, int 
numElements) {
+      this.topics = topics;
+      this.partitionsPerTopic = partitionsPerTopic;
+      this.numElements = numElements;
+    }
+
+    @Override
+    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
+      return createMockConsumer(
+          topics,
+          partitionsPerTopic,
+          numElements,
+          config,
+          i -> ByteBuffer.wrap(new byte[4]).putInt(i).array(),
+          i -> ByteBuffer.wrap(new byte[8]).putLong((long) i).array());
+    }
+
+    private MockConsumer<byte[], byte[]> createMockConsumer(
+        List<String> topics,
+        int partitionsPerTopic,
+        int numElements,
+        Map<String, Object> config,
+        SerializableFunction<Integer, byte[]> keyFunction,
+        SerializableFunction<Integer, byte[]> valueFunction) {
+
+      final List<TopicPartition> partitions = new ArrayList<>();
+      final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records 
= new HashMap<>();
+      Map<String, List<PartitionInfo>> partitionMap = new HashMap<>();
+
+      for (String topic : topics) {
+        List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
+        for (int i = 0; i < partitionsPerTopic; i++) {
+          TopicPartition tp = new TopicPartition(topic, i);
+          partitions.add(tp);
+          partIds.add(new PartitionInfo(topic, i, null, null, null));
+          records.put(tp, new ArrayList<>());
+        }
+        partitionMap.put(topic, partIds);
+      }
+
+      int numPartitions = partitions.size();
+      final long[] offsets = new long[numPartitions];
+
+      for (int i = 0; i < numElements; i++) {
+        int pIdx = i % numPartitions;
+        TopicPartition tp = partitions.get(pIdx);
+
+        byte[] key = keyFunction.apply(i);
+        byte[] value = valueFunction.apply(i);
+
+        records
+            .get(tp)
+            .add(
+                new ConsumerRecord<>(
+                    tp.topic(),
+                    tp.partition(),
+                    offsets[pIdx]++,
+                    LOG_APPEND_START_TIME.getMillis() + 
Duration.standardSeconds(i).getMillis(),
+                    TimestampType.LOG_APPEND_TIME,
+                    0,
+                    key.length,
+                    value.length,
+                    key,
+                    value));
+      }
+
+      final AtomicReference<List<TopicPartition>> assignedPartitions =
+          new AtomicReference<>(Collections.emptyList());
+
+      final MockConsumer<byte[], byte[]> consumer =
+          new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+            @Override
+            public synchronized void assign(final Collection<TopicPartition> 
assigned) {
+              super.assign(assigned);
+              assignedPartitions.set(ImmutableList.copyOf(assigned));
+            }
+
+            @Override
+            public synchronized void commitSync(Map<TopicPartition, 
OffsetAndMetadata> offsets) {
+              if (!offsets.isEmpty()) {
+                commitCounter.incrementAndGet();
+              }
+              super.commitSync(offsets);
+            }
+          };
+
+      partitionMap.forEach(consumer::updatePartitions);
+      consumer.updateBeginningOffsets(
+          
records.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> 
0L)));
+      consumer.updateEndOffsets(
+          records.entrySet().stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, e -> (long) 
e.getValue().size())));
+
+      Runnable recordEnqueueTask =
+          new Runnable() {
+            @Override
+            public void run() {
+              int recordsAdded = 0;
+              for (TopicPartition tp : assignedPartitions.get()) {
+                long curPos = consumer.position(tp);
+                for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
+                  if (r.offset() >= curPos) {
+                    consumer.addRecord(r);
+                    recordsAdded++;
+                  }
+                }
+              }
+              if (recordsAdded == 0) {
+                Uninterruptibles.sleepUninterruptibly(10, 
TimeUnit.MILLISECONDS);
+              }
+              consumer.schedulePollTask(this);
+            }
+          };
+
+      consumer.schedulePollTask(recordEnqueueTask);
+      return consumer;
+    }
+  }
+}

Reply via email to