[BEAM-831] ParDo Fusion of Apex Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6eab5c94 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6eab5c94 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6eab5c94 Branch: refs/heads/release-2.0.0 Commit: 6eab5c9465bda3da4d8a1ea9f73a74e9c8faec85 Parents: 3b7a623 Author: chinmaykolhatkar <[email protected]> Authored: Wed Mar 1 16:59:46 2017 +0530 Committer: Thomas Weise <[email protected]> Committed: Mon May 8 08:37:29 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/apex/ApexPipelineOptions.java | 5 + .../apex/translation/TranslationContext.java | 103 +++++++++++++++++-- 2 files changed, 97 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6eab5c94/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java index f37e874..92f6e8f 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java @@ -62,4 +62,9 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab @Default.String("classpath:/beam-runners-apex.properties") String getConfigFile(); + @Description("configure whether to perform ParDo fusion") + void setParDoFusionEnabled(boolean enabled); + + @Default.Boolean(true) + boolean isParDoFusionEnabled(); } http://git-wip-us.apache.org/repos/asf/beam/blob/6eab5c94/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java 
---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index 9c20449..1224e25 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -37,6 +37,8 @@ import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -54,7 +56,8 @@ class TranslationContext { private final ApexPipelineOptions pipelineOptions; private AppliedPTransform<?, ?, ?> currentTransform; - private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>(); + private final Map<PCollection, Pair<OutputPortInfo, List<InputPortInfo>>> streams = + new HashMap<>(); private final Map<String, Operator> operators = new HashMap<>(); private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>(); private Map<PInput, PInput> aliasCollections = new HashMap<>(); @@ -122,8 +125,10 @@ class TranslationContext { addOperator(operator, portEntry.getValue(), portEntry.getKey()); first = false; } else { - this.streams.put(portEntry.getKey(), (Pair) new ImmutablePair<>(portEntry.getValue(), - new ArrayList<>())); + this.streams.put(portEntry.getKey(), + (Pair) new ImmutablePair<>(new OutputPortInfo(portEntry.getValue(), + getCurrentTransform()), + new ArrayList<>())); } } } @@ -142,16 +147,19 
@@ class TranslationContext { name = getCurrentTransform().getFullName() + i; } this.operators.put(name, operator); - this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>())); + this.streams.put(output, (Pair) new ImmutablePair<>( + new OutputPortInfo(port, getCurrentTransform()), + new ArrayList<>())); } public void addStream(PInput input, InputPort inputPort) { while (aliasCollections.containsKey(input)) { input = aliasCollections.get(input); } - Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input); - checkArgument(stream != null, "no upstream operator defined for %s", input); - stream.getRight().add(inputPort); + + Pair<OutputPortInfo, List<InputPortInfo>> stream = this.streams.get(input); + checkArgument(stream != null, "no upstream operator defined for " + input); + stream.getRight().add(new InputPortInfo(inputPort, getCurrentTransform())); } /** @@ -168,13 +176,23 @@ class TranslationContext { for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) { dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue()); } + int streamIndex = 0; - for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this. + for (Map.Entry<PCollection, Pair<OutputPortInfo, List<InputPortInfo>>> streamEntry : this. 
streams.entrySet()) { - List<InputPort<?>> sinksList = streamEntry.getValue().getRight(); - InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]); + List<InputPortInfo> destInfo = streamEntry.getValue().getRight(); + InputPort[] sinks = new InputPort[destInfo.size()]; + for (int i = 0; i < sinks.length; i++) { + sinks[i] = destInfo.get(i).port; + } + if (sinks.length > 0) { - dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks); + DAG.StreamMeta streamMeta = dag.addStream("stream" + streamIndex++, + streamEntry.getValue().getLeft().port, sinks); + if (pipelineOptions.isParDoFusionEnabled()) { + optimizeStreams(streamMeta, streamEntry); + } + for (InputPort port : sinks) { PCollection pc = streamEntry.getKey(); Coder coder = pc.getCoder(); @@ -191,6 +209,49 @@ class TranslationContext { } } + private void optimizeStreams(DAG.StreamMeta streamMeta, + Map.Entry<PCollection, + Pair<OutputPortInfo, List<InputPortInfo>>> streamEntry) { + DAG.Locality loc = null; + + List<InputPortInfo> sinks = streamEntry.getValue().getRight(); + OutputPortInfo source = streamEntry.getValue().getLeft(); + PTransform sourceTransform = source.transform.getTransform(); + if (sourceTransform instanceof ParDo.Bound + || sourceTransform instanceof ParDo.BoundMulti) { + // Source is a ParDo, so inspect the sink(s): a ParDo sink without side inputs yields THREAD_LOCAL, a ParDo sink with side inputs yields CONTAINER_LOCAL and ends the scan, and a non-ParDo sink yields null locality and ends the scan. + for (InputPortInfo sink : sinks) { + PTransform transform = sink.transform.getTransform(); + if (transform instanceof ParDo.Bound) { + ParDo.Bound t = (ParDo.Bound) transform; + if (t.getSideInputs().size() > 0) { + loc = DAG.Locality.CONTAINER_LOCAL; + break; + } else { + loc = DAG.Locality.THREAD_LOCAL; + } + } else if (transform instanceof ParDo.BoundMulti) { + ParDo.BoundMulti t = (ParDo.BoundMulti) transform; + if (t.getSideInputs().size() > 0) { + loc = DAG.Locality.CONTAINER_LOCAL; + break; + } else { + loc = DAG.Locality.THREAD_LOCAL; + } + } else { + // Sink is not a ParDo and cannot be fused: set null locality. NOTE(review): the CONTAINER_LOCAL branches above break out of the loop, so a non-ParDo sink that follows a ParDo sink with side inputs is never inspected and the resulting locality depends on sink order -- confirm this is intended.
+ loc = null; + break; + } + } + } else { + // Source is not a ParDo, so nothing is fused: set null locality. + loc = null; + } + + streamMeta.setLocality(loc); + } + /** * Return the state backend for the pipeline translation. * @return @@ -198,4 +259,24 @@ class TranslationContext { public ApexStateBackend getStateBackend() { return new ApexStateInternals.ApexStateBackend(); } + + static class InputPortInfo { + InputPort port; + AppliedPTransform transform; + + public InputPortInfo(InputPort port, AppliedPTransform transform) { + this.port = port; + this.transform = transform; + } + } + + static class OutputPortInfo { + OutputPort port; + AppliedPTransform transform; + + public OutputPortInfo(OutputPort port, AppliedPTransform transform) { + this.port = port; + this.transform = transform; + } + } }
