This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8ef3a2446bc Change KafkaIO to default to offset-based deduplication
when redistribute is enabled for Dataflow java runner. (#36849)
8ef3a2446bc is described below
commit 8ef3a2446bc8251bd11e1a08580d5f9a9c2e31db
Author: Tom Stepp <[email protected]>
AuthorDate: Fri Nov 21 02:16:31 2025 -0800
Change KafkaIO to default to offset-based deduplication when redistribute
is enabled for Dataflow java runner. (#36849)
* Add kafka read override to Dataflow java runner.
---
runners/google-cloud-dataflow-java/build.gradle | 2 +
.../beam/runners/dataflow/DataflowRunner.java | 4 +
.../KafkaReadWithRedistributeOverride.java | 75 ++++++++++++
.../KafkaReadWithRedistributeOverrideTest.java | 133 +++++++++++++++++++++
4 files changed, 214 insertions(+)
diff --git a/runners/google-cloud-dataflow-java/build.gradle
b/runners/google-cloud-dataflow-java/build.gradle
index 415132fa7d2..0961a385b21 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -129,6 +129,8 @@ dependencies {
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation library.java.jackson_dataformat_yaml
testImplementation library.java.mockito_inline
+ testImplementation project(":sdks:java:io:kafka")
+ testImplementation library.java.kafka_clients
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: project.path, configuration:
"testRuntimeMigration")
validatesRunner library.java.hamcrest
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7e23182042c..7d0a151b48b 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -659,6 +659,10 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
try {
overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
+ overridesBuilder.add(
+ PTransformOverride.of(
+ KafkaReadWithRedistributeOverride.matcher(),
+ new KafkaReadWithRedistributeOverride.Factory()));
} catch (NoClassDefFoundError e) {
// Do nothing. io-kafka is an optional dependency of
runners-google-cloud-dataflow-java
// and only needed when KafkaIO is used in the pipeline.
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java
new file mode 100644
index 00000000000..89f0eef9b8c
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import java.util.Map;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.io.kafka.KafkaRecord;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.util.construction.ReplacementOutputs;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+public final class KafkaReadWithRedistributeOverride {
+
+ private KafkaReadWithRedistributeOverride() {}
+
+ public static PTransformMatcher matcher() {
+ return new PTransformMatcher() {
+ @SuppressWarnings({
+ "PatternMatchingInstanceof" // For compiling on older Java versions.
+ })
+ @Override
+ public boolean matches(AppliedPTransform<?, ?, ?> application) {
+ if (application.getTransform() instanceof KafkaIO.Read) {
+ return ((KafkaIO.Read) application.getTransform()).isRedistributed();
+ }
+ return false;
+ }
+ };
+ }
+
+ /**
+ * {@link PTransformOverrideFactory} for {@link KafkaIO.Read} that enables
{@code
+ * withOffsetDeduplication} when {@code withRedistribute} is enabled.
+ */
+ static class Factory<K, V>
+ implements PTransformOverrideFactory<
+ PBegin, PCollection<KafkaRecord<K, V>>, KafkaIO.Read<K, V>> {
+
+ @Override
+ public PTransformReplacement<PBegin, PCollection<KafkaRecord<K, V>>>
getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<KafkaRecord<K, V>>,
KafkaIO.Read<K, V>> transform) {
+ KafkaIO.Read<K, V> read = transform.getTransform();
+ if (read.getOffsetDeduplication() == null) {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(),
read.withOffsetDeduplication(true));
+ }
+ return PTransformReplacement.of(transform.getPipeline().begin(), read);
+ }
+
+ @Override
+ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KafkaRecord<K,
V>> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java
new file mode 100644
index 00000000000..05e5dd6a55d
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.Collections;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class KafkaReadWithRedistributeOverrideTest implements Serializable {
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  /** Returns a String-keyed, String-valued read of {@code topic} with the settings all cases share. */
+  private static KafkaIO.Read<String, String> baseRead(String topic) {
+    return KafkaIO.<String, String>read()
+        .withBootstrapServers("localhost:9092")
+        .withTopic(topic)
+        .withKeyDeserializer(StringDeserializer.class)
+        .withValueDeserializer(StringDeserializer.class);
+  }
+
+  /**
+   * Applies the override to a pipeline of differently configured reads and checks that offset
+   * deduplication is defaulted to {@code true} only for redistributed reads that left it unset,
+   * while explicit settings (including an explicit opt-out on a redistributed read) are preserved.
+   */
+  @Test
+  public void testOverrideAppliedWhenRedistributeEnabled() {
+    p.apply("MatchingRead", baseRead("test_match").withRedistribute());
+    p.apply("NoRedistribute", baseRead("test_no_redistribute"));
+    p.apply("ExplicitlyDisable", baseRead("test_disabled").withOffsetDeduplication(false));
+    // Redistributed read with an explicit opt-out: the override must not flip it to true.
+    p.apply(
+        "RedistributeExplicitlyDisable",
+        baseRead("test_redistribute_disabled").withRedistribute().withOffsetDeduplication(false));
+    p.apply(
+        "ExplicitlyEnable",
+        baseRead("test_enabled").withRedistribute().withOffsetDeduplication(true));
+
+    p.replaceAll(
+        Collections.singletonList(
+            PTransformOverride.of(
+                KafkaReadWithRedistributeOverride.matcher(),
+                new KafkaReadWithRedistributeOverride.Factory<>())));
+
+    Pipeline.PipelineVisitor visitor =
+        new Pipeline.PipelineVisitor.Defaults() {
+
+          private boolean matchingVisited = false;
+          private boolean noRedistributeVisited = false;
+          private boolean explicitlyDisabledVisited = false;
+          private boolean redistributeExplicitlyDisabledVisited = false;
+          private boolean explicitlyEnabledVisited = false;
+
+          @Override
+          public CompositeBehavior enterCompositeTransform(Node node) {
+            if (node.getTransform() instanceof KafkaIO.Read) {
+              KafkaIO.Read<?, ?> read = (KafkaIO.Read<?, ?>) node.getTransform();
+              // Compare against Boolean.TRUE/FALSE rather than unboxing, so an unexpected null
+              // yields an assertion failure instead of a NullPointerException.
+              Boolean offsetDedup = read.getOffsetDeduplication();
+              if (read.getTopics().contains("test_match")) {
+                assertTrue(read.isRedistributed());
+                assertTrue(Boolean.TRUE.equals(offsetDedup));
+                assertFalse(matchingVisited);
+                matchingVisited = true;
+              } else if (read.getTopics().contains("test_no_redistribute")) {
+                assertFalse(read.isRedistributed());
+                assertThat(offsetDedup, nullValue());
+                assertFalse(noRedistributeVisited);
+                noRedistributeVisited = true;
+              } else if (read.getTopics().contains("test_disabled")) {
+                assertFalse(read.isRedistributed());
+                assertTrue(Boolean.FALSE.equals(offsetDedup));
+                assertFalse(explicitlyDisabledVisited);
+                explicitlyDisabledVisited = true;
+              } else if (read.getTopics().contains("test_redistribute_disabled")) {
+                assertTrue(read.isRedistributed());
+                assertTrue(Boolean.FALSE.equals(offsetDedup));
+                assertFalse(redistributeExplicitlyDisabledVisited);
+                redistributeExplicitlyDisabledVisited = true;
+              } else if (read.getTopics().contains("test_enabled")) {
+                assertTrue(read.isRedistributed());
+                assertTrue(Boolean.TRUE.equals(offsetDedup));
+                assertFalse(explicitlyEnabledVisited);
+                explicitlyEnabledVisited = true;
+              }
+            }
+            return CompositeBehavior.ENTER_TRANSFORM;
+          }
+
+          @Override
+          public void leaveCompositeTransform(Node node) {
+            if (node.isRootNode()) {
+              assertTrue("Matching transform was not visited", matchingVisited);
+              assertTrue("No redistribute transform was not visited", noRedistributeVisited);
+              assertTrue(
+                  "Explicitly disabled transform was not visited", explicitlyDisabledVisited);
+              assertTrue(
+                  "Redistribute explicitly disabled transform was not visited",
+                  redistributeExplicitlyDisabledVisited);
+              assertTrue("Explicitly enabled transform was not visited", explicitlyEnabledVisited);
+            }
+          }
+        };
+    p.traverseTopologically(visitor);
+  }
+}