This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 78061b82a24 If the partition count or kafka IO size is large, then
skip committin… (#37510)
78061b82a24 is described below
commit 78061b82a2443854e3f57e329a4d380745e69aa8
Author: kishorepola <[email protected]>
AuthorDate: Mon Mar 9 07:28:50 2026 -0700
If the partition count or kafka IO size is large, then skip committin…
(#37510)
* If the partition count or kafka IO size is large, then skip committing
offsets that have not changed. Reduce Kafka commit load.
* Address PR review feedback for idle partition optimization
- Refactor commitCheckpointMark to use Java streams (per @johnjcasey)
Changed from explicit for-loop to streams-based filtering for better
code consistency with existing patterns
- Add debug logging for idle partitions (per @tomstepp)
Log the count of idle partitions skipped during each commit to aid
in monitoring and debugging the optimization
- Implement time-based periodic commits (per @tomstepp)
Track last commit time per partition and ensure commits happen at
least every 10 minutes even for idle partitions. This supports time
lag monitoring use cases where customers track time since last commit.
- Add unit test for idle partition behavior (per @tomstepp)
New test KafkaUnboundedReaderIdlePartitionTest verifies that:
* Idle partitions are not committed repeatedly
* Active partitions trigger commits correctly
* Uses mock consumer to track commit calls
All changes maintain backward compatibility and follow Apache Beam
coding standards (spotless formatting applied).
* Fix test to follow Beam patterns for MockConsumer initialization
Rewrote KafkaUnboundedReaderIdlePartitionTest to follow the exact
pattern used in KafkaIOTest.java:
- Proper MockConsumer initialization with partition metadata
- Correct setup of beginning/end offsets
- Consumer records with proper offsets and timestamps
- schedulePollTask for record enqueueing based on position
- Override commitSync to track commit calls
- Use reader.start() before reader.advance()
This ensures the test properly initializes the Kafka consumer and
doesn't fail with IllegalStateException during source.split().
---------
Co-authored-by: Kishore Pola <[email protected]>
---
.../beam/sdk/io/kafka/KafkaUnboundedReader.java | 117 ++++++---
.../KafkaUnboundedReaderIdlePartitionTest.java | 273 +++++++++++++++++++++
2 files changed, 361 insertions(+), 29 deletions(-)
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 866dfd48710..286c2cd5d8e 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -79,6 +80,11 @@ import org.slf4j.LoggerFactory;
*/
class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
+ // Track last successfully committed offsets to suppress no-op commits for
idle partitions.
+ private final Map<TopicPartition, Long> lastCommittedOffsets = new
HashMap<>();
+ // Track last commit time per partition to ensure periodic commits for time
lag monitoring.
+ private final Map<TopicPartition, Instant> lastCommitTimes = new HashMap<>();
+
///////////////////// Reader API
////////////////////////////////////////////////////////////
@SuppressWarnings("FutureReturnValueIgnored")
@Override
@@ -375,6 +381,8 @@ class KafkaUnboundedReader<K, V> extends
UnboundedReader<KafkaRecord<K, V>> {
private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MAX =
Duration.millis(20);
private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT =
Duration.millis(100);
private static final Duration MIN_COMMIT_FAIL_LOG_INTERVAL =
Duration.standardMinutes(10);
+ // Maximum time between commits for idle partitions (for time lag
monitoring).
+ private static final Duration MAX_IDLE_COMMIT_INTERVAL =
Duration.standardMinutes(10);
// Use a separate thread to read Kafka messages. Kafka Consumer does all its
work including
// network I/O inside poll(). Polling only inside #advance(), especially
with a small timeout
@@ -611,37 +619,88 @@ class KafkaUnboundedReader<K, V> extends
UnboundedReader<KafkaRecord<K, V>> {
private void commitCheckpointMark() {
KafkaCheckpointMark checkpointMark =
finalizedCheckpointMark.getAndSet(null);
- if (checkpointMark != null) {
- LOG.debug("{}: Committing finalized checkpoint {}", this,
checkpointMark);
- Consumer<byte[], byte[]> consumer =
Preconditions.checkStateNotNull(this.consumer);
- Instant now = Instant.now();
+ if (checkpointMark == null) {
+ return;
+ }
- try {
- consumer.commitSync(
- checkpointMark.getPartitions().stream()
- .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
- .collect(
- Collectors.toMap(
- p -> new TopicPartition(p.getTopic(),
p.getPartition()),
- p -> new OffsetAndMetadata(p.getNextOffset()))));
+ LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
+ Consumer<byte[], byte[]> consumer =
Preconditions.checkStateNotNull(this.consumer);
+ Instant now = Instant.now();
+
+ try {
+ // Commit only partitions whose offsets have advanced since the last
successful commit
+ // for this reader, or partitions that haven't been committed within
MAX_IDLE_COMMIT_INTERVAL.
+ // This suppresses no-op commits for idle partitions while ensuring
periodic commits
+ // for time lag monitoring.
+ Map<TopicPartition, OffsetAndMetadata> toCommit =
+ checkpointMark.getPartitions().stream()
+ .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
+ .filter(
+ p -> {
+ TopicPartition tp = new TopicPartition(p.getTopic(),
p.getPartition());
+ Long prev = lastCommittedOffsets.get(tp);
+ long next = p.getNextOffset();
+ Instant lastCommitTime = lastCommitTimes.get(tp);
+
+ // Commit if offset has advanced
+ if (prev == null || next > prev) {
+ return true;
+ }
+
+ // Also commit if partition hasn't been committed within
max idle interval
+ if (lastCommitTime == null
+ ||
now.isAfter(lastCommitTime.plus(MAX_IDLE_COMMIT_INTERVAL))) {
+ return true;
+ }
+
+ return false;
+ })
+ .collect(
+ Collectors.toMap(
+ p -> new TopicPartition(p.getTopic(), p.getPartition()),
+ p -> new OffsetAndMetadata(p.getNextOffset())));
+
+ int totalPartitions = checkpointMark.getPartitions().size();
+ int idlePartitions = totalPartitions - toCommit.size();
+ if (idlePartitions > 0) {
+ LOG.debug(
+ "{}: Skipping commit for {} idle partitions ({} of {} partitions
active)",
+ this,
+ idlePartitions,
+ toCommit.size(),
+ totalPartitions);
+ }
+
+ if (toCommit.isEmpty()) {
+ // Nothing advanced since last successful commit; avoid noisy
commitSync().
+ return;
+ }
+
+ consumer.commitSync(toCommit);
+
+ // Only update after a successful commit.
+ for (Map.Entry<TopicPartition, OffsetAndMetadata> e :
toCommit.entrySet()) {
+ lastCommittedOffsets.put(e.getKey(), e.getValue().offset());
+ lastCommitTimes.put(e.getKey(), now);
+ }
+
+ nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
+ } catch (Exception e) {
+ // Log but ignore the exception. Committing consumer offsets to Kafka is
not critical for
+ // KafkaIO because it relies on the offsets stored in
KafkaCheckpointMark.
+ if (now.isAfter(nextAllowedCommitFailLogTime)) {
+ LOG.warn(
+ String.format(
+ "%s: Did not successfully commit finalized checkpoint for >
%s. Current checkpoint: %s",
+ this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
+ e);
nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
- } catch (Exception e) {
- // Log but ignore the exception. Committing consumer offsets to Kafka
is not critical for
- // KafkaIO because it relies on the offsets stored in
KafkaCheckpointMark.
- if (now.isAfter(nextAllowedCommitFailLogTime)) {
- LOG.warn(
- String.format(
- "%s: Did not successfully commit finalized checkpoint for >
%s. Current checkpoint: %s",
- this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
- e);
- nextAllowedCommitFailLogTime =
now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
- } else {
- LOG.info(
- String.format(
- "%s: Could not commit finalized checkpoint. Commit will be
retried with subsequent reads. Current checkpoint: %s",
- this, checkpointMark),
- e);
- }
+ } else {
+ LOG.info(
+ String.format(
+ "%s: Could not commit finalized checkpoint. Commit will be
retried with subsequent reads. Current checkpoint: %s",
+ this, checkpointMark),
+ e);
}
}
}
diff --git
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReaderIdlePartitionTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReaderIdlePartitionTest.java
new file mode 100644
index 00000000000..005cd6fa378
--- /dev/null
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReaderIdlePartitionTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for idle partition optimization in {@link
KafkaUnboundedReader}. */
+@RunWith(JUnit4.class)
+public class KafkaUnboundedReaderIdlePartitionTest {
+
+ private static final Instant LOG_APPEND_START_TIME = new Instant(100000);
+
+ /**
+ * Verifies that idle partitions (partitions with no offset changes) are not
committed repeatedly,
+ * reducing load on Kafka brokers.
+ */
+ @Test
+ public void testIdlePartitionsNotCommittedRepeatedly() throws Exception {
+ int numElements = 50;
+ int numPartitions = 10;
+ List<String> topics = ImmutableList.of("test_topic");
+
+ // Create a tracking consumer factory
+ TrackingConsumerFactory consumerFactory =
+ new TrackingConsumerFactory(topics, numPartitions, numElements);
+
+ // Create a Kafka source with commit offsets enabled
+ UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source =
+ KafkaIO.<Integer, Long>read()
+ .withBootstrapServers("test_server")
+ .withTopics(topics)
+ .withConsumerFactoryFn(consumerFactory)
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(LongDeserializer.class)
+ .withConsumerConfigUpdates(
+ ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG,
"test_group_id"))
+ .commitOffsetsInFinalize()
+ .makeSource()
+ .split(1, PipelineOptionsFactory.create())
+ .get(0);
+
+ UnboundedReader<KafkaRecord<Integer, Long>> reader =
source.createReader(null, null);
+
+ // Read some elements
+ assertTrue("Reader should start", reader.start());
+ for (int i = 0; i < 10; i++) {
+ assertTrue("Reader should have more elements", reader.advance());
+ }
+
+ // Get first checkpoint and finalize it
+ KafkaCheckpointMark mark1 = (KafkaCheckpointMark)
reader.getCheckpointMark();
+ mark1.finalizeCheckpoint();
+
+ // Allow commit to happen (it's async)
+ Thread.sleep(2000);
+
+ int initialCommitCount = consumerFactory.commitCounter.get();
+ assertTrue("Should have committed at least once", initialCommitCount > 0);
+
+ // Create another checkpoint without reading more data (all partitions
idle)
+ KafkaCheckpointMark mark2 = (KafkaCheckpointMark)
reader.getCheckpointMark();
+ mark2.finalizeCheckpoint();
+
+ // Allow commit attempt to happen
+ Thread.sleep(2000);
+
+ int secondCommitCount = consumerFactory.commitCounter.get();
+
+ // Verify that no new commits happened for idle partitions
+ assertEquals(
+ "Idle partitions should not trigger additional commits",
+ initialCommitCount,
+ secondCommitCount);
+
+ // Read more elements to activate partitions again
+ for (int i = 0; i < 10; i++) {
+ reader.advance();
+ }
+
+ // Create another checkpoint after reading more data
+ KafkaCheckpointMark mark3 = (KafkaCheckpointMark)
reader.getCheckpointMark();
+ mark3.finalizeCheckpoint();
+
+ // Allow commit to happen
+ Thread.sleep(2000);
+
+ int thirdCommitCount = consumerFactory.commitCounter.get();
+
+ // Verify that commits happened for partitions with new data
+ assertTrue("Active partitions should trigger commits", thirdCommitCount >
secondCommitCount);
+
+ reader.close();
+ }
+
+ /** Consumer factory that creates a mock consumer with commit tracking. */
+ private static class TrackingConsumerFactory
+ implements SerializableFunction<Map<String, Object>, Consumer<byte[],
byte[]>> {
+
+ private final List<String> topics;
+ private final int partitionsPerTopic;
+ private final int numElements;
+ final AtomicInteger commitCounter = new AtomicInteger(0);
+
+ TrackingConsumerFactory(List<String> topics, int partitionsPerTopic, int
numElements) {
+ this.topics = topics;
+ this.partitionsPerTopic = partitionsPerTopic;
+ this.numElements = numElements;
+ }
+
+ @Override
+ public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
+ return createMockConsumer(
+ topics,
+ partitionsPerTopic,
+ numElements,
+ config,
+ i -> ByteBuffer.wrap(new byte[4]).putInt(i).array(),
+ i -> ByteBuffer.wrap(new byte[8]).putLong((long) i).array());
+ }
+
+ private MockConsumer<byte[], byte[]> createMockConsumer(
+ List<String> topics,
+ int partitionsPerTopic,
+ int numElements,
+ Map<String, Object> config,
+ SerializableFunction<Integer, byte[]> keyFunction,
+ SerializableFunction<Integer, byte[]> valueFunction) {
+
+ final List<TopicPartition> partitions = new ArrayList<>();
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records
= new HashMap<>();
+ Map<String, List<PartitionInfo>> partitionMap = new HashMap<>();
+
+ for (String topic : topics) {
+ List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
+ for (int i = 0; i < partitionsPerTopic; i++) {
+ TopicPartition tp = new TopicPartition(topic, i);
+ partitions.add(tp);
+ partIds.add(new PartitionInfo(topic, i, null, null, null));
+ records.put(tp, new ArrayList<>());
+ }
+ partitionMap.put(topic, partIds);
+ }
+
+ int numPartitions = partitions.size();
+ final long[] offsets = new long[numPartitions];
+
+ for (int i = 0; i < numElements; i++) {
+ int pIdx = i % numPartitions;
+ TopicPartition tp = partitions.get(pIdx);
+
+ byte[] key = keyFunction.apply(i);
+ byte[] value = valueFunction.apply(i);
+
+ records
+ .get(tp)
+ .add(
+ new ConsumerRecord<>(
+ tp.topic(),
+ tp.partition(),
+ offsets[pIdx]++,
+ LOG_APPEND_START_TIME.getMillis() +
Duration.standardSeconds(i).getMillis(),
+ TimestampType.LOG_APPEND_TIME,
+ 0,
+ key.length,
+ value.length,
+ key,
+ value));
+ }
+
+ final AtomicReference<List<TopicPartition>> assignedPartitions =
+ new AtomicReference<>(Collections.emptyList());
+
+ final MockConsumer<byte[], byte[]> consumer =
+ new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+ @Override
+ public synchronized void assign(final Collection<TopicPartition>
assigned) {
+ super.assign(assigned);
+ assignedPartitions.set(ImmutableList.copyOf(assigned));
+ }
+
+ @Override
+ public synchronized void commitSync(Map<TopicPartition,
OffsetAndMetadata> offsets) {
+ if (!offsets.isEmpty()) {
+ commitCounter.incrementAndGet();
+ }
+ super.commitSync(offsets);
+ }
+ };
+
+ partitionMap.forEach(consumer::updatePartitions);
+ consumer.updateBeginningOffsets(
+
records.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e ->
0L)));
+ consumer.updateEndOffsets(
+ records.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> (long)
e.getValue().size())));
+
+ Runnable recordEnqueueTask =
+ new Runnable() {
+ @Override
+ public void run() {
+ int recordsAdded = 0;
+ for (TopicPartition tp : assignedPartitions.get()) {
+ long curPos = consumer.position(tp);
+ for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
+ if (r.offset() >= curPos) {
+ consumer.addRecord(r);
+ recordsAdded++;
+ }
+ }
+ }
+ if (recordsAdded == 0) {
+ Uninterruptibles.sleepUninterruptibly(10,
TimeUnit.MILLISECONDS);
+ }
+ consumer.schedulePollTask(this);
+ }
+ };
+
+ consumer.schedulePollTask(recordEnqueueTask);
+ return consumer;
+ }
+ }
+}