Apply ModelEnforcement in the InProcessPipelineRunner. This ensures that user code does not violate the Beam model.
Add a flag to control application of immutability enforcement. This flag is enabled by default. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/740242c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/740242c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/740242c3 Branch: refs/heads/master Commit: 740242c330ce99916ed76af7c6e68638d1e3c0e3 Parents: 363d4ec Author: Thomas Groh <[email protected]> Authored: Mon Apr 4 16:48:15 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Thu Apr 7 09:03:11 2016 -0700 ---------------------------------------------------------------------- .../inprocess/InProcessPipelineOptions.java | 9 +++++++ .../inprocess/InProcessPipelineRunner.java | 25 +++++++++++++++++--- 2 files changed, 31 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/740242c3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java index ae5b49b..d44ea78 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java @@ -89,4 +89,13 @@ public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNa boolean isBlockOnRun(); void setBlockOnRun(boolean b); + + @Default.Boolean(true) + @Description( + "Controls whether the runner should ensure that all of the elements of every " + + "PCollection are not 
mutated. PTransforms are not permitted to mutate input elements " + + "at any point, or output elements after they are output.") + boolean isTestImmutability(); + + void setTestImmutability(boolean test); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/740242c3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 4fb01b7..f5b7f3c 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -33,6 +33,7 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView; import com.google.cloud.dataflow.sdk.util.InstanceBuilder; import com.google.cloud.dataflow.sdk.util.MapAggregatorValues; @@ -48,13 +49,13 @@ import com.google.cloud.dataflow.sdk.values.PInput; import com.google.cloud.dataflow.sdk.values.POutput; import com.google.cloud.dataflow.sdk.values.PValue; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.joda.time.Instant; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; @@ 
-269,11 +270,29 @@ public class InProcessPipelineRunner private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> defaultModelEnforcements(InProcessPipelineOptions options) { - return Collections.emptyMap(); + ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> + enforcements = ImmutableMap.builder(); + Collection<ModelEnforcementFactory> parDoEnforcements = createParDoEnforcements(options); + enforcements.put(ParDo.Bound.class, parDoEnforcements); + enforcements.put(ParDo.BoundMulti.class, parDoEnforcements); + return enforcements.build(); + } + + private Collection<ModelEnforcementFactory> createParDoEnforcements( + InProcessPipelineOptions options) { + ImmutableList.Builder<ModelEnforcementFactory> enforcements = ImmutableList.builder(); + if (options.isTestImmutability()) { + enforcements.add(ImmutabilityEnforcementFactory.create()); + } + return enforcements.build(); } private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) { - return InProcessBundleFactory.create(); + BundleFactory bundleFactory = InProcessBundleFactory.create(); + if (pipelineOptions.isTestImmutability()) { + bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory); + } + return bundleFactory; } /**
