[ https://issues.apache.org/jira/browse/BEAM-3914?focusedWorklogId=95319&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95319 ]

ASF GitHub Bot logged work on BEAM-3914:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Apr/18 01:11
            Start Date: 26/Apr/18 01:11
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #4977: [BEAM-3914] Deduplicate Unzipped Flattens after Pipeline Fusion
URL: https://github.com/apache/beam/pull/4977#discussion_r184247459
 
 

 ##########
 File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java
 ##########
 @@ -114,47 +125,66 @@ public void unchangedWithNoDuplicates() {
   public void duplicateOverStages() {
     /* When multiple stages and a runner-executed transform produce a PCollection, all should be
      * replaced with synthetic flattens.
-     * S -> A; T -> A becomes S -> A'; T -> A''; A', A'' -> Flatten -> A
+     * original graph:
+     *             --> one -> .out \
+     * red -> .out |                -> shared -> .out -> blue -> .out
+     *             --> two -> .out /
+     *
+     * fused graph:
+     *             --> [one -> .out -> shared ->] .out
+     * red -> .out |                                   (shared.out) -> blue -> .out
+     *             --> [two -> .out -> shared ->] .out
+     *
+     * deduplicated graph:
+     *             --> [one -> .out -> shared ->] .out:0 \
+     * red -> .out |                                      -> shared -> .out -> blue -> .out
+     *             --> [two -> .out -> shared ->] .out:1 /
      */
-    PTransform one =
-        PTransform.newBuilder().putInputs("in", "red.out").putOutputs("out", "one.out").build();
+    PCollection redOut = PCollection.newBuilder().setUniqueName("red.out").build();
+    PTransform red = PTransform.newBuilder().putOutputs("out", redOut.getUniqueName()).build();
     PCollection oneOut = PCollection.newBuilder().setUniqueName("one.out").build();
-    PTransform two =
-        PTransform.newBuilder().putInputs("in", "red.out").putOutputs("out", "two.out").build();
+    PTransform one =
+        PTransform.newBuilder()
+            .putInputs("in", redOut.getUniqueName())
+            .putOutputs("out", oneOut.getUniqueName())
+            .build();
     PCollection twoOut = PCollection.newBuilder().setUniqueName("two.out").build();
+    PTransform two =
+        PTransform.newBuilder()
+            .putInputs("in", redOut.getUniqueName())
+            .putOutputs("out", twoOut.getUniqueName())
+            .build();
     PCollection sharedOut = PCollection.newBuilder().setUniqueName("shared.out").build();
     PTransform shared =
         PTransform.newBuilder()
-            .putInputs("one", "one.out")
-            .putInputs("two", "two.out")
+            .putInputs("one", oneOut.getUniqueName())
+            .putInputs("two", twoOut.getUniqueName())
             .putOutputs("shared", sharedOut.getUniqueName())
             .build();
-    PTransform red = PTransform.newBuilder().putOutputs("out", "red.out").build();
-    PCollection redOut = PCollection.newBuilder().setUniqueName("red.out").build();
+    PCollection blueOut = PCollection.newBuilder().setUniqueName("blue.out").build();
     PTransform blue =
         PTransform.newBuilder()
             .putInputs("in", sharedOut.getUniqueName())
-            .putOutputs("out", "blue.out")
+            .putOutputs("out", blueOut.getUniqueName())
             .build();
-    PCollection blueOut = PCollection.newBuilder().setUniqueName("blue.out").build();
     RunnerApi.Components components =
         Components.newBuilder()
             .putTransforms("one", one)
-            .putPcollections("one.out", oneOut)
+            .putPcollections(oneOut.getUniqueName(), oneOut)
 
 Review comment:
   Feels like there should be a one-argument overload. (Or is there any reason for the names to not agree?)
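The reviewer's suggestion can be sketched as below. This is a hypothetical illustration only: `PCollectionStub` and `ComponentsStub` are stand-ins invented here for `RunnerApi.PCollection` and `Components.Builder`, which are generated protobuf classes, so in the real test the one-argument form would more likely be a small static helper than a new builder method. The point is that deriving the map key from `getUniqueName()` makes it impossible for the key and the name to disagree.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for RunnerApi.PCollection (a generated protobuf message).
final class PCollectionStub {
  private final String uniqueName;

  PCollectionStub(String uniqueName) {
    this.uniqueName = uniqueName;
  }

  String getUniqueName() {
    return uniqueName;
  }
}

// Hypothetical stand-in for Components.Builder.
final class ComponentsStub {
  final Map<String, PCollectionStub> pcollections = new LinkedHashMap<>();

  // Two-argument form, as used in the test: the caller picks the key, so it
  // can silently drift from the PCollection's unique name.
  ComponentsStub putPcollections(String id, PCollectionStub pcollection) {
    pcollections.put(id, pcollection);
    return this;
  }

  // Suggested one-argument overload: the key is always the unique name.
  ComponentsStub putPcollections(PCollectionStub pcollection) {
    return putPcollections(pcollection.getUniqueName(), pcollection);
  }
}
```

With such an overload the test line would read `.putPcollections(oneOut)`. The two-argument form remains the general one, since the Runner API keys components by id, which is a separate field from the unique name; the shortcut only fits tests that deliberately use the name as the id.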

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 95319)
    Time Spent: 3h 40m  (was: 3.5h)

> 'Unzip' flattens before performing fusion
> -----------------------------------------
>
>                 Key: BEAM-3914
>                 URL: https://issues.apache.org/jira/browse/BEAM-3914
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> This consists of duplicating the nodes downstream of a flatten that exist
> within an environment, and reintroducing the flatten immediately upstream of
> a runner-executed transform (the flatten itself should be executed within the
> runner).
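The deduplication step that the issue and the test's "fused graph" / "deduplicated graph" diagram describe can be sketched in plain Java. This is a simplified, hypothetical model, not Beam's `OutputDeduplicator`: `StageStub`, `FlattenStub` and `DedupSketch.deduplicate` are invented for illustration, a stage is reduced to a name plus the ids of the PCollections it writes, and the `:0`, `:1` suffixes follow the diagram rather than any confirmed naming scheme. Every PCollection written by more than one fused stage gets one partial PCollection per writer, and a synthetic flatten (executed by the runner) merges the partials back under the original id, so downstream consumers such as `blue` are unchanged.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a fused stage: a name plus the ids of the
// PCollections it writes. Not a Beam class.
final class StageStub {
  final String name;
  final List<String> outputs;

  StageStub(String name, List<String> outputs) {
    this.name = name;
    this.outputs = new ArrayList<>(outputs); // mutable copy; deduplicate() rewrites it
  }
}

// Hypothetical runner-executed flatten: several partial inputs, one output.
final class FlattenStub {
  final List<String> inputs;
  final String output;

  FlattenStub(List<String> inputs, String output) {
    this.inputs = inputs;
    this.output = output;
  }
}

final class DedupSketch {
  // For every PCollection written by more than one stage, gives each writer
  // its own partial output ("<id>:0", "<id>:1", ...) and returns a flatten
  // that merges the partials back into the original id. Mutates the stages.
  static List<FlattenStub> deduplicate(List<StageStub> stages) {
    // Group writers by output id, keeping first-seen order.
    Map<String, List<StageStub>> writersById = new LinkedHashMap<>();
    for (StageStub stage : stages) {
      for (String out : stage.outputs) {
        writersById.computeIfAbsent(out, id -> new ArrayList<>()).add(stage);
      }
    }

    List<FlattenStub> flattens = new ArrayList<>();
    for (Map.Entry<String, List<StageStub>> entry : writersById.entrySet()) {
      String original = entry.getKey();
      List<StageStub> writers = entry.getValue();
      if (writers.size() < 2) {
        continue; // a single writer needs no flatten
      }
      List<String> partials = new ArrayList<>();
      for (int i = 0; i < writers.size(); i++) {
        String partial = original + ":" + i;
        List<String> outs = writers.get(i).outputs;
        outs.set(outs.indexOf(original), partial); // redirect this writer
        partials.add(partial);
      }
      flattens.add(new FlattenStub(partials, original));
    }
    return flattens;
  }
}
```

Applied to the two fused stages from the test, `[one -> .out -> shared ->]` and `[two -> .out -> shared ->]`, the sketch rewrites their outputs to `shared.out:0` and `shared.out:1` and returns a single flatten from those partials to `shared.out`; a PCollection with only one writer, such as `red.out`, is left alone.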



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)