This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c097d84 Fix Kafka expansion issues introduced by PR14801
new 2ad332b Merge pull request #14849 from [BEAM-10670] Fix Kafka expansion issues introduced by PR14801
c097d84 is described below
commit c097d8424678813901ac46f2fe674cfe71e67430
Author: Boyuan Zhang <[email protected]>
AuthorDate: Thu May 20 11:29:42 2021 -0700
Fix Kafka expansion issues introduced by PR14801
---
.../org/apache/beam/runners/direct/DirectRunner.java | 10 ----------
.../org/apache/beam/runners/flink/FlinkRunner.java | 7 -------
.../org/apache/beam/runners/samza/SamzaRunner.java | 12 ++++--------
.../SparkStructuredStreamingRunner.java | 13 ++++---------
.../org/apache/beam/runners/spark/SparkRunner.java | 9 ++-------
.../apache/beam/runners/spark/SparkRunnerDebugger.java | 9 ++-------
.../apache/beam/runners/twister2/Twister2Runner.java | 9 ++-------
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 18 +++++++++++++++++-
8 files changed, 31 insertions(+), 56 deletions(-)
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index b073840..4e7178b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -39,7 +39,6 @@ import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.transforms.PTransform;
@@ -173,15 +172,6 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
"PipelineOptions specified failed to serialize to JSON.", e);
}
- // TODO(BEAM-10670): Remove additional experiments when we address performance issue.
- if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
- // Populate experiments directly to avoid direct-runner <-> kafka circular dependency.
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
- }
-
performRewrites(pipeline);
MetricsEnvironment.setMetricsSupported(true);
try {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 6d9e56b..1470a2c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -78,13 +78,6 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
// Portable flink only support SDF as read.
// TODO(BEAM-10670): Use SDF read as default when we address performance issue.
if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
- if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
- // Populate experiments directly to have Kafka use legacy read.
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
- }
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
index dd53a49..42fbed0 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
@@ -112,15 +112,11 @@ public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
@Override
public SamzaPipelineResult run(Pipeline pipeline) {
- // TODO(BEAM-10670): Use SDF read as default when we address performance issue.
- if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
- // Populate experiments directly to have Kafka use legacy read.
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
+ // TODO(BEAM-10670): Use SDF read as default for non-portable execution when we address
+ // performance issue.
+ if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+ SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}
- SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
MetricsEnvironment.setMetricsSupported(true);
diff --git a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
index 46e17b0..5ad08b9 100644
--- a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
+++ b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
@@ -171,16 +171,11 @@ public final class SparkStructuredStreamingRunner
private TranslationContext translatePipeline(Pipeline pipeline) {
PipelineTranslator.detectTranslationMode(pipeline, options);
- // Default to using the primitive versions of Read.Bounded and Read.Unbounded.
+ // Default to using the primitive versions of Read.Bounded and Read.Unbounded for non-portable
+ // execution.
// TODO(BEAM-10670): Use SDF read as default when we address performance issue.
- if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
- // Populate experiments directly to have Kafka use legacy read.
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
-
- SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
+ if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+ SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}
PipelineTranslator.replaceTransforms(pipeline, options);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index ccefc7b..747a926 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -161,13 +161,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
// Default to using the primitive versions of Read.Bounded and Read.Unbounded.
// TODO(BEAM-10670): Use SDF read as default when we address performance issue.
- if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
- // Populate experiments directly to have Kafka use legacy read.
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
- SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
+ if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+ SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}
pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(pipelineOptions.isStreaming()));
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 4484803..65f83f5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -82,13 +82,8 @@ public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResul
// Default to using the primitive versions of Read.Bounded and Read.Unbounded.
// TODO(BEAM-10670): Use SDF read as default when we address performance issue.
- if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
- // Populate experiments directly to have Kafka use legacy read.
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
- SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
+ if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+ SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}
JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline");
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
index dec729c..a6f0797 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
@@ -92,14 +92,9 @@ public class Twister2Runner extends PipelineRunner<PipelineResult> {
pipeline.replaceAll(getDefaultOverrides());
// TODO(BEAM-10670): Use SDF read as default when we address performance issue.
- if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
- // Populate experiments directly to have Kafka use legacy read.
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
+ if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+ SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}
- SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
env.translate(pipeline);
setupSystem(options);
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 499a1f8..bf15093 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1228,12 +1228,28 @@ public class KafkaIO {
|| ExperimentalOptions.hasExperiment(
input.getPipeline().getOptions(), "use_deprecated_read")
|| getMaxNumRecords() < Long.MAX_VALUE
- || getMaxReadTime() != null) {
+ || getMaxReadTime() != null
+ || runnerRequiresLegacyRead(input.getPipeline().getOptions())) {
return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
}
return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
}
+ private boolean runnerRequiresLegacyRead(PipelineOptions options) {
+ // Only Dataflow runner requires sdf read at this moment. For other non-portable runners, if
+ // it doesn't specify use_sdf_read, it will use legacy read regarding to performance concern.
+ // TODO(BEAM-10670): Remove this special check when we address performance issue.
+ if (ExperimentalOptions.hasExperiment(options, "use_sdf_read")) {
+ return false;
+ }
+ if (options.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.")) {
+ return false;
+ } else if (ExperimentalOptions.hasExperiment(options, "beam_fn_api")) {
+ return false;
+ }
+ return true;
+ }
+
/**
* A {@link PTransformOverride} for runners to swap {@link ReadFromKafkaViaSDF} to legacy Kafka
* read if runners doesn't have a good support on executing unbounded Splittable DoFn.
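
For context on the behavior this commit introduces (not part of the diff above): on non-Dataflow, non-portable runners KafkaIO now defaults to the legacy UnboundedSource-based read unless the pipeline opts in. Below is a minimal, hedged sketch of how a pipeline author could still request the SDF-based read by setting the "use_sdf_read" experiment before constructing the pipeline; the class name, topic, and bootstrap server are illustrative placeholders, not values from this commit.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaSdfReadExample { // hypothetical example class
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Without this experiment, runnerRequiresLegacyRead(...) above keeps the legacy
    // UnboundedSource-based read on non-Dataflow, non-portable runners.
    ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_sdf_read");

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092") // placeholder broker
            .withTopic("my_topic") // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));
    pipeline.run().waitUntilFinish();
  }
}

The same opt-in can be passed on the command line as --experiments=use_sdf_read; Dataflow and portable (beam_fn_api) pipelines keep the SDF read by default.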