Repository: beam
Updated Branches:
  refs/heads/master 6dfd3a2b7 -> df1476d82


Add option for toProto/fromProto translations in DirectRunner, disabled by 
default.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/247b1334
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/247b1334
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/247b1334

Branch: refs/heads/master
Commit: 247b1334734bf70ebfb9354988fcb3840f41d55f
Parents: 6dfd3a2
Author: mingmxu <[email protected]>
Authored: Fri Sep 1 15:16:19 2017 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Wed Sep 6 21:50:01 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/direct/DirectOptions.java |  8 ++++++++
 .../org/apache/beam/runners/direct/DirectRunner.java  | 14 +++++++++-----
 2 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/247b1334/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 574ab46..af67306 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -74,4 +76,10 @@ public interface DirectOptions extends PipelineOptions, 
ApplicationNameOptions {
       return Math.max(Runtime.getRuntime().availableProcessors(), 
MIN_PARALLELISM);
     }
   }
+
+  @Experimental(Kind.CORE_RUNNERS_ONLY)
+  @Default.Boolean(false)
+  @Description("Control whether toProto/fromProto translations are applied to 
original Pipeline")
+  boolean isProtoTranslation();
+  void setProtoTranslation(boolean b);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/247b1334/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 642ce8f..35d55b1 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -160,11 +160,15 @@ public class DirectRunner extends 
PipelineRunner<DirectPipelineResult> {
   @Override
   public DirectPipelineResult run(Pipeline originalPipeline) {
     Pipeline pipeline;
-    try {
-      pipeline = PipelineTranslation.fromProto(
-          PipelineTranslation.toProto(originalPipeline));
-    } catch (IOException exception) {
-      throw new RuntimeException("Error preparing pipeline for direct 
execution.", exception);
+    if (getPipelineOptions().isProtoTranslation()) {
+      try {
+        pipeline = PipelineTranslation.fromProto(
+            PipelineTranslation.toProto(originalPipeline));
+      } catch (IOException exception) {
+        throw new RuntimeException("Error preparing pipeline for direct 
execution.", exception);
+      }
+    } else {
+      pipeline = originalPipeline;
     }
     pipeline.replaceAll(defaultTransformOverrides());
     MetricsEnvironment.setMetricsSupported(true);

Reply via email to