okumin commented on code in PR #3998:
URL: https://github.com/apache/hive/pull/3998#discussion_r1978644044


##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/graph/OperatorGraph.java:
##########
@@ -81,183 +76,202 @@ public EdgeType getEdgeType() {
 
   }
 
-  public static interface OperatorEdgePredicate {
+  public interface OperatorEdgePredicate {
 
     boolean accept(Operator<?> s, Operator<?> t, OpEdge opEdge);
 
   }
 
-  Map<Operator<?>, Cluster> nodeCluster = new HashMap<>();
-
   public class Cluster {
 
-    Set<Operator<?>> members = new LinkedHashSet<>();
+    private final Set<Operator<?>> members = new HashSet<>();
 
-    protected void merge(Cluster o) {
-      if (o == this) {
-        return;
-      }
-      for (Operator<?> node : o.members) {
-        add(node);
-      }
-      o.members.clear();
+    public void add(Operator<?> operator) {
+      members.add(operator);
     }
 
-    protected void add(Operator<?> curr) {
-      nodeCluster.put(curr, this);
-      members.add(curr);
+    public void merge(Cluster cluster) {
+      members.addAll(cluster.getMembers());
     }
 
     public Set<Cluster> parentClusters(OperatorEdgePredicate traverseEdge) {
       Set<Cluster> ret = new HashSet<Cluster>();
       for (Operator<?> operator : members) {
-        for (Operator<? extends OperatorDesc> p : 
operator.getParentOperators()) {
-          if (members.contains(p)) {
-            continue;
-          }
-          Optional<OpEdge> e = g.getEdge(p, operator);
-          if (traverseEdge.accept(p, operator, e.get())) {
-            ret.add(nodeCluster.get(p));
+        Stream<Operator<?>> foreignParentOperators =
+            dagGraph.predecessors(operator).stream()
+                .filter(pOp -> !members.contains(pOp))
+                .filter(pOp -> traverseEdge.accept(pOp, operator, 
dagGraph.getEdge(pOp, operator).get()));
+        foreignParentOperators.forEach(parentOperator -> {
+          for (Cluster parentCluster: operatorToCluster.get(parentOperator)) {
+            if (!parentCluster.getMembers().contains(operator)) {
+              ret.add(parentCluster);
+            }
           }
-        }
+        });
       }
       return ret;
     }
 
     public Set<Cluster> childClusters(OperatorEdgePredicate traverseEdge) {
       Set<Cluster> ret = new HashSet<Cluster>();
       for (Operator<?> operator : members) {
-        for (Operator<? extends OperatorDesc> p : 
operator.getChildOperators()) {
-          if (members.contains(p)) {
-            continue;
-          }
-          Optional<OpEdge> e = g.getEdge(operator, p);
-          if (traverseEdge.accept(operator, p, e.get())) {
-            ret.add(nodeCluster.get(p));
+        Stream<Operator<?>> foreignChildOperators =
+            dagGraph.successors(operator).stream()
+                .filter(cOp -> !members.contains(cOp))
+                .filter(cOp -> traverseEdge.accept(operator, cOp, 
dagGraph.getEdge(operator, cOp).get()));
+        foreignChildOperators.forEach(childOperator -> {
+          for (Cluster childCluster: operatorToCluster.get(childOperator)) {
+            if (!childCluster.getMembers().contains(operator)) {
+              ret.add(childCluster);
+            }
           }
-        }
+        });
       }
       return ret;
     }
 
     public Set<Operator<?>> getMembers() {
       return Collections.unmodifiableSet(members);
     }
+
   }
 
+  private Cluster createCluster(Operator<?> rootOperator) {
+    Cluster cluster = new Cluster();
+    Queue<Operator<?>> remainingOperators = new LinkedList<>();
+    remainingOperators.add(rootOperator);
 
-  public OperatorGraph(ParseContext pctx) {
-    g = new DagGraph<Operator<?>, OperatorGraph.OpEdge>();
-    Set<Operator<?>> visited = Sets.newIdentityHashSet();
-    Set<Operator<?>> seen = Sets.newIdentityHashSet();
-
-    seen.addAll(pctx.getTopOps().values());
-    while (!seen.isEmpty()) {
-      Operator<?> curr = seen.iterator().next();
-      seen.remove(curr);
-      if (visited.contains(curr)) {
-        continue;
+    while (!remainingOperators.isEmpty()) {
+      Operator<?> currentOperator = remainingOperators.poll();
+      if (!cluster.getMembers().contains(currentOperator)) {
+        cluster.add(currentOperator);
+
+        // TODO: write about DummyStoreOperator and FileSinkOperator
+        if (!(currentOperator instanceof ReduceSinkOperator)) {
+          remainingOperators.addAll(currentOperator.getChildOperators());
+        }
       }
+    }
 
-      visited.add(curr);
+    return cluster;
+  }
 
-      Cluster currentCluster = nodeCluster.get(curr);
-      if (currentCluster == null) {
-        currentCluster=new Cluster();
-        currentCluster.add(curr);
+  private Set<Cluster> createClusterSet(ParseContext pctx) {
+    Set<Operator<?>> rootOperators = new HashSet<>(pctx.getTopOps().values());
+    Set<Operator<?>> mergeJoinOperators = new HashSet<>();
+    for (Operator<?> operator: pctx.getAllOps()) {
+      if (operator instanceof CommonMergeJoinOperator) {
+        mergeJoinOperators.add(operator);
       }
-      List<Operator<?>> parents = curr.getParentOperators();
-      for (int i = 0; i < parents.size(); i++) {
-        Operator<?> p = parents.get(i);
-        if (curr instanceof MapJoinOperator && p instanceof 
ReduceSinkOperator) {
-          g.putEdgeValue(p, curr, new OpEdge(EdgeType.BROADCAST, i));
-        } else {
-          g.putEdgeValue(p, curr, new OpEdge(EdgeType.FLOW, i));
+
+      if (operator instanceof ReduceSinkOperator) {
+        // TODO: Do we need to consider SJ and DPP?
+        for (Operator<?> childOperator: operator.getChildOperators()) {
+          if (childOperator instanceof MapJoinOperator) {
+            MapJoinOperator childMJOperator = (MapJoinOperator) childOperator;
+            int parentTag = 
childMJOperator.getParentOperators().indexOf(operator);
+            int bigTablePos = childMJOperator.getConf().getPosBigTable();
+            if (parentTag == bigTablePos) {
+              rootOperators.add(childOperator);
+            }
+          } else {
+            rootOperators.add(childOperator);
+          }
         }
-        if (p instanceof ReduceSinkOperator) {
-          // ignore cluster of parent RS
-          continue;
+      }
+    }
+
+    Set<Cluster> clusters = new HashSet<>();
+    for (Operator<?> rootOperator: rootOperators) {
+      clusters.add(createCluster(rootOperator));
+    }
+
+    for (Operator<?> operator: mergeJoinOperators) {

Review Comment:
   The example explains the answer to my question well. Thanks.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ParallelEdgeFixer.java:
##########
@@ -185,23 +188,36 @@ private String sig(Pair<Operator<?>, Operator<?>> o1) {
     }
   }
 
+  private static class ActualEdgePredicate implements 
OperatorGraph.OperatorEdgePredicate {
+    EnumSet<EdgeType> acceptableEdgeTypes = EnumSet.of(EdgeType.FLOW, 
EdgeType.SEMIJOIN, EdgeType.BROADCAST);

Review Comment:
   That's true. Please make it a private static constant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org

Reply via email to