[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127818&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127818 ]

ASF GitHub Bot logged work on BEAM-4658:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Jul/18 15:58
            Start Date: 26/Jul/18 15:58
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205512015
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
 ##########
 @@ -150,41 +154,51 @@ private static boolean canFuseParDo(
       // is never possible.
       return false;
     }
-    if (!pipeline.getSideInputs(parDo).isEmpty()) {
-      // At execution time, a Runner is required to only provide inputs to a PTransform that, at
-      // the time the PTransform processes them, the associated window is ready in all side inputs
-      // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
-      // runner to determine this for each input. As a result, we break fusion to simplify this
-      // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
-      // executable stage alongside any transforms which are upstream of any of its side inputs.
-      return false;
-    } else {
-      try {
-        ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
-        if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
-          // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
-          // a key must execute serially. To avoid checking if the rest of the stage is
-          // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
-          return false;
-        }
-      } catch (InvalidProtocolBufferException e) {
-        throw new IllegalArgumentException(e);
+    try {
+      ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
+      if (Maps.filterKeys(
+              parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s))
+          .values()
+          .contains(candidate.getId())) {
+        // Allow fusion across timer PCollections because they are a self loop.
+        return true;
+      } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+        // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
+        // a key must execute serially. To avoid checking if the rest of the stage is
+        // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
+        return false;
+      } else if (!pipeline.getSideInputs(parDo).isEmpty()) {
+        // At execution time, a Runner is required to only provide inputs to a PTransform that, at
+        // the time the PTransform processes them, the associated window is ready in all side inputs
+        // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
+        // runner to determine this for each input. As a result, we break fusion to simplify this
+        // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
+        // executable stage alongside any transforms which are upstream of any of its side inputs.
+        return false;
       }
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(e);
     }
     return true;
   }
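To make the new ordering of checks in canFuseParDo easier to follow, here is a minimal, self-contained Java sketch of the same decision logic. It is not Beam code. FusionCheck and ParDoInfo are hypothetical stand-ins for the ParDoPayload and PTransformNode protos. The Guava Maps.filterKeys call is replaced with a plain stream filter over the ParDo's named inputs, and the protobuf parsing and its exception handling are omitted.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FusionCheck {
  // Hypothetical, simplified view of a ParDo; not a Beam type.
  static final class ParDoInfo {
    final Map<String, String> inputs;  // local input name -> PCollection id
    final Set<String> timerSpecNames;  // local names declared as timer specs
    final boolean hasStateSpecs;       // true if the ParDo declares user state
    final List<String> sideInputs;     // side input PCollection ids

    ParDoInfo(Map<String, String> inputs, Set<String> timerSpecNames,
              boolean hasStateSpecs, List<String> sideInputs) {
      this.inputs = inputs;
      this.timerSpecNames = timerSpecNames;
      this.hasStateSpecs = hasStateSpecs;
      this.sideInputs = sideInputs;
    }
  }

  // Mirrors the order of checks in the revised canFuseParDo.
  static boolean canFuseParDo(ParDoInfo parDo, String candidatePCollectionId) {
    // Keep only the inputs whose local name is a timer spec, then test the candidate against them.
    boolean candidateIsTimerInput =
        parDo.inputs.entrySet().stream()
            .filter(e -> parDo.timerSpecNames.contains(e.getKey()))
            .anyMatch(e -> e.getValue().equals(candidatePCollectionId));
    if (candidateIsTimerInput) {
      // Timer PCollections loop back into the same ParDo, so fusing across them is allowed.
      return true;
    } else if (parDo.hasStateSpecs || !parDo.timerSpecNames.isEmpty()) {
      // State and timers need key-partitioned, serially processed input: start a new stage.
      return false;
    } else if (!parDo.sideInputs.isEmpty()) {
      // Checking side input window readiness across a whole stage is too complex: break fusion.
      return false;
    }
    return true;
  }

  public static void main(String[] args) {
    ParDoInfo timerParDo = new ParDoInfo(
        Map.of("main", "pc-in", "myTimer", "pc-timer"), Set.of("myTimer"), false, List.of());
    System.out.println(canFuseParDo(timerParDo, "pc-timer")); // true: timer self loop
    System.out.println(canFuseParDo(timerParDo, "pc-in"));    // false: ParDo uses timers
  }
}
```

The ordering matters: the timer self-loop check has to run before the State/Timers check, otherwise every ParDo that declares a timer would refuse to fuse with its own timer PCollection.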
 
   private static boolean parDoCompatibility(
       PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) {
-    // This is a convenience rather than a strict requirement. In general, a ParDo that consumes
-    // side inputs can be fused with other transforms in the same environment which are not
-    // upstream of any of the side inputs.
-    return pipeline.getSideInputs(parDo).isEmpty()
-        // Since we lack the ability to mark upstream transforms as key preserving, we
-        // purposefully break fusion here to provide runners the opportunity to insert a
-        // grouping operation
-        && pipeline.getUserStates(parDo).isEmpty()
-        && compatibleEnvironments(parDo, other, pipeline);
+    // Implicitly true if we are attempting to fuse against oneself. This is for timer PCollections, which create a loop.
+    return parDo.equals(other)
+        // This is a convenience rather than a strict requirement. In general, a ParDo that consumes
+        // side inputs can be fused with other transforms in the same environment which are not
+        // upstream of any of the side inputs.
+        || (pipeline.getSideInputs(parDo).isEmpty()
+            // Since we lack the ability to mark upstream transforms as key preserving, we
+            // purposefully break fusion here to provide runners the opportunity to insert a
+            // grouping operation
+            && pipeline.getUserStates(parDo).isEmpty()
+            // Since we lack the ability to mark upstream transforms as key preserving, we
 
 Review comment:
   I wanted to make sure that user state and timers had fusion breaks for the 
same reason. I clarified the comment but still kept it "duplicated" to 
reiterate this point since it is important.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 127818)
    Time Spent: 1.5h  (was: 1h 20m)

> Update pipeline representation in runner support libraries to handle timers
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-4658
>                 URL: https://issues.apache.org/jira/browse/BEAM-4658
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-core
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Expose relevant timer information on *ExecutableStage* to runners
> Update fusion logic to handle timers.
> Provide execution time interface to fire timers into *RemoteBundle*s and also 
> to receive new timers that are being set.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
