This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c111c8b83ef [KafkaIO] Make ReadFromKafkaDoFn restriction trackers unsplittable (#36935)
c111c8b83ef is described below

commit c111c8b83efd123fd5d9a1810c9f5f9291863a68
Author: Steven van Rossum <[email protected]>
AuthorDate: Thu Dec 18 10:37:54 2025 +0100

    [KafkaIO] Make ReadFromKafkaDoFn restriction trackers unsplittable (#36935)
---
 .../splittabledofn/GrowableOffsetRangeTracker.java |  2 +-
 .../UnsplittableRestrictionTracker.java            | 70 ++++++++++++++++++++++
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java       | 19 ++++--
 .../org/apache/beam/sdk/io/kafka/KafkaIOIT.java    | 31 ++++++++++
 4 files changed, 117 insertions(+), 5 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
index 97b0d9b8e78..75c25118c39 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
@@ -30,7 +30,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Uns
  * used as the end of the range to indicate infinity.
  *
  * <p>An offset range is considered growable when the end offset could grow (or change) during
- * execution time (e.g., Kafka topic partition offset, appended file, ...).
+ * execution time (e.g., appended file, ...).
  *
  * <p>The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
  */
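
For reference, a minimal sketch (not part of this commit) of how a growable range is driven to completion, per the javadoc above; the fixed end estimate below is a hypothetical stand-in for a real backlog estimator:

    // Track offsets from 0; the RangeEndEstimator lambda supplies the current end estimate.
    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L, () -> 100L);
    tracker.tryClaim(0L);              // claim offsets as elements are processed
    tracker.tryClaim(Long.MAX_VALUE);  // returns false and marks the growable range as done
    tracker.checkDone();               // passes once Long.MAX_VALUE has been attempted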
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
new file mode 100644
index 00000000000..e09ebfba37f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A {@link RestrictionTracker} that wraps another {@link RestrictionTracker} to make its
+ * restriction unsplittable.
+ *
+ * <p>A restriction is considered unsplittable when restrictions of an element must not be processed
+ * simultaneously (e.g., Kafka topic partition).
+ */
+public class UnsplittableRestrictionTracker<RestrictionT, PositionT>
+    extends RestrictionTracker<RestrictionT, PositionT> implements RestrictionTracker.HasProgress {
+  private final RestrictionTracker<RestrictionT, PositionT> tracker;
+
+  public UnsplittableRestrictionTracker(RestrictionTracker<RestrictionT, PositionT> tracker) {
+    this.tracker = tracker;
+  }
+
+  @Override
+  public boolean tryClaim(PositionT position) {
+    return tracker.tryClaim(position);
+  }
+
+  @Override
+  public RestrictionT currentRestriction() {
+    return tracker.currentRestriction();
+  }
+
+  @Override
+  public @Nullable SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
+    return fractionOfRemainder > 0.0 && fractionOfRemainder < 1.0
+        ? null
+        : tracker.trySplit(fractionOfRemainder);
+  }
+
+  @Override
+  public void checkDone() throws IllegalStateException {
+    tracker.checkDone();
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return tracker.isBounded();
+  }
+
+  @Override
+  public Progress getProgress() {
+    return tracker instanceof RestrictionTracker.HasProgress
+        ? ((RestrictionTracker.HasProgress) tracker).getProgress()
+        : Progress.NONE;
+  }
+}
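
The wrapper's only behavioral change is in trySplit: fractions strictly between 0 and 1 (dynamic split requests) return null, while a fraction of 0 (a checkpoint) is still delegated to the wrapped tracker. A minimal sketch, assuming a plain OffsetRangeTracker over offsets [0, 100):

    UnsplittableRestrictionTracker<OffsetRange, Long> tracker =
        new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(new OffsetRange(0L, 100L)));
    tracker.tryClaim(10L);
    tracker.trySplit(0.5);  // dynamic split request: refused, returns null
    tracker.trySplit(0.0);  // checkpoint: delegated, returns primary [0, 11) and residual [11, 100)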
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 60fc9d57a62..a05abba06e7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.UnsplittableRestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -108,6 +109,15 @@ import org.slf4j.LoggerFactory;
  *
  * <h4>Splitting</h4>
  *
+ * <p>Consumer group members must not consume from the same {@link TopicPartition} simultaneously
+ * when {@code enable.auto.commit} is set. Doing so may arbitrarily overwrite a consumer group's
+ * committed offset for a {@link TopicPartition}. Restriction trackers for a {@link
+ * KafkaSourceDescriptor} are wrapped as {@link UnsplittableRestrictionTracker<OffsetRange, Long>}
+ * and will only return a non-null {@link org.apache.beam.sdk.transforms.splittabledofn.SplitResult}
+ * for a checkpoint. To the extent possible in the SDK, this reduces the risk of overwriting
+ * committed offsets when {@code enable.auto.commit} is set and prevents concurrent use of
+ * per-{@link TopicPartition} cached {@link Consumer} resources.
+ *
+ * <p>TODO(https://github.com/apache/beam/issues/20280): Add support for initial splitting.
  *
  * <h4>Checkpoint and Resume Processing</h4>
@@ -488,20 +498,21 @@ abstract class ReadFromKafkaDoFn<K, V>
 
   @NewTracker
   @RequiresNonNull({"latestOffsetEstimatorCache"})
-  public OffsetRangeTracker restrictionTracker(
+  public UnsplittableRestrictionTracker<OffsetRange, Long> restrictionTracker(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
     final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
         latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
 
     if (restriction.getTo() < Long.MAX_VALUE) {
-      return new OffsetRangeTracker(restriction);
+      return new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(restriction));
     }
 
     // OffsetEstimators are cached for each topic-partition because they hold a stateful connection,
     // so we want to minimize the amount of connections that we start and track with Kafka. Another
     // point is that it has a memoized backlog, which should make backlog estimations more reusable.
-    return new GrowableOffsetRangeTracker(
-        restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor));
+    return new UnsplittableRestrictionTracker<>(
+        new GrowableOffsetRangeTracker(
+            restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor)));
   }
 
   @ProcessElement
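
For context, the enable.auto.commit setup the new javadoc warns about could be supplied through the existing withConsumerConfigUpdates knob; a hedged sketch, in which the broker address, topic, and group id are placeholders:

    KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers("localhost:9092")  // placeholder
        .withTopic("my-topic")                   // placeholder
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(
            ImmutableMap.<String, Object>of(
                ConsumerConfig.GROUP_ID_CONFIG, "my-group",  // placeholder
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true));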
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 0e8cbd2183c..b1133eadb1c 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -813,6 +813,37 @@ public class KafkaIOIT {
     runWithStopReadingFn(checkStopReadingFn, "delayed-stop-reading", sourceOptions.numRecords);
   }
 
+  @Test
+  public void testKafkaWithStopReadTime() throws IOException {
+    writePipeline
+        .apply("Generate records", Read.from(new 
SyntheticBoundedSource(sourceOptions)))
+        .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, 
WRITE_TIME_METRIC_NAME)))
+        .apply(
+            "Write to Kafka",
+            writeToKafka().withTopic(options.getKafkaTopic() + "-stop-read-time"));
+
+    PipelineResult writeResult = writePipeline.run();
+    PipelineResult.State writeState = writeResult.waitUntilFinish();
+    assertNotEquals(PipelineResult.State.FAILED, writeState);
+
+    sdfReadPipeline.getOptions().as(Options.class).setStreaming(false);
+    PCollection<KafkaRecord<byte[], byte[]>> rows =
+        sdfReadPipeline.apply(
+            "Read from bounded Kafka",
+            readFromKafka()
+                .withTopic(options.getKafkaTopic() + "-stop-read-time")
+                .withStopReadTime(
+                    org.joda.time.Instant.ofEpochMilli(
+                        new MetricsReader(writeResult, NAMESPACE)
+                            .getEndTimeMetric(WRITE_TIME_METRIC_NAME))));
+
+    PipelineResult readResult = sdfReadPipeline.run();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+    cancelIfTimeouted(readResult, readState);
+    assertNotEquals(PipelineResult.State.FAILED, readState);
+  }
+
   public static final Schema KAFKA_TOPIC_SCHEMA =
       Schema.builder()
           .addStringField("name")
