Author: gunther
Date: Mon Sep 30 18:03:01 2013
New Revision: 1527689

URL: http://svn.apache.org/r1527689
Log:
HIVE-5368: Changes to work creation for tez (Vikram Dixit K via Gunther 
Hagleitner)

Added:
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
Modified:
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
    
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1527689&r1=1527688&r2=1527689&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java 
(original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java 
Mon Sep 30 18:03:01 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 import org.apache.hadoop.hive.shims.Hadoop20Shims.NullOutputCommitter;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
@@ -147,7 +148,8 @@ public class DagUtils {
    * @param w The second vertex (sink)
    * @return
    */
-  public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex 
w)
+  public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex 
w,
+      EdgeType edgeType)
       throws IOException {
 
     // Tez needs to setup output subsequent input pairs correctly
@@ -157,9 +159,23 @@ public class DagUtils {
     
v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
     
w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
 
-    // all edges are of the same type right now
+    DataMovementType dataMovementType;
+    switch (edgeType) {
+    case BROADCAST_EDGE:
+      dataMovementType = DataMovementType.BROADCAST;
+      break;
+
+    case SIMPLE_EDGE:
+      dataMovementType = DataMovementType.SCATTER_GATHER;
+      break;
+
+    default:
+      dataMovementType = DataMovementType.SCATTER_GATHER;
+      break;
+    }
+
     EdgeProperty edgeProperty =
-        new EdgeProperty(DataMovementType.SCATTER_GATHER,
+        new EdgeProperty(dataMovementType,
             DataSourceType.PERSISTED,
             SchedulingType.SEQUENTIAL,
             new OutputDescriptor(OnFileSortedOutput.class.getName()),

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1527689&r1=1527688&r2=1527689&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java 
(original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java 
Mon Sep 30 18:03:01 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.JobConf;
@@ -172,7 +173,13 @@ public class TezTask extends Task<TezWor
       // add all dependencies (i.e.: edges) to the graph
       for (BaseWork v: work.getChildren(w)) {
         assert workToVertex.containsKey(v);
-        Edge e = DagUtils.createEdge(wxConf, wx, workToConf.get(v), 
workToVertex.get(v));
+        Edge e = null;
+        EdgeType edgeType = EdgeType.SIMPLE_EDGE;
+        if (work.isBroadCastEdge(w, v)) {
+          edgeType = EdgeType.BROADCAST_EDGE;
+        }
+
+        e = DagUtils.createEdge(wxConf, wx, workToConf.get(v), 
workToVertex.get(v), edgeType);
         dag.addEdge(e);
       }
     }

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1527689&r1=1527688&r2=1527689&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
 Mon Sep 30 18:03:01 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;

Added: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1527689&view=auto
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
 (added)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
 Mon Sep 30 18:03:01 2013
@@ -0,0 +1,85 @@
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.GenTezProcContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+
+public class ReduceSinkMapJoinProc implements NodeProcessor {
+
+  /* (non-Javadoc)
+   * This processor addresses the RS-MJ case that occurs in tez on the 
small/hash
+   * table side of things. The work that the RS will be a 
part of
+   * must be connected to the MJ work via a broadcast edge.
+   * We should not walk down the tree when we encounter this pattern because:
+   * the type of work (map work or reduce work) needs to be determined
+   * on the basis of the big table side because it may be a mapwork (no need 
for shuffle)
+   * or reduce work.
+   */
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx 
procContext, Object... nodeOutputs)
+      throws SemanticException {
+    GenTezProcContext context = (GenTezProcContext) procContext;
+    context.preceedingWork = null;
+    context.currentRootOperator = null;
+
+    MapJoinOperator mapJoinOp = (MapJoinOperator)nd;
+
+    Operator<? extends OperatorDesc>childOp = 
mapJoinOp.getChildOperators().get(0);
+    ReduceSinkOperator parentRS = (ReduceSinkOperator)stack.get(stack.size() - 
2);
+    while (childOp != null) {
+      if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof 
FileSinkOperator)) {
+        /*
+         *  if there was a pre-existing work generated for the big-table 
mapjoin side,
+         *  we need to hook the work generated for the RS (associated with the 
RS-MJ pattern)
+         *  with the pre-existing work.
+         *
+         *  Otherwise, we need to record that the reduce sink/file sink 
down the MJ path
+         *  should be linked to the RS work (associated with the RS-MJ pattern).
+         *
+         */
+
+        BaseWork myWork = context.operatorWorkMap.get(childOp);
+        BaseWork parentWork = context.operatorWorkMap.get(parentRS);
+        if (myWork != null) {
+          // link the work with the work associated with the reduce sink that 
triggered this rule
+          TezWork tezWork = context.currentTask.getWork();
+          tezWork.connect(parentWork, myWork, EdgeType.BROADCAST_EDGE);
+        } else {
+          List<BaseWork> linkWorkList = context.linkOpWithWorkMap.get(childOp);
+          if (linkWorkList == null) {
+            linkWorkList = new ArrayList<BaseWork>();
+          }
+          linkWorkList.add(parentWork);
+          context.linkOpWithWorkMap.put(childOp, linkWorkList);
+        }
+
+        break;
+      }
+
+      if ((childOp.getChildOperators() != null) && 
(childOp.getChildOperators().size() >= 1)) {
+        childOp = childOp.getChildOperators().get(0);
+      } else {
+        break;
+      }
+    }
+
+    // cut the operator tree so as to not retain connections from the parent 
RS downstream
+    parentRS.removeChild(mapJoinOp);
+    return true;
+  }
+
+}

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1527689&r1=1527688&r2=1527689&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
 Mon Sep 30 18:03:01 2013
@@ -82,6 +82,13 @@ public class GenTezProcContext implement
   // that follows it. This is used for connecting them later.
   public final Map<Operator<?>, BaseWork> leafOperatorToFollowingWork;
 
+  // a map that keeps track of work items that need to be linked while
+  // traversing an operator tree
+  public final Map<Operator<?>, List<BaseWork>> linkOpWithWorkMap;
+
+  // a map that maintains operator (file-sink or reduce-sink) to work mapping
+  public final Map<Operator<?>, BaseWork> operatorWorkMap;
+
 
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
@@ -97,5 +104,7 @@ public class GenTezProcContext implement
     this.currentTask = (TezTask) TaskFactory.get(new TezWork(), conf);
     this.leafOperatorToFollowingWork = new HashMap<Operator<?>, BaseWork>();
     this.rootOperators = rootOperators;
+    this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>();
+    this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
   }
 }

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1527689&r1=1527688&r2=1527689&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java 
(original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java 
Mon Sep 30 18:03:01 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.ql.plan.Ba
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 
 /**
  * GenTezWork separates the operator tree into tez tasks.
@@ -101,9 +103,9 @@ public class GenTezWork implements NodeP
       reduceWork.setReducer(root);
       reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
 
-      // All parents should be reduce sinks. We pick the one we just walked 
-      // to choose the number of reducers. In the join/union case they will 
-      // all be -1. In sort/order case where it matters there will be only 
+      // All parents should be reduce sinks. We pick the one we just walked
+      // to choose the number of reducers. In the join/union case they will
+      // all be -1. In sort/order case where it matters there will be only
       // one parent.
       assert context.parentOfRoot instanceof ReduceSinkOperator;
       ReduceSinkOperator reduceSink = (ReduceSinkOperator) 
context.parentOfRoot;
@@ -121,7 +123,8 @@ public class GenTezWork implements NodeP
       tezWork.add(reduceWork);
       tezWork.connect(
           context.preceedingWork,
-          reduceWork);
+          reduceWork, EdgeType.SIMPLE_EDGE);
+
       work = reduceWork;
     }
 
@@ -142,21 +145,20 @@ public class GenTezWork implements NodeP
       BaseWork followingWork = 
context.leafOperatorToFollowingWork.get(operator);
 
       // need to add this branch to the key + value info
-      assert operator instanceof ReduceSinkOperator 
+      assert operator instanceof ReduceSinkOperator
         && followingWork instanceof ReduceWork;
       ReduceSinkOperator rs = (ReduceSinkOperator) operator;
       ReduceWork rWork = (ReduceWork) followingWork;
       GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
 
       // add dependency between the two work items
-      tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator));
+      tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator), 
+         EdgeType.SIMPLE_EDGE);
     }
 
     // This is where we cut the tree as described above. We also remember that
     // we might have to connect parent work with this work later.
     for (Operator<?> parent: new 
ArrayList<Operator<?>>(root.getParentOperators())) {
-      assert !context.leafOperatorToFollowingWork.containsKey(parent);
-      assert !(work instanceof MapWork);
       context.leafOperatorToFollowingWork.put(parent, work);
       LOG.debug("Removing " + parent + " as parent from " + root);
       root.removeParent(parent);
@@ -175,6 +177,30 @@ public class GenTezWork implements NodeP
       context.preceedingWork = null;
     }
 
+    /*
+     * this happens in case of map join operations.
+     * The tree looks like this:
+     *
+     *        RS <--- we are here perhaps
+     *        |
+     *      MapJoin
+     *    /     \
+     *  RS       TS
+     *  /
+     * TS
+     *
+     * If we are at the RS pointed to above, we may have already visited the
+     * RS following the TS, in which case work has already been generated for
+     * the TS-RS branch. We need to hook the current work to that generated work.
+     */
+    context.operatorWorkMap.put(operator, work);
+    List<BaseWork> linkWorkList = context.linkOpWithWorkMap.get(operator);
+    if (linkWorkList != null) {
+      for (BaseWork parentWork : linkWorkList) {
+        tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
+      }
+    }
+
     return null;
   }
 

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1527689&r1=1527688&r2=1527689&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java 
(original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java 
Mon Sep 30 18:03:01 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
+import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
@@ -122,6 +124,9 @@ public class TezCompiler extends TaskCom
     opRules.put(new RuleRegExp(new String("Split Work - ReduceSink"),
         ReduceSinkOperator.getOperatorName() + "%"),
         new GenTezWork());
+    opRules.put(new RuleRegExp(new String("No more walking on 
ReduceSink-MapJoin"),
+        ReduceSinkOperator.getOperatorName() + "%" +
+        MapJoinOperator.getOperatorName() + "%"), new ReduceSinkMapJoinProc());
     opRules.put(new RuleRegExp(new String("Split Work - FileSink"),
         FileSinkOperator.getOperatorName() + "%"),
         new GenTezWork());

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java?rev=1527689&r1=1527688&r2=1527689&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java 
(original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java 
Mon Sep 30 18:03:01 2013
@@ -29,8 +29,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * TezWork. This class encapsulates all the work objects that can be executed 
- * in a single tez job. Currently it's basically a tree with MapWork at the 
+ * TezWork. This class encapsulates all the work objects that can be executed
+ * in a single tez job. Currently it's basically a tree with MapWork at the
  * leaves and ReduceWork in all other nodes.
  *
  */
@@ -38,12 +38,18 @@ import org.apache.commons.logging.LogFac
 @Explain(displayName = "Tez")
 public class TezWork extends AbstractOperatorDesc {
 
+  public enum EdgeType {
+    SIMPLE_EDGE,
+    BROADCAST_EDGE
+  }
+
   private static transient final Log LOG = LogFactory.getLog(TezWork.class);
 
   private final Set<BaseWork> roots = new HashSet<BaseWork>();
   private final Set<BaseWork> leaves = new HashSet<BaseWork>();
   private final Map<BaseWork, List<BaseWork>> workGraph = new 
HashMap<BaseWork, List<BaseWork>>();
   private final Map<BaseWork, List<BaseWork>> invertedWorkGraph = new 
HashMap<BaseWork, List<BaseWork>>();
+  private final Map<BaseWork, List<BaseWork>> broadcastEdge = new 
HashMap<BaseWork, List<BaseWork>>();
 
   /**
    * getAllWork returns a topologically sorted list of BaseWork
@@ -53,12 +59,12 @@ public class TezWork extends AbstractOpe
 
     List<BaseWork> result = new LinkedList<BaseWork>();
     Set<BaseWork> seen = new HashSet<BaseWork>();
-    
+
     for (BaseWork leaf: leaves) {
       // make sure all leaves are visited at least once
       visit(leaf, seen, result);
     }
-    
+
     return result;
   }
 
@@ -76,7 +82,7 @@ public class TezWork extends AbstractOpe
         visit(parent, seen, result);
       }
     }
-    
+
     result.add(child);
   }
 
@@ -89,23 +95,31 @@ public class TezWork extends AbstractOpe
     }
     workGraph.put(w, new LinkedList<BaseWork>());
     invertedWorkGraph.put(w, new LinkedList<BaseWork>());
+    broadcastEdge.put(w, new LinkedList<BaseWork>());
     roots.add(w);
     leaves.add(w);
   }
 
   /**
-   * connect adds an edge between a and b. Both nodes have 
+   * connect adds an edge between a and b. Both nodes have
    * to be added prior to calling connect.
    */
-  public void connect(BaseWork a, BaseWork b) {
+  public void connect(BaseWork a, BaseWork b, EdgeType edgeType) {
     workGraph.get(a).add(b);
     invertedWorkGraph.get(b).add(a);
     roots.remove(b);
     leaves.remove(a);
+    switch (edgeType) {
+    case BROADCAST_EDGE:
+      broadcastEdge.get(a).add(b);
+      break;
+    default:
+      break;
+    }
   }
 
   /**
-   * disconnect removes an edge between a and b. Both a and 
+   * disconnect removes an edge between a and b. Both a and
    * b have to be in the graph. If there is no matching edge
    * no change happens.
    */
@@ -138,7 +152,7 @@ public class TezWork extends AbstractOpe
    * getParents returns all the nodes with edges leading into work
    */
   public List<BaseWork> getParents(BaseWork work) {
-    assert invertedWorkGraph.containsKey(work) 
+    assert invertedWorkGraph.containsKey(work)
       && invertedWorkGraph.get(work) != null;
     return new LinkedList<BaseWork>(invertedWorkGraph.get(work));
   }
@@ -147,7 +161,7 @@ public class TezWork extends AbstractOpe
    * getChildren returns all the nodes with edges leading out of work
    */
   public List<BaseWork> getChildren(BaseWork work) {
-    assert workGraph.containsKey(work) 
+    assert workGraph.containsKey(work)
       && workGraph.get(work) != null;
     return new LinkedList<BaseWork>(workGraph.get(work));
   }
@@ -162,7 +176,7 @@ public class TezWork extends AbstractOpe
     if (!workGraph.containsKey(work)) {
       return;
     }
-    
+
     List<BaseWork> children = getChildren(work);
     List<BaseWork> parents = getParents(work);
 
@@ -186,4 +200,12 @@ public class TezWork extends AbstractOpe
     workGraph.remove(work);
     invertedWorkGraph.remove(work);
   }
+
+  // checks if a and b need a broadcast edge between them
+  public boolean isBroadCastEdge(BaseWork a, BaseWork b) {
+    if ((broadcastEdge.get(a).contains(b)) || 
(broadcastEdge.get(b).contains(a))) {
+      return true;
+    }
+    return false;
+  }
 }

Modified: 
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java?rev=1527689&r1=1527688&r2=1527689&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java 
(original)
+++ 
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java 
Mon Sep 30 18:03:01 2013
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.plan;
 
+import java.util.LinkedList;
+import java.util.List;
+
 import junit.framework.Assert;
+
+import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 import org.junit.Before;
 import org.junit.Test;
-import java.util.List;
-import java.util.LinkedList;
 
 public class TestTezWork {
 
@@ -58,9 +61,9 @@ public class TestTezWork {
   public void testConnect() throws Exception {
     BaseWork parent = nodes.get(0);
     BaseWork child = nodes.get(1);
-    
-    work.connect(parent, child);
-    
+
+    work.connect(parent, child, EdgeType.SIMPLE_EDGE);
+
     Assert.assertEquals(work.getParents(child).size(), 1);
     Assert.assertEquals(work.getChildren(parent).size(), 1);
     Assert.assertEquals(work.getChildren(parent).get(0), child);
@@ -76,13 +79,35 @@ public class TestTezWork {
     }
   }
 
-  @Test 
+  @Test
+  public void testBroadcastConnect() throws Exception {
+    BaseWork parent = nodes.get(0);
+    BaseWork child = nodes.get(1);
+
+    work.connect(parent, child, EdgeType.BROADCAST_EDGE);
+
+    Assert.assertEquals(work.getParents(child).size(), 1);
+    Assert.assertEquals(work.getChildren(parent).size(), 1);
+    Assert.assertEquals(work.getChildren(parent).get(0), child);
+    Assert.assertEquals(work.getParents(child).get(0), parent);
+    Assert.assertTrue(work.getRoots().contains(parent) && 
!work.getRoots().contains(child));
+    Assert.assertTrue(!work.getLeaves().contains(parent) && 
work.getLeaves().contains(child));
+    for (BaseWork w: nodes) {
+      if (w == parent || w == child) {
+        continue;
+      }
+      Assert.assertEquals(work.getParents(w).size(), 0);
+      Assert.assertEquals(work.getChildren(w).size(), 0);
+    }
+  }
+
+  @Test
   public void testDisconnect() throws Exception {
     BaseWork parent = nodes.get(0);
     BaseWork children[] = {nodes.get(1), nodes.get(2)};
-    
-    work.connect(parent, children[0]);
-    work.connect(parent, children[1]);
+
+    work.connect(parent, children[0], EdgeType.SIMPLE_EDGE);
+    work.connect(parent, children[1], EdgeType.SIMPLE_EDGE);
 
     work.disconnect(parent, children[0]);
 
@@ -94,14 +119,14 @@ public class TestTezWork {
                       && work.getLeaves().contains(children[1]));
   }
 
-  @Test 
+  @Test
   public void testRemove() throws Exception {
     BaseWork parent = nodes.get(0);
     BaseWork children[] = {nodes.get(1), nodes.get(2)};
-    
-    work.connect(parent, children[0]);
-    work.connect(parent, children[1]);
-    
+
+    work.connect(parent, children[0], EdgeType.SIMPLE_EDGE);
+    work.connect(parent, children[1], EdgeType.SIMPLE_EDGE);
+
     work.remove(parent);
 
     Assert.assertEquals(work.getParents(children[0]).size(), 0);
@@ -114,7 +139,7 @@ public class TestTezWork {
   @Test
   public void testGetAllWork() throws Exception {
     for (int i = 4; i > 0; --i) {
-      work.connect(nodes.get(i), nodes.get(i-1));
+      work.connect(nodes.get(i), nodes.get(i-1), EdgeType.SIMPLE_EDGE);
     }
 
     List<BaseWork> sorted = work.getAllWork();


Reply via email to