[ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82364&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82364
 ]

ASF GitHub Bot logged work on BEAM-3565:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Mar/18 17:40
            Start Date: 20/Mar/18 17:40
    Worklog Time Spent: 10m 
      Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r175238617
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##########
 @@ -161,6 +179,40 @@ private static boolean isPrimitiveTransform(PTransform transform) {
     return network;
   }
 
+  // This enables a naive implementation of topological sort, instead of doing something clever.
+  // Nodes with lower weight precede nodes with higher weight, and are unrelated to nodes with
+  // equal weight
+  private final LoadingCache<PTransformNode, Long> nodeWeights =
+      CacheBuilder.newBuilder()
+          .build(
+              new CacheLoader<PTransformNode, Long>() {
+                @Override
+                public Long load(@Nonnull PTransformNode transformNode) {
+                  long parentWeight = 0L;
+                  for (String inputPCollectionId :
+                      transformNode.getTransform().getInputsMap().values()) {
+                    PTransformNode upstream =
+                        getProducer(
+                            PipelineNode.pCollection(
+                                inputPCollectionId,
+                                components.getPcollectionsOrThrow(inputPCollectionId)));
+                    parentWeight += nodeWeights.getUnchecked(upstream);
+                  }
+                  return 1 + parentWeight;
+                }
+              });
+
+  public Iterable<PTransformNode> getTopologicallyOrderedTransforms() {
+    return pipelineNetwork
+        .nodes()
+        .stream()
+        .filter(node -> node instanceof PTransformNode)
+        .map(PTransformNode.class::cast)
+        .collect(
+            Collectors.toCollection(
+                () -> new TreeSet<>(Comparator.comparingLong(nodeWeights::getUnchecked))));
 
 Review comment:
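   This comparator is inconsistent with equals: it compares nodes by weight 
alone, so the TreeSet treats any two nodes of equal weight as duplicates and 
keeps only one of them. Every root transform has the same weight, so the 
result will look like the pipeline has only one root transform.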
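
   To illustrate the concern, here is a minimal, self-contained sketch. The 
Node class is a hypothetical stand-in for PTransformNode with a precomputed 
weight, and the id tie-breaker is only one possible remedy, not necessarily 
the change this pull request should adopt.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class WeightOrderingSketch {
  // Hypothetical stand-in for PTransformNode: an id plus a precomputed weight.
  static final class Node {
    final String id;
    final long weight;

    Node(String id, long weight) {
      this.id = id;
      this.weight = weight;
    }

    String getId() {
      return id;
    }

    long getWeight() {
      return weight;
    }
  }

  // Mirrors the comparator in the diff: nodes are ordered by weight alone,
  // so the TreeSet considers equal-weight nodes to be the same element.
  static Set<Node> byWeightOnly(List<Node> nodes) {
    Comparator<Node> byWeight = Comparator.comparingLong(Node::getWeight);
    Set<Node> sorted = new TreeSet<>(byWeight);
    sorted.addAll(nodes);
    return sorted;
  }

  // Assumed remedy: break ties on a unique id so distinct nodes never compare as equal.
  static Set<Node> byWeightThenId(List<Node> nodes) {
    Comparator<Node> byWeight = Comparator.comparingLong(Node::getWeight);
    Set<Node> sorted = new TreeSet<>(byWeight.thenComparing(Node::getId));
    sorted.addAll(nodes);
    return sorted;
  }

  public static void main(String[] args) {
    // Two roots (weight 1 each) feeding one downstream node (weight 1 + 1 + 1 = 3).
    List<Node> nodes =
        Arrays.asList(new Node("rootA", 1), new Node("rootB", 1), new Node("join", 3));
    System.out.println(byWeightOnly(nodes).size()); // one of the two roots is dropped
    System.out.println(byWeightThenId(nodes).size()); // all nodes are kept
  }
}
```

   Collecting into a List and sorting it with the weight comparator would 
also keep every node, since a sorted list does not deduplicate.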

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 82364)
    Time Spent: 14h  (was: 13h 50m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> ------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3565
>                 URL: https://issues.apache.org/jira/browse/BEAM-3565
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>             Fix For: 2.4.0
>
>          Time Spent: 14h
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The Java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
