Repository: beam
Updated Branches:
  refs/heads/master 6dfd3a2b7 -> df1476d82
Add option for toProto/fromProto translations in DirectRunner, disabled by default.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/247b1334
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/247b1334
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/247b1334

Branch: refs/heads/master
Commit: 247b1334734bf70ebfb9354988fcb3840f41d55f
Parents: 6dfd3a2
Author: mingmxu <[email protected]>
Authored: Fri Sep 1 15:16:19 2017 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Wed Sep 6 21:50:01 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/direct/DirectOptions.java |  8 ++++++++
 .../org/apache/beam/runners/direct/DirectRunner.java  | 14 +++++++++-----
 2 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/247b1334/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 574ab46..af67306 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -74,4 +76,10 @@ public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
       return Math.max(Runtime.getRuntime().availableProcessors(), MIN_PARALLELISM);
     }
   }
+
+  @Experimental(Kind.CORE_RUNNERS_ONLY)
+  @Default.Boolean(false)
+  @Description("Control whether toProto/fromProto translations are applied to original Pipeline")
+  boolean isProtoTranslation();
+  void setProtoTranslation(boolean b);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/247b1334/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 642ce8f..35d55b1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -160,11 +160,15 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   @Override
   public DirectPipelineResult run(Pipeline originalPipeline) {
     Pipeline pipeline;
-    try {
-      pipeline = PipelineTranslation.fromProto(
-          PipelineTranslation.toProto(originalPipeline));
-    } catch (IOException exception) {
-      throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
+    if (getPipelineOptions().isProtoTranslation()) {
+      try {
+        pipeline = PipelineTranslation.fromProto(
+            PipelineTranslation.toProto(originalPipeline));
+      } catch (IOException exception) {
+        throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
+      }
+    } else {
+      pipeline = originalPipeline;
     }
     pipeline.replaceAll(defaultTransformOverrides());
     MetricsEnvironment.setMetricsSupported(true);
