This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c097d84  Fix Kafka expansion issues introduced by PR14801
     new 2ad332b  Merge pull request #14849 from [BEAM-10670] Fix Kafka 
expansion issues introduced by PR14801
c097d84 is described below

commit c097d8424678813901ac46f2fe674cfe71e67430
Author: Boyuan Zhang <[email protected]>
AuthorDate: Thu May 20 11:29:42 2021 -0700

    Fix Kafka expansion issues introduced by PR14801
---
 .../org/apache/beam/runners/direct/DirectRunner.java   | 10 ----------
 .../org/apache/beam/runners/flink/FlinkRunner.java     |  7 -------
 .../org/apache/beam/runners/samza/SamzaRunner.java     | 12 ++++--------
 .../SparkStructuredStreamingRunner.java                | 13 ++++---------
 .../org/apache/beam/runners/spark/SparkRunner.java     |  9 ++-------
 .../apache/beam/runners/spark/SparkRunnerDebugger.java |  9 ++-------
 .../apache/beam/runners/twister2/Twister2Runner.java   |  9 ++-------
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java     | 18 +++++++++++++++++-
 8 files changed, 31 insertions(+), 56 deletions(-)

diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index b073840..4e7178b 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -39,7 +39,6 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -173,15 +172,6 @@ public class DirectRunner extends 
PipelineRunner<DirectPipelineResult> {
           "PipelineOptions specified failed to serialize to JSON.", e);
     }
 
-    // TODO(BEAM-10670): Remove additional experiments when we address 
performance issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"use_sdf_read")) {
-      // Populate experiments directly to avoid direct-runner <-> kafka 
circular dependency.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"use_deprecated_read");
-    }
-
     performRewrites(pipeline);
     MetricsEnvironment.setMetricsSupported(true);
     try {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 6d9e56b..1470a2c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -78,13 +78,6 @@ public class FlinkRunner extends 
PipelineRunner<PipelineResult> {
     // Portable flink only support SDF as read.
     // TODO(BEAM-10670): Use SDF read as default when we address performance 
issue.
     if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"beam_fn_api")) {
-      if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"use_sdf_read")) {
-        // Populate experiments directly to have Kafka use legacy read.
-        ExperimentalOptions.addExperiment(
-            pipeline.getOptions().as(ExperimentalOptions.class), 
"beam_fn_api_use_deprecated_read");
-        ExperimentalOptions.addExperiment(
-            pipeline.getOptions().as(ExperimentalOptions.class), 
"use_deprecated_read");
-      }
       
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
 
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
index dd53a49..42fbed0 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
@@ -112,15 +112,11 @@ public class SamzaRunner extends 
PipelineRunner<SamzaPipelineResult> {
 
   @Override
   public SamzaPipelineResult run(Pipeline pipeline) {
-    // TODO(BEAM-10670): Use SDF read as default when we address performance 
issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"use_deprecated_read");
+    // TODO(BEAM-10670): Use SDF read as default for non-portable execution 
when we address
+    // performance issue.
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"beam_fn_api")) {
+      
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
-    
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
 
     MetricsEnvironment.setMetricsSupported(true);
 
diff --git 
a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
 
b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
index 46e17b0..5ad08b9 100644
--- 
a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
+++ 
b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
@@ -171,16 +171,11 @@ public final class SparkStructuredStreamingRunner
   private TranslationContext translatePipeline(Pipeline pipeline) {
     PipelineTranslator.detectTranslationMode(pipeline, options);
 
-    // Default to using the primitive versions of Read.Bounded and 
Read.Unbounded.
+    // Default to using the primitive versions of Read.Bounded and 
Read.Unbounded for non-portable
+    // execution.
     // TODO(BEAM-10670): Use SDF read as default when we address performance 
issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"use_deprecated_read");
-
-      
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"beam_fn_api")) {
+      
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
 
     PipelineTranslator.replaceTransforms(pipeline, options);
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index ccefc7b..747a926 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -161,13 +161,8 @@ public final class SparkRunner extends 
PipelineRunner<SparkPipelineResult> {
 
     // Default to using the primitive versions of Read.Bounded and 
Read.Unbounded.
     // TODO(BEAM-10670): Use SDF read as default when we address performance 
issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"use_deprecated_read");
-      
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"beam_fn_api")) {
+      
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
 
     
pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(pipelineOptions.isStreaming()));
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 4484803..65f83f5 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -82,13 +82,8 @@ public final class SparkRunnerDebugger extends 
PipelineRunner<SparkPipelineResul
 
     // Default to using the primitive versions of Read.Bounded and 
Read.Unbounded.
     // TODO(BEAM-10670): Use SDF read as default when we address performance 
issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"use_deprecated_read");
-      
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"beam_fn_api")) {
+      
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
 
     JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline");
diff --git 
a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
 
b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
index dec729c..a6f0797 100644
--- 
a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
+++ 
b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
@@ -92,14 +92,9 @@ public class Twister2Runner extends 
PipelineRunner<PipelineResult> {
     pipeline.replaceAll(getDefaultOverrides());
 
     // TODO(BEAM-10670): Use SDF read as default when we address performance 
issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"use_deprecated_read");
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"beam_fn_api")) {
+      
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
-    
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
 
     env.translate(pipeline);
     setupSystem(options);
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 499a1f8..bf15093 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1228,12 +1228,28 @@ public class KafkaIO {
           || ExperimentalOptions.hasExperiment(
               input.getPipeline().getOptions(), "use_deprecated_read")
           || getMaxNumRecords() < Long.MAX_VALUE
-          || getMaxReadTime() != null) {
+          || getMaxReadTime() != null
+          || runnerRequiresLegacyRead(input.getPipeline().getOptions())) {
         return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, 
valueCoder));
       }
       return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, 
valueCoder));
     }
 
+    private boolean runnerRequiresLegacyRead(PipelineOptions options) {
+      // Only the Dataflow runner requires SDF read at the moment. For other 
non-portable runners, if
+      // use_sdf_read is not specified, legacy read is used because of a 
performance concern.
+      // TODO(BEAM-10670): Remove this special check when we address the 
performance issue.
+      if (ExperimentalOptions.hasExperiment(options, "use_sdf_read")) {
+        return false;
+      }
+      if 
(options.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.")) 
{
+        return false;
+      } else if (ExperimentalOptions.hasExperiment(options, "beam_fn_api")) {
+        return false;
+      }
+      return true;
+    }
+
     /**
      * A {@link PTransformOverride} for runners to swap {@link 
ReadFromKafkaViaSDF} to legacy Kafka
      * read if runners doesn't have a good support on executing unbounded 
Splittable DoFn.

Reply via email to