This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new c111c8b83ef [KafkaIO] Make ReadFromKafkaDoFn restriction trackers unsplittable (#36935)
c111c8b83ef is described below
commit c111c8b83efd123fd5d9a1810c9f5f9291863a68
Author: Steven van Rossum <[email protected]>
AuthorDate: Thu Dec 18 10:37:54 2025 +0100
[KafkaIO] Make ReadFromKafkaDoFn restriction trackers unsplittable (#36935)
---
 .../splittabledofn/GrowableOffsetRangeTracker.java |  2 +-
 .../UnsplittableRestrictionTracker.java            | 70 ++++++++++++++++++++++
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java       | 19 ++++--
 .../org/apache/beam/sdk/io/kafka/KafkaIOIT.java    | 31 ++++++++++
 4 files changed, 117 insertions(+), 5 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
index 97b0d9b8e78..75c25118c39 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
@@ -30,7 +30,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Uns
  * used as the end of the range to indicate infinity.
  *
  * <p>An offset range is considered growable when the end offset could grow (or change) during
- * execution time (e.g., Kafka topic partition offset, appended file, ...).
+ * execution time (e.g., appended file, ...).
  *
  * <p>The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
  */
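
As the Javadoc above says, a growable range is marked as done by claiming Long.MAX_VALUE. For illustration only, a minimal sketch of that behavior (not part of the commit); the demo class name is made up, and the constant () -> 50L estimator is a stand-in for a real backlog-backed RangeEndEstimator:

    import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;

    public class GrowableRangeDemo {
      public static void main(String[] args) {
        // The estimator is polled for the current end of the growable range;
        // a constant stands in here for a real source-backed estimate.
        GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(10L, () -> 50L);

        tracker.tryClaim(10L); // true: 10 lies inside [10, Long.MAX_VALUE)

        // Claiming Long.MAX_VALUE returns false but marks the growable range as done.
        tracker.tryClaim(Long.MAX_VALUE);
        tracker.checkDone(); // passes: nothing is left unclaimed
      }
    }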
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
new file mode 100644
index 00000000000..e09ebfba37f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A {@link RestrictionTracker} for wrapping a {@link RestrictionTracker} with unsplittable
+ * restrictions.
+ *
+ * <p>A restriction is considered unsplittable when restrictions of an element must not be processed
+ * simultaneously (e.g., Kafka topic partition).
+ */
+public class UnsplittableRestrictionTracker<RestrictionT, PositionT>
+    extends RestrictionTracker<RestrictionT, PositionT> implements RestrictionTracker.HasProgress {
+  private final RestrictionTracker<RestrictionT, PositionT> tracker;
+
+  public UnsplittableRestrictionTracker(RestrictionTracker<RestrictionT, PositionT> tracker) {
+    this.tracker = tracker;
+  }
+
+  @Override
+  public boolean tryClaim(PositionT position) {
+    return tracker.tryClaim(position);
+  }
+
+  @Override
+  public RestrictionT currentRestriction() {
+    return tracker.currentRestriction();
+  }
+
+  @Override
+  public @Nullable SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
+    return fractionOfRemainder > 0.0 && fractionOfRemainder < 1.0
+        ? null
+        : tracker.trySplit(fractionOfRemainder);
+  }
+
+  @Override
+  public void checkDone() throws IllegalStateException {
+    tracker.checkDone();
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return tracker.isBounded();
+  }
+
+  @Override
+  public Progress getProgress() {
+    return tracker instanceof RestrictionTracker.HasProgress
+        ? ((RestrictionTracker.HasProgress) tracker).getProgress()
+        : Progress.NONE;
+  }
+}
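
For orientation, here is a minimal sketch (not part of the commit) of what the wrapper changes at runtime: a dynamic split request (0 < fractionOfRemainder < 1) now returns null, while a checkpoint (fractionOfRemainder == 0) still delegates to the wrapped tracker. The demo class is hypothetical; the Beam types are real:

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
    import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
    import org.apache.beam.sdk.transforms.splittabledofn.UnsplittableRestrictionTracker;

    public class UnsplittableTrackerDemo {
      public static void main(String[] args) {
        UnsplittableRestrictionTracker<OffsetRange, Long> tracker =
            new UnsplittableRestrictionTracker<>(
                new OffsetRangeTracker(new OffsetRange(0, 100)));

        tracker.tryClaim(0L); // claim the first offset

        // Dynamic split: refused by the wrapper, so the element's remaining work
        // cannot be handed to another worker mid-bundle.
        SplitResult<OffsetRange> dynamic = tracker.trySplit(0.5);
        System.out.println(dynamic); // null

        // Checkpoint: delegated to the wrapped OffsetRangeTracker, e.g.
        // primary [0, 1) and residual [1, 100).
        SplitResult<OffsetRange> checkpoint = tracker.trySplit(0.0);
        System.out.println(checkpoint);
      }
    }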
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 60fc9d57a62..a05abba06e7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.UnsplittableRestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -108,6 +109,15 @@ import org.slf4j.LoggerFactory;
  *
  * <h4>Splitting</h4>
  *
+ * <p>Consumer group members must not consume from the same {@link TopicPartition} simultaneously
+ * when {@code enable.auto.commit} is set. Doing so may arbitrarily overwrite a consumer group's
+ * committed offset for a {@link TopicPartition}. Restriction trackers for a {@link
+ * KafkaSourceDescriptor} are wrapped as {@link UnsplittableRestrictionTracker<OffsetRange, Long>}
+ * and will only return a non-null {@link org.apache.beam.sdk.transforms.splittabledofn.SplitResult}
+ * for a checkpoint. To the extent possible in the SDK, this reduces the risk of overwriting
+ * committed offsets when {@code enable.auto.commit} is set and prevents concurrent use of
+ * per-{@link TopicPartition} cached {@link Consumer} resources.
+ *
  * <p>TODO(https://github.com/apache/beam/issues/20280): Add support for initial splitting.
  *
  * <h4>Checkpoint and Resume Processing</h4>
@@ -488,20 +498,21 @@ abstract class ReadFromKafkaDoFn<K, V>
   @NewTracker
   @RequiresNonNull({"latestOffsetEstimatorCache"})
-  public OffsetRangeTracker restrictionTracker(
+  public UnsplittableRestrictionTracker<OffsetRange, Long> restrictionTracker(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
     final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
         latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
     if (restriction.getTo() < Long.MAX_VALUE) {
-      return new OffsetRangeTracker(restriction);
+      return new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(restriction));
     }
     // OffsetEstimators are cached for each topic-partition because they hold a stateful connection,
     // so we want to minimize the amount of connections that we start and track with Kafka. Another
     // point is that it has a memoized backlog, and this should make that more reusable estimations.
-    return new GrowableOffsetRangeTracker(
-        restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor));
+    return new UnsplittableRestrictionTracker<>(
+        new GrowableOffsetRangeTracker(
+            restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor)));
   }
 
   @ProcessElement
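
The same pattern generalizes beyond KafkaIO: any splittable DoFn whose per-element resource must not be shared across concurrently processed restrictions can wrap its tracker in @NewTracker. A hedged sketch with an invented DoFn (the class name, element type, and fixed range are illustrative, not from the commit):

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
    import org.apache.beam.sdk.transforms.splittabledofn.UnsplittableRestrictionTracker;

    class UnsplittableReadFn extends DoFn<String, Long> {
      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element String element) {
        return new OffsetRange(0, 100); // fixed range for illustration
      }

      @NewTracker
      public UnsplittableRestrictionTracker<OffsetRange, Long> newTracker(
          @Restriction OffsetRange restriction) {
        // Wrapping keeps the runner from dynamically splitting this element's
        // restriction; only checkpoints produce residuals.
        return new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(restriction));
      }

      @ProcessElement
      public void process(
          RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Long> out) {
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); i++) {
          out.output(i);
        }
      }
    }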
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 0e8cbd2183c..b1133eadb1c 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -813,6 +813,37 @@ public class KafkaIOIT {
     runWithStopReadingFn(checkStopReadingFn, "delayed-stop-reading", sourceOptions.numRecords);
   }
 
+  @Test
+  public void testKafkaWithStopReadTime() throws IOException {
+    writePipeline
+        .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
+        .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
+        .apply(
+            "Write to Kafka",
+            writeToKafka().withTopic(options.getKafkaTopic() + "-stop-read-time"));
+
+    PipelineResult writeResult = writePipeline.run();
+    PipelineResult.State writeState = writeResult.waitUntilFinish();
+    assertNotEquals(PipelineResult.State.FAILED, writeState);
+
+    sdfReadPipeline.getOptions().as(Options.class).setStreaming(false);
+    PCollection<KafkaRecord<byte[], byte[]>> rows =
+        sdfReadPipeline.apply(
+            "Read from bounded Kafka",
+            readFromKafka()
+                .withTopic(options.getKafkaTopic() + "-stop-read-time")
+                .withStopReadTime(
+                    org.joda.time.Instant.ofEpochMilli(
+                        new MetricsReader(writeResult, NAMESPACE)
+                            .getEndTimeMetric(WRITE_TIME_METRIC_NAME))));
+
+    PipelineResult readResult = sdfReadPipeline.run();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+    cancelIfTimeouted(readResult, readState);
+    assertNotEquals(PipelineResult.State.FAILED, readState);
+  }
+
   public static final Schema KAFKA_TOPIC_SCHEMA =
       Schema.builder()
           .addStringField("name")