[
https://issues.apache.org/jira/browse/BEAM-3914?focusedWorklogId=86872&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86872
]
ASF GitHub Bot logged work on BEAM-3914:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Apr/18 00:06
Start Date: 03/Apr/18 00:06
Worklog Time Spent: 10m
Work Description: tgroh commented on a change in pull request #4977:
[BEAM-3914] Deduplicate Unzipped Flattens after Pipeline Fusion
URL: https://github.com/apache/beam/pull/4977#discussion_r178682914
##########
File path:
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction.graph;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+
+/**
+ * Utilities to insert synthetic {@link PCollectionNode PCollections} for
{@link PCollection
+ * PCollections} which are produced by multiple independently executable
stages.
+ */
+class OutputDeduplicator {
+ static DeduplicationResult ensureSingleProducer(
+ QueryablePipeline pipeline,
+ Collection<ExecutableStage> stages,
+ Collection<PTransformNode> unfusedTransforms) {
+ RunnerApi.Components.Builder unzippedComponents =
pipeline.getComponents().toBuilder();
+
+ Multimap<PCollectionNode, StageOrTransform> pcollectionProducers =
+ getProducers(pipeline, stages, unfusedTransforms);
+ Multimap<StageOrTransform, PCollectionNode> requiresNewOutput =
HashMultimap.create();
+ // Create a synthetic PCollection for each of these nodes. The transforms
in the runner
+ // portion of the graph that creates them should be replaced in the result
components. The
+ // ExecutableStage must also be rewritten to have updated outputs and
transforms.
+ for (Map.Entry<PCollectionNode, Collection<StageOrTransform>>
collectionProducer :
+ pcollectionProducers.asMap().entrySet()) {
+ if (collectionProducer.getValue().size() > 1) {
+ for (StageOrTransform producer : collectionProducer.getValue()) {
+ requiresNewOutput.put(producer, collectionProducer.getKey());
+ }
+ }
+ }
+
+ Map<ExecutableStage, ExecutableStage> updatedStages = new
LinkedHashMap<>();
+ Map<String, PTransformNode> updatedTransforms = new LinkedHashMap<>();
+ Multimap<String, PCollectionNode> originalToPartial =
HashMultimap.create();
+ for (Map.Entry<StageOrTransform, Collection<PCollectionNode>>
deduplicationTargets :
+ requiresNewOutput.asMap().entrySet()) {
+ if (deduplicationTargets.getKey().getStage() != null) {
+ StageDeduplication deduplication =
+ deduplicatePCollections(
+ deduplicationTargets.getKey().getStage(),
+ deduplicationTargets.getValue(),
+ unzippedComponents::containsPcollections);
+ for (Entry<String, PCollectionNode> originalToPartialReplacement :
+ deduplication.getOriginalToPartialPCollections().entrySet()) {
+ originalToPartial.put(
+ originalToPartialReplacement.getKey(),
originalToPartialReplacement.getValue());
+ unzippedComponents.putPcollections(
+ originalToPartialReplacement.getValue().getId(),
+ originalToPartialReplacement.getValue().getPCollection());
+ }
+ updatedStages.put(
+ deduplicationTargets.getKey().getStage(),
deduplication.getUpdatedStage());
+ } else if (deduplicationTargets.getKey().getTransform() != null) {
+ PTransformDeduplication deduplication =
+ deduplicatePCollections(
+ deduplicationTargets.getKey().getTransform(),
+ deduplicationTargets.getValue(),
+ unzippedComponents::containsPcollections);
+ for (Entry<String, PCollectionNode> originalToPartialReplacement :
+ deduplication.getOriginalToPartialPCollections().entrySet()) {
+ originalToPartial.put(
+ originalToPartialReplacement.getKey(),
originalToPartialReplacement.getValue());
+ unzippedComponents.putPcollections(
+ originalToPartialReplacement.getValue().getId(),
+ originalToPartialReplacement.getValue().getPCollection());
+ }
+ updatedTransforms.put(
+ deduplicationTargets.getKey().getTransform().getId(),
+ deduplication.getUpdatedTransform());
+ } else {
+ throw new IllegalStateException(
+ String.format(
+ "%s with no %s or %s",
+ StageOrTransform.class.getSimpleName(),
+ ExecutableStage.class.getSimpleName(),
+ PTransformNode.class.getSimpleName()));
+ }
+ }
+
+ Set<PTransformNode> introducedFlattens = new LinkedHashSet<>();
+ for (Map.Entry<String, Collection<PCollectionNode>> partialFlattenTargets :
+ originalToPartial.asMap().entrySet()) {
+ PTransform flattenSynthetics =
+ createSyntheticFlatten(partialFlattenTargets.getKey(),
partialFlattenTargets.getValue());
+ String flattenId =
+ SyntheticNodes.uniqueId("unzipped_flatten",
unzippedComponents::containsTransforms);
+ unzippedComponents.putTransforms(flattenId, flattenSynthetics);
+ introducedFlattens.add(PipelineNode.pTransform(flattenId,
flattenSynthetics));
+ }
+
+ Components components = unzippedComponents.build();
+ return DeduplicationResult.of(components, introducedFlattens,
updatedStages, updatedTransforms);
+ }
+
+ @AutoValue
+ abstract static class DeduplicationResult {
+ private static DeduplicationResult of(
+ RunnerApi.Components components,
+ Set<PTransformNode> introducedTransforms,
+ Map<ExecutableStage, ExecutableStage> stages,
+ Map<String, PTransformNode> unfused) {
+ return new AutoValue_OutputDeduplicator_DeduplicationResult(
+ components, introducedTransforms, stages, unfused);
+ }
+
+ abstract RunnerApi.Components getDeduplicatedComponents();
+
+ abstract Set<PTransformNode> getIntroducedTransforms();
+
+ abstract Map<ExecutableStage, ExecutableStage> getDeduplicatedStages();
+
+ abstract Map<String, PTransformNode> getDeduplicatedTransforms();
+ }
+
+ private static PTransform createSyntheticFlatten(
+ String outputId, Collection<PCollectionNode> generatedInputs) {
+ PTransform.Builder newFlattenBuilder = PTransform.newBuilder();
+ int i = 0;
+ for (PCollectionNode generatedInput : generatedInputs) {
+ String localInputId = String.format("input_%s", i);
+ i++;
+ newFlattenBuilder.putInputs(localInputId, generatedInput.getId());
+ }
+ // Flatten all of the new partial nodes together.
+ return newFlattenBuilder
+ .putOutputs("output", outputId)
+
.setSpec(FunctionSpec.newBuilder().setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN))
+ .build();
+ }
+
+ /**
+ * Returns the map from each {@link PCollectionNode} produced by any of the
{@link ExecutableStage
+ * stages} or {@link PTransformNode transforms} to all of the {@link
ExecutableStage stages} or
+ * {@link PTransformNode transforms} that produce it.
+ */
+ private static Multimap<PCollectionNode, StageOrTransform> getProducers(
+ QueryablePipeline pipeline,
+ Iterable<ExecutableStage> stages,
+ Iterable<PTransformNode> unfusedTransforms) {
+ Multimap<PCollectionNode, StageOrTransform> pcollectionProducers =
HashMultimap.create();
+ for (ExecutableStage stage : stages) {
+ for (PCollectionNode output : stage.getOutputPCollections()) {
+ pcollectionProducers.put(output, StageOrTransform.stage(stage));
+ }
+ }
+ for (PTransformNode unfused : unfusedTransforms) {
+ for (PCollectionNode output : pipeline.getOutputPCollections(unfused)) {
+ pcollectionProducers.put(output, StageOrTransform.transform(unfused));
+ }
+ }
+ return pcollectionProducers;
+ }
+
+ private static PTransformDeduplication deduplicatePCollections(
+ PTransformNode transform,
+ Collection<PCollectionNode> duplicates,
+ Predicate<String> existingPCollectionIds) {
+ Map<String, PCollectionNode> unzippedOutputs =
+ createSyntheticPCollections(duplicates, existingPCollectionIds);
+ PTransform pTransform = updateOutputs(transform.getTransform(),
unzippedOutputs);
+ return PTransformDeduplication.of(
+ PipelineNode.pTransform(transform.getId(), pTransform),
unzippedOutputs);
+ }
+
+ @AutoValue
+ abstract static class PTransformDeduplication {
+ public static PTransformDeduplication of(
+ PTransformNode updatedTransform, Map<String, PCollectionNode>
originalToPartial) {
+ return new AutoValue_OutputDeduplicator_PTransformDeduplication(
+ updatedTransform, originalToPartial);
+ }
+
+ abstract PTransformNode getUpdatedTransform();
+
+ abstract Map<String, PCollectionNode> getOriginalToPartialPCollections();
+ }
+
+ private static StageDeduplication deduplicatePCollections(
+ ExecutableStage stage,
+ Collection<PCollectionNode> duplicates,
+ Predicate<String> existingPCollectionIds) {
+ Map<String, PCollectionNode> unzippedOutputs =
+ createSyntheticPCollections(duplicates, existingPCollectionIds);
+ ExecutableStage updatedStage = deduplicateStageOutput(stage,
unzippedOutputs);
+ return StageDeduplication.of(updatedStage, unzippedOutputs);
+ }
+
+ @AutoValue
+ abstract static class StageDeduplication {
+ public static StageDeduplication of(
+ ExecutableStage updatedStage, Map<String, PCollectionNode>
originalToPartial) {
+ return new AutoValue_OutputDeduplicator_StageDeduplication(updatedStage,
originalToPartial);
+ }
+
+ abstract ExecutableStage getUpdatedStage();
+
+ abstract Map<String, PCollectionNode> getOriginalToPartialPCollections();
+ }
+
+ /**
+ * Returns a {@link Map} from the ID of a {@link PCollectionNode
PCollection} to a {@link
+ * PCollectionNode} that contains part of that {@link PCollectionNode
PCollection}.
+ */
+ private static Map<String, PCollectionNode> createSyntheticPCollections(
+ Collection<PCollectionNode> duplicates, Predicate<String>
existingPCollectionIds) {
+ Map<String, PCollectionNode> unzippedOutputs = new LinkedHashMap<>();
+ Predicate<String> existingOrNewIds =
+ existingPCollectionIds.or(
+ id ->
+
unzippedOutputs.values().stream().map(PCollectionNode::getId).anyMatch(id::equals));
+ for (PCollectionNode duplicateOutput : duplicates) {
+ String id = SyntheticNodes.uniqueId(duplicateOutput.getId(),
existingOrNewIds);
+ PCollection partial =
duplicateOutput.getPCollection().toBuilder().setUniqueName(id).build();
+ PCollectionNode former =
+ unzippedOutputs.put(duplicateOutput.getId(),
PipelineNode.pCollection(id, partial));
+ checkArgument(former == null, "a duplicate should only appear once per
stage");
+ }
+ return unzippedOutputs;
+ }
+
+ /**
+ * Returns an {@link ExecutableStage} where all of the {@link
PCollectionNode PCollections}
+ * matching the original are replaced with the synthetic {@link PCollection}
in all references
+ * made within the {@link ExecutableStage}.
+ */
+ private static ExecutableStage deduplicateStageOutput(
+ ExecutableStage stage, Map<String, PCollectionNode>
originalToSynthetics) {
Review comment:
Renamed where appropriate, which I think is everywhere.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 86872)
Time Spent: 1h (was: 50m)
> 'Unzip' flattens before performing fusion
> -----------------------------------------
>
> Key: BEAM-3914
> URL: https://issues.apache.org/jira/browse/BEAM-3914
> Project: Beam
> Issue Type: Improvement
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Thomas Groh
> Priority: Major
> Labels: portability
> Time Spent: 1h
> Remaining Estimate: 0h
>
> This consists of duplicating nodes downstream of a flatten that exist within
> an environment, and reintroducing the flatten immediately upstream of a
> runner-executed transform (the flatten should be executed within the runner)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)