Use Batch Replacement in the Apex Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d89e9d7d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d89e9d7d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d89e9d7d Branch: refs/heads/master Commit: d89e9d7d3a3ea952e6eb0784f717203460afe90f Parents: c81f83b Author: Thomas Groh <[email protected]> Authored: Thu Mar 30 15:55:28 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri Mar 31 14:08:55 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/apex/ApexRunner.java | 40 ++++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d89e9d7d/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index dfc8f63..d23fc14 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -22,7 +22,7 @@ import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -31,7 +31,6 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; import org.apache.apex.api.EmbeddedAppLauncher; @@ -48,9 +47,7 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.runners.PTransformMatcher; import org.apache.beam.sdk.runners.PTransformOverride; -import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView; @@ -96,27 +93,30 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { return new ApexRunner(apexPipelineOptions); } - private Map<PTransformMatcher, PTransformOverrideFactory> getOverrides() { - return ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder() - .put(PTransformMatchers.classEqualTo(Create.Values.class), new PrimitiveCreate.Factory()) - .put( - PTransformMatchers.classEqualTo(View.AsSingleton.class), - new StreamingViewAsSingleton.Factory()) - .put( - PTransformMatchers.classEqualTo(View.AsIterable.class), - new StreamingViewAsIterable.Factory()) - .put( - PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), - new StreamingCombineGloballyAsSingletonView.Factory()) + private List<PTransformOverride> getOverrides() { + return ImmutableList.<PTransformOverride>builder() + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(Create.Values.class), + new PrimitiveCreate.Factory())) + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(View.AsSingleton.class), + new StreamingViewAsSingleton.Factory())) + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(View.AsIterable.class), + new StreamingViewAsIterable.Factory())) + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), + new StreamingCombineGloballyAsSingletonView.Factory())) .build(); } @Override public ApexRunnerResult run(final Pipeline pipeline) { - for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : - getOverrides().entrySet()) { - pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue())); - } + pipeline.replaceAll(getOverrides()); final ApexPipelineTranslator translator = new ApexPipelineTranslator(options); final AtomicReference<DAG> apexDAG = new AtomicReference<>();
