okumin commented on code in PR #3998:
URL: https://github.com/apache/hive/pull/3998#discussion_r1957263810


##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/graph/OperatorGraph.java:
##########
@@ -81,183 +76,202 @@ public EdgeType getEdgeType() {
 
   }
 
-  public static interface OperatorEdgePredicate {
+  public interface OperatorEdgePredicate {
 
     boolean accept(Operator<?> s, Operator<?> t, OpEdge opEdge);
 
   }
 
-  Map<Operator<?>, Cluster> nodeCluster = new HashMap<>();
-
   public class Cluster {
 
-    Set<Operator<?>> members = new LinkedHashSet<>();
+    private final Set<Operator<?>> members = new HashSet<>();
 
-    protected void merge(Cluster o) {
-      if (o == this) {
-        return;
-      }
-      for (Operator<?> node : o.members) {
-        add(node);
-      }
-      o.members.clear();
+    public void add(Operator<?> operator) {
+      members.add(operator);
     }
 
-    protected void add(Operator<?> curr) {
-      nodeCluster.put(curr, this);
-      members.add(curr);
+    public void merge(Cluster cluster) {
+      members.addAll(cluster.getMembers());
     }
 
     public Set<Cluster> parentClusters(OperatorEdgePredicate traverseEdge) {
       Set<Cluster> ret = new HashSet<Cluster>();
       for (Operator<?> operator : members) {
-        for (Operator<? extends OperatorDesc> p : 
operator.getParentOperators()) {
-          if (members.contains(p)) {
-            continue;
-          }
-          Optional<OpEdge> e = g.getEdge(p, operator);
-          if (traverseEdge.accept(p, operator, e.get())) {
-            ret.add(nodeCluster.get(p));
+        Stream<Operator<?>> foreignParentOperators =
+            dagGraph.predecessors(operator).stream()
+                .filter(pOp -> !members.contains(pOp))
+                .filter(pOp -> traverseEdge.accept(pOp, operator, 
dagGraph.getEdge(pOp, operator).get()));
+        foreignParentOperators.forEach(parentOperator -> {
+          for (Cluster parentCluster: operatorToCluster.get(parentOperator)) {
+            if (!parentCluster.getMembers().contains(operator)) {
+              ret.add(parentCluster);
+            }
           }
-        }
+        });
       }
       return ret;
     }
 
     public Set<Cluster> childClusters(OperatorEdgePredicate traverseEdge) {
       Set<Cluster> ret = new HashSet<Cluster>();
       for (Operator<?> operator : members) {
-        for (Operator<? extends OperatorDesc> p : 
operator.getChildOperators()) {
-          if (members.contains(p)) {
-            continue;
-          }
-          Optional<OpEdge> e = g.getEdge(operator, p);
-          if (traverseEdge.accept(operator, p, e.get())) {
-            ret.add(nodeCluster.get(p));
+        Stream<Operator<?>> foreignChildOperators =
+            dagGraph.successors(operator).stream()
+                .filter(cOp -> !members.contains(cOp))
+                .filter(cOp -> traverseEdge.accept(operator, cOp, 
dagGraph.getEdge(operator, cOp).get()));
+        foreignChildOperators.forEach(childOperator -> {
+          for (Cluster childCluster: operatorToCluster.get(childOperator)) {
+            if (!childCluster.getMembers().contains(operator)) {
+              ret.add(childCluster);
+            }
           }
-        }
+        });
       }
       return ret;
     }
 
     public Set<Operator<?>> getMembers() {
       return Collections.unmodifiableSet(members);
     }
+
   }
 
+  private Cluster createCluster(Operator<?> rootOperator) {
+    Cluster cluster = new Cluster();
+    Queue<Operator<?>> remainingOperators = new LinkedList<>();
+    remainingOperators.add(rootOperator);
 
-  public OperatorGraph(ParseContext pctx) {
-    g = new DagGraph<Operator<?>, OperatorGraph.OpEdge>();
-    Set<Operator<?>> visited = Sets.newIdentityHashSet();
-    Set<Operator<?>> seen = Sets.newIdentityHashSet();
-
-    seen.addAll(pctx.getTopOps().values());
-    while (!seen.isEmpty()) {
-      Operator<?> curr = seen.iterator().next();
-      seen.remove(curr);
-      if (visited.contains(curr)) {
-        continue;
+    while (!remainingOperators.isEmpty()) {
+      Operator<?> currentOperator = remainingOperators.poll();
+      if (!cluster.getMembers().contains(currentOperator)) {
+        cluster.add(currentOperator);
+
+        // TODO: write about DummyStoreOperator and FileSinkOperator
+        if (!(currentOperator instanceof ReduceSinkOperator)) {
+          remainingOperators.addAll(currentOperator.getChildOperators());
+        }
       }
+    }
 
-      visited.add(curr);
+    return cluster;
+  }
 
-      Cluster currentCluster = nodeCluster.get(curr);
-      if (currentCluster == null) {
-        currentCluster=new Cluster();
-        currentCluster.add(curr);
+  private Set<Cluster> createClusterSet(ParseContext pctx) {
+    Set<Operator<?>> rootOperators = new HashSet<>(pctx.getTopOps().values());
+    Set<Operator<?>> mergeJoinOperators = new HashSet<>();
+    for (Operator<?> operator: pctx.getAllOps()) {
+      if (operator instanceof CommonMergeJoinOperator) {
+        mergeJoinOperators.add(operator);
       }
-      List<Operator<?>> parents = curr.getParentOperators();
-      for (int i = 0; i < parents.size(); i++) {
-        Operator<?> p = parents.get(i);
-        if (curr instanceof MapJoinOperator && p instanceof 
ReduceSinkOperator) {
-          g.putEdgeValue(p, curr, new OpEdge(EdgeType.BROADCAST, i));
-        } else {
-          g.putEdgeValue(p, curr, new OpEdge(EdgeType.FLOW, i));
+
+      if (operator instanceof ReduceSinkOperator) {
+        // TODO: Do we need to consider SJ and DPP?
+        for (Operator<?> childOperator: operator.getChildOperators()) {
+          if (childOperator instanceof MapJoinOperator) {
+            MapJoinOperator childMJOperator = (MapJoinOperator) childOperator;
+            int parentTag = 
childMJOperator.getParentOperators().indexOf(operator);
+            int bigTablePos = childMJOperator.getConf().getPosBigTable();
+            if (parentTag == bigTablePos) {
+              rootOperators.add(childOperator);
+            }
+          } else {
+            rootOperators.add(childOperator);
+          }
         }
-        if (p instanceof ReduceSinkOperator) {
-          // ignore cluster of parent RS
-          continue;
+      }
+    }
+
+    Set<Cluster> clusters = new HashSet<>();
+    for (Operator<?> rootOperator: rootOperators) {
+      clusters.add(createCluster(rootOperator));
+    }
+
+    for (Operator<?> operator: mergeJoinOperators) {
+      Set<Cluster> mergeJoinCluster = new HashSet<>();
+      for (Cluster cluster: clusters) {
+        if (cluster.getMembers().contains(operator)) {
+          mergeJoinCluster.add(cluster);
         }
-        Cluster cluster = nodeCluster.get(p);
-        if (cluster != null) {
-          currentCluster.merge(cluster);
+      }
+
+      if (!mergeJoinCluster.isEmpty()) {
+        Cluster mergedCluster = new Cluster();
+        for (Cluster cluster: mergeJoinCluster) {
+          mergedCluster.merge(cluster);
+          clusters.remove(cluster);
+        }
+        clusters.add(mergedCluster);
+      }
+    }
+
+    return clusters;
+  }
+
+  private DagGraph<Operator<?>, OperatorGraph.OpEdge> 
createDagGraph(ParseContext pctx) {

Review Comment:
   I expect the new `DagGraph<Operator<?>, OpEdge>` is identical to the 
previous `DagGraph<Operator<?>, OpEdge>`. This is a kind of refactoring.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/graph/OperatorGraph.java:
##########
@@ -81,183 +76,202 @@ public EdgeType getEdgeType() {
 
   }
 
-  public static interface OperatorEdgePredicate {
+  public interface OperatorEdgePredicate {
 
     boolean accept(Operator<?> s, Operator<?> t, OpEdge opEdge);
 
   }
 
-  Map<Operator<?>, Cluster> nodeCluster = new HashMap<>();
-
   public class Cluster {
 
-    Set<Operator<?>> members = new LinkedHashSet<>();
+    private final Set<Operator<?>> members = new HashSet<>();
 
-    protected void merge(Cluster o) {
-      if (o == this) {
-        return;
-      }
-      for (Operator<?> node : o.members) {
-        add(node);
-      }
-      o.members.clear();
+    public void add(Operator<?> operator) {
+      members.add(operator);
     }
 
-    protected void add(Operator<?> curr) {
-      nodeCluster.put(curr, this);
-      members.add(curr);
+    public void merge(Cluster cluster) {
+      members.addAll(cluster.getMembers());
     }
 
     public Set<Cluster> parentClusters(OperatorEdgePredicate traverseEdge) {
       Set<Cluster> ret = new HashSet<Cluster>();
       for (Operator<?> operator : members) {
-        for (Operator<? extends OperatorDesc> p : 
operator.getParentOperators()) {
-          if (members.contains(p)) {
-            continue;
-          }
-          Optional<OpEdge> e = g.getEdge(p, operator);
-          if (traverseEdge.accept(p, operator, e.get())) {
-            ret.add(nodeCluster.get(p));
+        Stream<Operator<?>> foreignParentOperators =
+            dagGraph.predecessors(operator).stream()
+                .filter(pOp -> !members.contains(pOp))
+                .filter(pOp -> traverseEdge.accept(pOp, operator, 
dagGraph.getEdge(pOp, operator).get()));
+        foreignParentOperators.forEach(parentOperator -> {
+          for (Cluster parentCluster: operatorToCluster.get(parentOperator)) {
+            if (!parentCluster.getMembers().contains(operator)) {
+              ret.add(parentCluster);
+            }
           }
-        }
+        });
       }
       return ret;
     }
 
     public Set<Cluster> childClusters(OperatorEdgePredicate traverseEdge) {
       Set<Cluster> ret = new HashSet<Cluster>();
       for (Operator<?> operator : members) {
-        for (Operator<? extends OperatorDesc> p : 
operator.getChildOperators()) {
-          if (members.contains(p)) {
-            continue;
-          }
-          Optional<OpEdge> e = g.getEdge(operator, p);
-          if (traverseEdge.accept(operator, p, e.get())) {
-            ret.add(nodeCluster.get(p));
+        Stream<Operator<?>> foreignChildOperators =
+            dagGraph.successors(operator).stream()
+                .filter(cOp -> !members.contains(cOp))
+                .filter(cOp -> traverseEdge.accept(operator, cOp, 
dagGraph.getEdge(operator, cOp).get()));
+        foreignChildOperators.forEach(childOperator -> {
+          for (Cluster childCluster: operatorToCluster.get(childOperator)) {
+            if (!childCluster.getMembers().contains(operator)) {
+              ret.add(childCluster);
+            }
           }
-        }
+        });
       }
       return ret;
     }
 
     public Set<Operator<?>> getMembers() {
       return Collections.unmodifiableSet(members);
     }
+
   }
 
+  private Cluster createCluster(Operator<?> rootOperator) {
+    Cluster cluster = new Cluster();
+    Queue<Operator<?>> remainingOperators = new LinkedList<>();
+    remainingOperators.add(rootOperator);
 
-  public OperatorGraph(ParseContext pctx) {
-    g = new DagGraph<Operator<?>, OperatorGraph.OpEdge>();
-    Set<Operator<?>> visited = Sets.newIdentityHashSet();
-    Set<Operator<?>> seen = Sets.newIdentityHashSet();
-
-    seen.addAll(pctx.getTopOps().values());
-    while (!seen.isEmpty()) {
-      Operator<?> curr = seen.iterator().next();
-      seen.remove(curr);
-      if (visited.contains(curr)) {
-        continue;
+    while (!remainingOperators.isEmpty()) {
+      Operator<?> currentOperator = remainingOperators.poll();
+      if (!cluster.getMembers().contains(currentOperator)) {
+        cluster.add(currentOperator);
+
+        // TODO: write about DummyStoreOperator and FileSinkOperator
+        if (!(currentOperator instanceof ReduceSinkOperator)) {
+          remainingOperators.addAll(currentOperator.getChildOperators());
+        }
       }
+    }
 
-      visited.add(curr);
+    return cluster;
+  }
 
-      Cluster currentCluster = nodeCluster.get(curr);
-      if (currentCluster == null) {
-        currentCluster=new Cluster();
-        currentCluster.add(curr);
+  private Set<Cluster> createClusterSet(ParseContext pctx) {
+    Set<Operator<?>> rootOperators = new HashSet<>(pctx.getTopOps().values());
+    Set<Operator<?>> mergeJoinOperators = new HashSet<>();
+    for (Operator<?> operator: pctx.getAllOps()) {
+      if (operator instanceof CommonMergeJoinOperator) {
+        mergeJoinOperators.add(operator);
       }
-      List<Operator<?>> parents = curr.getParentOperators();
-      for (int i = 0; i < parents.size(); i++) {
-        Operator<?> p = parents.get(i);
-        if (curr instanceof MapJoinOperator && p instanceof 
ReduceSinkOperator) {
-          g.putEdgeValue(p, curr, new OpEdge(EdgeType.BROADCAST, i));
-        } else {
-          g.putEdgeValue(p, curr, new OpEdge(EdgeType.FLOW, i));
+
+      if (operator instanceof ReduceSinkOperator) {
+        // TODO: Do we need to consider SJ and DPP?
+        for (Operator<?> childOperator: operator.getChildOperators()) {
+          if (childOperator instanceof MapJoinOperator) {
+            MapJoinOperator childMJOperator = (MapJoinOperator) childOperator;
+            int parentTag = 
childMJOperator.getParentOperators().indexOf(operator);
+            int bigTablePos = childMJOperator.getConf().getPosBigTable();
+            if (parentTag == bigTablePos) {

Review Comment:
   I am less confident if this branch is valid or not.
   
   ### If it is a pure Map Join
   
   The ancestor of the MapJoinOperator includes TableScanOperator. I assume 
`pctx.getTopOps().values()` consists of the TSO -> MJO. `ConvertJoinMapJoin` is 
processed before `SharedWorkOptimizer`.
   
   Does the current implementation create duplicated clusters?
   
   ### If it is a dynamic partitioned hash join
   
   The MapJoinOperator behaves similarly to CommonMergeJoinOperator.
   
   The current implementation will likely work, but we might not need `if 
(parentTag == bigTablePos)` in this case.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/graph/OperatorGraph.java:
##########
@@ -81,183 +76,202 @@ public EdgeType getEdgeType() {
 
   }
 
-  public static interface OperatorEdgePredicate {
+  public interface OperatorEdgePredicate {
 
     boolean accept(Operator<?> s, Operator<?> t, OpEdge opEdge);
 
   }
 
-  Map<Operator<?>, Cluster> nodeCluster = new HashMap<>();
-
   public class Cluster {
 
-    Set<Operator<?>> members = new LinkedHashSet<>();
+    private final Set<Operator<?>> members = new HashSet<>();
 
-    protected void merge(Cluster o) {
-      if (o == this) {
-        return;
-      }
-      for (Operator<?> node : o.members) {
-        add(node);
-      }
-      o.members.clear();
+    public void add(Operator<?> operator) {
+      members.add(operator);
     }
 
-    protected void add(Operator<?> curr) {
-      nodeCluster.put(curr, this);
-      members.add(curr);
+    public void merge(Cluster cluster) {
+      members.addAll(cluster.getMembers());
     }
 
     public Set<Cluster> parentClusters(OperatorEdgePredicate traverseEdge) {
       Set<Cluster> ret = new HashSet<Cluster>();
       for (Operator<?> operator : members) {
-        for (Operator<? extends OperatorDesc> p : 
operator.getParentOperators()) {
-          if (members.contains(p)) {
-            continue;
-          }
-          Optional<OpEdge> e = g.getEdge(p, operator);
-          if (traverseEdge.accept(p, operator, e.get())) {
-            ret.add(nodeCluster.get(p));
+        Stream<Operator<?>> foreignParentOperators =
+            dagGraph.predecessors(operator).stream()
+                .filter(pOp -> !members.contains(pOp))
+                .filter(pOp -> traverseEdge.accept(pOp, operator, 
dagGraph.getEdge(pOp, operator).get()));
+        foreignParentOperators.forEach(parentOperator -> {
+          for (Cluster parentCluster: operatorToCluster.get(parentOperator)) {
+            if (!parentCluster.getMembers().contains(operator)) {
+              ret.add(parentCluster);
+            }
           }
-        }
+        });
       }
       return ret;
     }
 
     public Set<Cluster> childClusters(OperatorEdgePredicate traverseEdge) {
       Set<Cluster> ret = new HashSet<Cluster>();
       for (Operator<?> operator : members) {
-        for (Operator<? extends OperatorDesc> p : 
operator.getChildOperators()) {
-          if (members.contains(p)) {
-            continue;
-          }
-          Optional<OpEdge> e = g.getEdge(operator, p);
-          if (traverseEdge.accept(operator, p, e.get())) {
-            ret.add(nodeCluster.get(p));
+        Stream<Operator<?>> foreignChildOperators =
+            dagGraph.successors(operator).stream()
+                .filter(cOp -> !members.contains(cOp))
+                .filter(cOp -> traverseEdge.accept(operator, cOp, 
dagGraph.getEdge(operator, cOp).get()));
+        foreignChildOperators.forEach(childOperator -> {
+          for (Cluster childCluster: operatorToCluster.get(childOperator)) {
+            if (!childCluster.getMembers().contains(operator)) {
+              ret.add(childCluster);
+            }
           }
-        }
+        });
       }
       return ret;
     }
 
     public Set<Operator<?>> getMembers() {
       return Collections.unmodifiableSet(members);
     }
+
   }
 
+  private Cluster createCluster(Operator<?> rootOperator) {
+    Cluster cluster = new Cluster();
+    Queue<Operator<?>> remainingOperators = new LinkedList<>();
+    remainingOperators.add(rootOperator);
 
-  public OperatorGraph(ParseContext pctx) {
-    g = new DagGraph<Operator<?>, OperatorGraph.OpEdge>();
-    Set<Operator<?>> visited = Sets.newIdentityHashSet();
-    Set<Operator<?>> seen = Sets.newIdentityHashSet();
-
-    seen.addAll(pctx.getTopOps().values());
-    while (!seen.isEmpty()) {
-      Operator<?> curr = seen.iterator().next();
-      seen.remove(curr);
-      if (visited.contains(curr)) {
-        continue;
+    while (!remainingOperators.isEmpty()) {
+      Operator<?> currentOperator = remainingOperators.poll();
+      if (!cluster.getMembers().contains(currentOperator)) {
+        cluster.add(currentOperator);
+
+        // TODO: write about DummyStoreOperator and FileSinkOperator
+        if (!(currentOperator instanceof ReduceSinkOperator)) {
+          remainingOperators.addAll(currentOperator.getChildOperators());
+        }
       }
+    }
 
-      visited.add(curr);
+    return cluster;
+  }
 
-      Cluster currentCluster = nodeCluster.get(curr);
-      if (currentCluster == null) {
-        currentCluster=new Cluster();
-        currentCluster.add(curr);
+  private Set<Cluster> createClusterSet(ParseContext pctx) {
+    Set<Operator<?>> rootOperators = new HashSet<>(pctx.getTopOps().values());
+    Set<Operator<?>> mergeJoinOperators = new HashSet<>();
+    for (Operator<?> operator: pctx.getAllOps()) {
+      if (operator instanceof CommonMergeJoinOperator) {
+        mergeJoinOperators.add(operator);
       }
-      List<Operator<?>> parents = curr.getParentOperators();
-      for (int i = 0; i < parents.size(); i++) {
-        Operator<?> p = parents.get(i);
-        if (curr instanceof MapJoinOperator && p instanceof 
ReduceSinkOperator) {
-          g.putEdgeValue(p, curr, new OpEdge(EdgeType.BROADCAST, i));
-        } else {
-          g.putEdgeValue(p, curr, new OpEdge(EdgeType.FLOW, i));
+
+      if (operator instanceof ReduceSinkOperator) {
+        // TODO: Do we need to consider SJ and DPP?
+        for (Operator<?> childOperator: operator.getChildOperators()) {
+          if (childOperator instanceof MapJoinOperator) {
+            MapJoinOperator childMJOperator = (MapJoinOperator) childOperator;
+            int parentTag = 
childMJOperator.getParentOperators().indexOf(operator);
+            int bigTablePos = childMJOperator.getConf().getPosBigTable();
+            if (parentTag == bigTablePos) {
+              rootOperators.add(childOperator);
+            }
+          } else {
+            rootOperators.add(childOperator);
+          }
         }
-        if (p instanceof ReduceSinkOperator) {
-          // ignore cluster of parent RS
-          continue;
+      }
+    }
+
+    Set<Cluster> clusters = new HashSet<>();
+    for (Operator<?> rootOperator: rootOperators) {
+      clusters.add(createCluster(rootOperator));
+    }
+
+    for (Operator<?> operator: mergeJoinOperators) {

Review Comment:
   I'm still not understanding the intention. I assume the following operator 
tree.
   
   ```
   TS1 -> RS3 --> CommonMergeJoinOperator -> FS
                    ^ 
                    |
   TS2 -> RS4 ------
   ```
   
   I expect we will have three clusters.
   - TS1 + RS3
   - TS2 + RS4
   - CommonMergeJoinOperator + FS
   
   I think this condition has been already achieved because the `rootOperators` 
is a hash set.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ParallelEdgeFixer.java:
##########
@@ -185,23 +188,36 @@ private String sig(Pair<Operator<?>, Operator<?>> o1) {
     }
   }
 
+  private static class ActualEdgePredicate implements 
OperatorGraph.OperatorEdgePredicate {
+    EnumSet<EdgeType> acceptableEdgeTypes = EnumSet.of(EdgeType.FLOW, 
EdgeType.SEMIJOIN, EdgeType.BROADCAST);

Review Comment:
   Don't we need DPP?



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/graph/OperatorGraph.java:
##########
@@ -81,183 +76,202 @@ public EdgeType getEdgeType() {
 
   }
 
-  public static interface OperatorEdgePredicate {
+  public interface OperatorEdgePredicate {
 
     boolean accept(Operator<?> s, Operator<?> t, OpEdge opEdge);
 
   }
 
-  Map<Operator<?>, Cluster> nodeCluster = new HashMap<>();
-
   public class Cluster {
 
-    Set<Operator<?>> members = new LinkedHashSet<>();
+    private final Set<Operator<?>> members = new HashSet<>();
 
-    protected void merge(Cluster o) {
-      if (o == this) {
-        return;
-      }
-      for (Operator<?> node : o.members) {
-        add(node);
-      }
-      o.members.clear();
+    public void add(Operator<?> operator) {
+      members.add(operator);
     }
 
-    protected void add(Operator<?> curr) {
-      nodeCluster.put(curr, this);
-      members.add(curr);
+    public void merge(Cluster cluster) {
+      members.addAll(cluster.getMembers());
     }
 
     public Set<Cluster> parentClusters(OperatorEdgePredicate traverseEdge) {
       Set<Cluster> ret = new HashSet<Cluster>();
       for (Operator<?> operator : members) {
-        for (Operator<? extends OperatorDesc> p : 
operator.getParentOperators()) {
-          if (members.contains(p)) {
-            continue;
-          }
-          Optional<OpEdge> e = g.getEdge(p, operator);
-          if (traverseEdge.accept(p, operator, e.get())) {
-            ret.add(nodeCluster.get(p));
+        Stream<Operator<?>> foreignParentOperators =
+            dagGraph.predecessors(operator).stream()
+                .filter(pOp -> !members.contains(pOp))
+                .filter(pOp -> traverseEdge.accept(pOp, operator, 
dagGraph.getEdge(pOp, operator).get()));
+        foreignParentOperators.forEach(parentOperator -> {
+          for (Cluster parentCluster: operatorToCluster.get(parentOperator)) {
+            if (!parentCluster.getMembers().contains(operator)) {
+              ret.add(parentCluster);
+            }
           }
-        }
+        });
       }
       return ret;
     }
 
     public Set<Cluster> childClusters(OperatorEdgePredicate traverseEdge) {
       Set<Cluster> ret = new HashSet<Cluster>();
       for (Operator<?> operator : members) {
-        for (Operator<? extends OperatorDesc> p : 
operator.getChildOperators()) {
-          if (members.contains(p)) {
-            continue;
-          }
-          Optional<OpEdge> e = g.getEdge(operator, p);
-          if (traverseEdge.accept(operator, p, e.get())) {
-            ret.add(nodeCluster.get(p));
+        Stream<Operator<?>> foreignChildOperators =
+            dagGraph.successors(operator).stream()
+                .filter(cOp -> !members.contains(cOp))
+                .filter(cOp -> traverseEdge.accept(operator, cOp, 
dagGraph.getEdge(operator, cOp).get()));
+        foreignChildOperators.forEach(childOperator -> {
+          for (Cluster childCluster: operatorToCluster.get(childOperator)) {
+            if (!childCluster.getMembers().contains(operator)) {
+              ret.add(childCluster);
+            }
           }
-        }
+        });
       }
       return ret;
     }
 
     public Set<Operator<?>> getMembers() {
       return Collections.unmodifiableSet(members);
     }
+
   }
 
+  private Cluster createCluster(Operator<?> rootOperator) {
+    Cluster cluster = new Cluster();
+    Queue<Operator<?>> remainingOperators = new LinkedList<>();
+    remainingOperators.add(rootOperator);
 
-  public OperatorGraph(ParseContext pctx) {
-    g = new DagGraph<Operator<?>, OperatorGraph.OpEdge>();
-    Set<Operator<?>> visited = Sets.newIdentityHashSet();
-    Set<Operator<?>> seen = Sets.newIdentityHashSet();
-
-    seen.addAll(pctx.getTopOps().values());
-    while (!seen.isEmpty()) {
-      Operator<?> curr = seen.iterator().next();
-      seen.remove(curr);
-      if (visited.contains(curr)) {
-        continue;
+    while (!remainingOperators.isEmpty()) {
+      Operator<?> currentOperator = remainingOperators.poll();
+      if (!cluster.getMembers().contains(currentOperator)) {
+        cluster.add(currentOperator);
+
+        // TODO: write about DummyStoreOperator and FileSinkOperator
+        if (!(currentOperator instanceof ReduceSinkOperator)) {
+          remainingOperators.addAll(currentOperator.getChildOperators());
+        }
       }
+    }
 
-      visited.add(curr);
+    return cluster;
+  }
 
-      Cluster currentCluster = nodeCluster.get(curr);
-      if (currentCluster == null) {
-        currentCluster=new Cluster();
-        currentCluster.add(curr);
+  private Set<Cluster> createClusterSet(ParseContext pctx) {

Review Comment:
   I'm asserting my understanding.
   The original implementation merged clusters unless the ReduceSinkOperator 
decoupled them. It didn't work well with UNION because the same RSO can be 
replicated, and the ancestors can later be decomposed into multiple Tez 
vertices.
   The new implementation does not merge clusters by default so that we can 
retain two or more unmerged clusters in such cases.



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/graph/TestOperatorGraph.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.graph;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

Review Comment:
   This is unused



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to