This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch release-2.30.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.30.0 by this push:
     new ae1d4f0  [BEAM-10670] Update Twister2Runner.runTest with the same logic as Twister2Runner.run
     new f141fac  Merge pull request #14868 from boyuanzz/cherrypick
ae1d4f0 is described below

commit ae1d4f01a04c7534e00b94f6400e18c1388450d4
Author: Boyuan Zhang <[email protected]>
AuthorDate: Fri May 21 16:33:05 2021 -0700

    [BEAM-10670] Update Twister2Runner.runTest with the same logic as Twister2Runner.run
    
    (cherry picked from commit 904a39e45d071397e31e423740fcda596147fb5a)
---
 .../java/org/apache/beam/runners/twister2/Twister2Runner.java  | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
index a6f0797..b3a02d3 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
@@ -151,15 +151,11 @@ public class Twister2Runner extends PipelineRunner<PipelineResult> {
     Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options);
     LOG.info("Translating pipeline to Twister2 program.");
     pipeline.replaceAll(getDefaultOverrides());
+
     // TODO(BEAM-10670): Use SDF read as default when we address performance issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
-    SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
 
     env.translate(pipeline);
     setupSystemTest(options);

Reply via email to