Repository: beam
Updated Branches:
  refs/heads/master eeb043299 -> 0e89df3f6


Use URNs, not Java classes, in immutability enforcements


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/311547aa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/311547aa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/311547aa

Branch: refs/heads/master
Commit: 311547aa561bb314a8fe743b6f4677a2eaaaca50
Parents: 9f904dc
Author: Kenneth Knowles <[email protected]>
Authored: Mon Jul 10 15:25:11 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Mon Jul 10 15:31:40 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 21 ++++++++------------
 .../direct/ExecutorServiceParallelExecutor.java | 16 ++++++---------
 2 files changed, 14 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/311547aa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 7a221c4..4621224 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -38,14 +38,11 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -72,16 +69,17 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     IMMUTABILITY {
       @Override
       public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
-        return CONTAINS_UDF.contains(graph.getProducer(collection).getTransform().getClass());
+        return CONTAINS_UDF.contains(
+            PTransformTranslation.urnForTransform(graph.getProducer(collection).getTransform()));
       }
     };
 
     /**
      * The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements.
      */
-    private static final Set<Class<? extends PTransform>> CONTAINS_UDF =
+    private static final Set<String> CONTAINS_UDF =
         ImmutableSet.of(
-            Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class);
+            PTransformTranslation.READ_TRANSFORM_URN, PTransformTranslation.PAR_DO_TRANSFORM_URN);
 
     public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph);
 
@@ -110,22 +108,19 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
       return bundleFactory;
     }
 
-    @SuppressWarnings("rawtypes")
-    private static Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+    private static Map<String, Collection<ModelEnforcementFactory>>
         defaultModelEnforcements(Set<Enforcement> enabledEnforcements) {
-      ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
-          enforcements = ImmutableMap.builder();
+      ImmutableMap.Builder<String, Collection<ModelEnforcementFactory>> enforcements =
+          ImmutableMap.builder();
       ImmutableList.Builder<ModelEnforcementFactory> enabledParDoEnforcements =
           ImmutableList.builder();
       if (enabledEnforcements.contains(Enforcement.IMMUTABILITY)) {
         enabledParDoEnforcements.add(ImmutabilityEnforcementFactory.create());
       }
       Collection<ModelEnforcementFactory> parDoEnforcements = enabledParDoEnforcements.build();
-      enforcements.put(ParDo.SingleOutput.class, parDoEnforcements);
-      enforcements.put(MultiOutput.class, parDoEnforcements);
+      enforcements.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDoEnforcements);
       return enforcements.build();
     }
-
   }
 
   
////////////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/311547aa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 2f4d1f6..75e2562 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -49,11 +49,11 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -77,9 +77,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   private final DirectGraph graph;
   private final RootProviderRegistry rootProviderRegistry;
   private final TransformEvaluatorRegistry registry;
-  @SuppressWarnings("rawtypes")
-  private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
-      transformEnforcements;
+  private final Map<String, Collection<ModelEnforcementFactory>> transformEnforcements;
 
   private final EvaluationContext evaluationContext;
 
@@ -112,9 +110,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       DirectGraph graph,
       RootProviderRegistry rootProviderRegistry,
       TransformEvaluatorRegistry registry,
-      @SuppressWarnings("rawtypes")
-          Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
-              transformEnforcements,
+      Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
       EvaluationContext context) {
     return new ExecutorServiceParallelExecutor(
         targetParallelism,
@@ -130,8 +126,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       DirectGraph graph,
       RootProviderRegistry rootProviderRegistry,
       TransformEvaluatorRegistry registry,
-      @SuppressWarnings("rawtypes")
-      Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
+      Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
       EvaluationContext context) {
     this.targetParallelism = targetParallelism;
     // Don't use Daemon threads for workers. The Pipeline should continue to execute even if there
@@ -237,7 +232,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
 
     Collection<ModelEnforcementFactory> enforcements =
         MoreObjects.firstNonNull(
-            transformEnforcements.get(transform.getTransform().getClass()),
+            transformEnforcements.get(
+                PTransformTranslation.urnForTransform(transform.getTransform())),
             Collections.<ModelEnforcementFactory>emptyList());
 
     TransformExecutor<T> callable =

Reply via email to