This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 904a39e [BEAM-10670] Update Twister2Runner.runTest with the same
logic as Twister2Runner.run
new 06dbe4f Merge pull request #14864 from boyuanzz/fix
904a39e is described below
commit 904a39e45d071397e31e423740fcda596147fb5a
Author: Boyuan Zhang <[email protected]>
AuthorDate: Fri May 21 16:33:05 2021 -0700
[BEAM-10670] Update Twister2Runner.runTest with the same logic as
Twister2Runner.run
---
.../java/org/apache/beam/runners/twister2/Twister2Runner.java | 10 +++-------
1 file changed, 3 insertions(+), 7 deletions(-)
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
index a6f0797..b3a02d3 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
@@ -151,15 +151,11 @@ public class Twister2Runner extends PipelineRunner<PipelineResult> {
Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options);
LOG.info("Translating pipeline to Twister2 program.");
pipeline.replaceAll(getDefaultOverrides());
+
// TODO(BEAM-10670): Use SDF read as default when we address performance issue.
- if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
- // Populate experiments directly to have Kafka use legacy read.
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
- ExperimentalOptions.addExperiment(
- pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
+ if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+ SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}
- SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
env.translate(pipeline);
setupSystemTest(options);