This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a8244815b0 Move Kafka read with redistribute override to Kafka IO 
package. (#36887)
6a8244815b0 is described below

commit 6a8244815b0f222b9de7cae71090620db30b5218
Author: Tom Stepp <[email protected]>
AuthorDate: Mon Nov 24 12:19:37 2025 -0800

    Move Kafka read with redistribute override to Kafka IO package. (#36887)
    
    * Add kafka read override to Dataflow java runner.
    
    * Fix spot bugs (spacing)
    
    * Add unit test of redistribute override
    
    * Update test dependencies via gradle
    
    * Add logic and test case for explicitly disabled.
    
    * Add explicitly enabled test case
    
    * Use boolean asserts over assertThat, assert each read is visited only 
once, refine suppressed lint warnings to just instanceof on matches method.
    
    * Move kafka read with redistribute override to Kafka IO package.
    
    * Lint fixes
    
    * Remove Kafka test dependencies for Dataflow worker
    
    * Ignore abandoned nodes in the test since we just need to replace the 
transforms.
---
 runners/google-cloud-dataflow-java/build.gradle                   | 2 --
 .../java/org/apache/beam/runners/dataflow/DataflowRunner.java     | 5 +----
 .../kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 7 +++++++
 .../beam/sdk/io/kafka}/KafkaReadWithRedistributeOverride.java     | 8 +++-----
 .../beam/sdk/io/kafka}/KafkaReadWithRedistributeOverrideTest.java | 4 ++--
 5 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle 
b/runners/google-cloud-dataflow-java/build.gradle
index 3e5ff263765..9f064f2432b 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -129,8 +129,6 @@ dependencies {
   testImplementation library.java.google_cloud_dataflow_java_proto_library_all
   testImplementation library.java.jackson_dataformat_yaml
   testImplementation library.java.mockito_inline
-  testImplementation project(":sdks:java:io:kafka")
-  testImplementation library.java.kafka_clients
   validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
   validatesRunner project(path: project.path, configuration: 
"testRuntimeMigration")
   validatesRunner library.java.hamcrest
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7d0a151b48b..775e7b91de9 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -659,10 +659,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
 
       try {
         overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
-        overridesBuilder.add(
-            PTransformOverride.of(
-                KafkaReadWithRedistributeOverride.matcher(),
-                new KafkaReadWithRedistributeOverride.Factory()));
+        overridesBuilder.add(KafkaIO.Read.KAFKA_REDISTRIBUTE_OVERRIDE);
       } catch (NoClassDefFoundError e) {
         // Do nothing. io-kafka is an optional dependency of 
runners-google-cloud-dataflow-java
         // and only needed when KafkaIO is used in the pipeline.
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 48e4ae2317a..ad553551764 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1817,6 +1817,13 @@ public class KafkaIO {
       return true;
     }
 
+    /** A {@link PTransformOverride} for runners to override redistributed 
Kafka Read transforms. */
+    @Internal
+    public static final PTransformOverride KAFKA_REDISTRIBUTE_OVERRIDE =
+        PTransformOverride.of(
+            KafkaReadWithRedistributeOverride.matcher(),
+            new KafkaReadWithRedistributeOverride.Factory<>());
+
     /**
      * A {@link PTransformOverride} for runners to swap {@link 
ReadFromKafkaViaSDF} to legacy Kafka
      * read if a runner doesn't have good support for executing unbounded 
Splittable DoFn.
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java
similarity index 90%
rename from 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java
rename to 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java
index 89f0eef9b8c..f8ebaaed56b 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java
@@ -15,11 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow;
+package org.apache.beam.sdk.io.kafka;
 
 import java.util.Map;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.io.kafka.KafkaRecord;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -48,8 +46,8 @@ public final class KafkaReadWithRedistributeOverride {
   }
 
   /**
-   * {@link PTransformOverrideFactory} for {@link KafkaIO.Read} that enables 
{@code
-   * withOffsetDeduplication} when {@code withRedistribute} is enabled.
+   * {@link PTransformOverrideFactory} for {@link 
org.apache.beam.sdk.io.kafka.KafkaIO.Read} that
+   * enables {@code withOffsetDeduplication} when {@code withRedistribute} is 
enabled.
    */
   static class Factory<K, V>
       implements PTransformOverrideFactory<
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java
similarity index 98%
rename from 
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java
rename to 
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java
index 05e5dd6a55d..4301aa92ec8 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow;
+package org.apache.beam.sdk.io.kafka;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.nullValue;
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
 import java.io.Serializable;
 import java.util.Collections;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -129,5 +128,6 @@ public class KafkaReadWithRedistributeOverrideTest 
implements Serializable {
           }
         };
     p.traverseTopologically(visitor);
+    p.enableAbandonedNodeEnforcement(false);
   }
 }

Reply via email to