This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6a8244815b0 Move Kafka read with redistribute override to Kafka IO
package. (#36887)
6a8244815b0 is described below
commit 6a8244815b0f222b9de7cae71090620db30b5218
Author: Tom Stepp <[email protected]>
AuthorDate: Mon Nov 24 12:19:37 2025 -0800
Move Kafka read with redistribute override to Kafka IO package. (#36887)
* Add Kafka read override to the Dataflow Java runner.
* Fix spot bugs (spacing)
* Add unit test of redistribute override
* Update test dependencies via Gradle.
* Add logic and a test case for the explicitly disabled case.
* Add an explicitly enabled test case.
* Use boolean asserts instead of assertThat, assert that each read is visited
only once, and narrow the suppressed lint warnings to just the instanceof check
in the matches method.
* Move the Kafka read-with-redistribute override to the Kafka IO package.
* Lint fixes
* Remove Kafka test dependencies for Dataflow worker
* Disable abandoned-node enforcement in the test, since it only needs to
replace the transforms.
---
runners/google-cloud-dataflow-java/build.gradle | 2 --
.../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 5 +----
.../kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 7 +++++++
.../beam/sdk/io/kafka}/KafkaReadWithRedistributeOverride.java | 8 +++-----
.../beam/sdk/io/kafka}/KafkaReadWithRedistributeOverrideTest.java | 4 ++--
5 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/build.gradle
b/runners/google-cloud-dataflow-java/build.gradle
index 3e5ff263765..9f064f2432b 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -129,8 +129,6 @@ dependencies {
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation library.java.jackson_dataformat_yaml
testImplementation library.java.mockito_inline
- testImplementation project(":sdks:java:io:kafka")
- testImplementation library.java.kafka_clients
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: project.path, configuration:
"testRuntimeMigration")
validatesRunner library.java.hamcrest
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7d0a151b48b..775e7b91de9 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -659,10 +659,7 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
try {
overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
- overridesBuilder.add(
- PTransformOverride.of(
- KafkaReadWithRedistributeOverride.matcher(),
- new KafkaReadWithRedistributeOverride.Factory()));
+ overridesBuilder.add(KafkaIO.Read.KAFKA_REDISTRIBUTE_OVERRIDE);
} catch (NoClassDefFoundError e) {
// Do nothing. io-kafka is an optional dependency of
runners-google-cloud-dataflow-java
// and only needed when KafkaIO is used in the pipeline.
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 48e4ae2317a..ad553551764 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1817,6 +1817,13 @@ public class KafkaIO {
return true;
}
+ /** A {@link PTransformOverride} for runners to override redistributed
Kafka Read transforms. */
+ @Internal
+ public static final PTransformOverride KAFKA_REDISTRIBUTE_OVERRIDE =
+ PTransformOverride.of(
+ KafkaReadWithRedistributeOverride.matcher(),
+ new KafkaReadWithRedistributeOverride.Factory<>());
+
/**
* A {@link PTransformOverride} for runners to swap {@link
ReadFromKafkaViaSDF} to legacy Kafka
* read if runners doesn't have a good support on executing unbounded
Splittable DoFn.
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java
similarity index 90%
rename from
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java
rename to
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java
index 89f0eef9b8c..f8ebaaed56b 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java
@@ -15,11 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow;
+package org.apache.beam.sdk.io.kafka;
import java.util.Map;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -48,8 +46,8 @@ public final class KafkaReadWithRedistributeOverride {
}
/**
- * {@link PTransformOverrideFactory} for {@link KafkaIO.Read} that enables
{@code
- * withOffsetDeduplication} when {@code withRedistribute} is enabled.
+ * {@link PTransformOverrideFactory} for {@link
org.apache.beam.sdk.io.kafka.KafkaIO.Read} that
+ * enables {@code withOffsetDeduplication} when {@code withRedistribute} is
enabled.
*/
static class Factory<K, V>
implements PTransformOverrideFactory<
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java
similarity index 98%
rename from
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java
rename to
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java
index 05e5dd6a55d..4301aa92ec8 100644
---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow;
+package org.apache.beam.sdk.io.kafka;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.nullValue;
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -129,5 +128,6 @@ public class KafkaReadWithRedistributeOverrideTest
implements Serializable {
}
};
p.traverseTopologically(visitor);
+ p.enableAbandonedNodeEnforcement(false);
}
}