Repository: beam Updated Branches: refs/heads/master eeb043299 -> 0e89df3f6
Use URNs, not Java classes, in immutability enforcements Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/311547aa Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/311547aa Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/311547aa Branch: refs/heads/master Commit: 311547aa561bb314a8fe743b6f4677a2eaaaca50 Parents: 9f904dc Author: Kenneth Knowles <[email protected]> Authored: Mon Jul 10 15:25:11 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Jul 10 15:31:40 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 21 ++++++++------------ .../direct/ExecutorServiceParallelExecutor.java | 16 ++++++--------- 2 files changed, 14 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/311547aa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 7a221c4..4621224 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -38,14 +38,11 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; @@ -72,16 +69,17 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { IMMUTABILITY { @Override public boolean appliesTo(PCollection<?> collection, DirectGraph graph) { - return CONTAINS_UDF.contains(graph.getProducer(collection).getTransform().getClass()); + return CONTAINS_UDF.contains( + PTransformTranslation.urnForTransform(graph.getProducer(collection).getTransform())); } }; /** * The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements. */ - private static final Set<Class<? extends PTransform>> CONTAINS_UDF = + private static final Set<String> CONTAINS_UDF = ImmutableSet.of( - Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class); + PTransformTranslation.READ_TRANSFORM_URN, PTransformTranslation.PAR_DO_TRANSFORM_URN); public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph); @@ -110,22 +108,19 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { return bundleFactory; } - @SuppressWarnings("rawtypes") - private static Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> + private static Map<String, Collection<ModelEnforcementFactory>> defaultModelEnforcements(Set<Enforcement> enabledEnforcements) { - ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> - enforcements = ImmutableMap.builder(); + ImmutableMap.Builder<String, Collection<ModelEnforcementFactory>> enforcements = + ImmutableMap.builder(); ImmutableList.Builder<ModelEnforcementFactory> enabledParDoEnforcements = ImmutableList.builder(); if (enabledEnforcements.contains(Enforcement.IMMUTABILITY)) { enabledParDoEnforcements.add(ImmutabilityEnforcementFactory.create()); } Collection<ModelEnforcementFactory> parDoEnforcements = enabledParDoEnforcements.build(); - enforcements.put(ParDo.SingleOutput.class, parDoEnforcements); - enforcements.put(MultiOutput.class, parDoEnforcements); + enforcements.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDoEnforcements); return enforcements.build(); } - } //////////////////////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/beam/blob/311547aa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 2f4d1f6..75e2562 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -49,11 +49,11 @@ import javax.annotation.Nullable; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -77,9 +77,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { private final DirectGraph graph; private final RootProviderRegistry rootProviderRegistry; private final TransformEvaluatorRegistry registry; - @SuppressWarnings("rawtypes") - private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> - transformEnforcements; + private final Map<String, Collection<ModelEnforcementFactory>> transformEnforcements; private final EvaluationContext evaluationContext; @@ -112,9 +110,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { DirectGraph graph, RootProviderRegistry rootProviderRegistry, TransformEvaluatorRegistry registry, - @SuppressWarnings("rawtypes") - Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> - transformEnforcements, + Map<String, Collection<ModelEnforcementFactory>> transformEnforcements, EvaluationContext context) { return new ExecutorServiceParallelExecutor( targetParallelism, @@ -130,8 +126,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { DirectGraph graph, RootProviderRegistry rootProviderRegistry, TransformEvaluatorRegistry registry, - @SuppressWarnings("rawtypes") - Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements, + Map<String, Collection<ModelEnforcementFactory>> transformEnforcements, EvaluationContext context) { this.targetParallelism = targetParallelism; // Don't use Daemon threads for workers. The Pipeline should continue to execute even if there @@ -237,7 +232,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { Collection<ModelEnforcementFactory> enforcements = MoreObjects.firstNonNull( - transformEnforcements.get(transform.getTransform().getClass()), + transformEnforcements.get( + PTransformTranslation.urnForTransform(transform.getTransform())), Collections.<ModelEnforcementFactory>emptyList()); TransformExecutor<T> callable =
