Author: gunther
Date: Sun Dec 15 23:07:32 2013
New Revision: 1551076

URL: http://svn.apache.org/r1551076
Log:
HIVE-5065: Create proper (i.e.: non .q file based) junit tests for DagUtils and TezTask (Gunther Hagleitner)

Added:
    hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/
    hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
    hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Sun Dec 15 23:07:32 2013
@@ -513,10 +513,16 @@ public abstract class Task<T extends Ser
   Throwable getException() {
     return exception;
   }
+
   void setException(Throwable ex) {
     exception = ex;
   }
 
+  public void setConsole(LogHelper console) {
+    this.console = console;
+  }
+
+  @Override
   public String toString() {
     return getId() + ":" + getType();
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Sun Dec 15 23:07:32 2013
@@ -97,12 +97,13 @@ import org.apache.tez.runtime.library.ou
 public class DagUtils {
 
   private static final String TEZ_DIR = "_tez_scratch_dir";
+  private static DagUtils instance;
 
   /*
    * Creates the configuration object necessary to run a specific vertex from
    * map work. This includes input formats, input processor, etc.
    */
-  private static JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
+  private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
     JobConf conf = new JobConf(baseConf);
 
     if (mapWork.getNumMapTasks() != null) {
@@ -157,7 +158,7 @@ public class DagUtils {
    * @param w The second vertex (sink)
    * @return
    */
-  public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
+  public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
       EdgeType edgeType)
       throws IOException {
 
@@ -199,7 +200,7 @@ public class DagUtils {
   /*
    * Helper function to create Vertex from MapWork.
    */
-  private static Vertex createVertex(JobConf conf, MapWork mapWork,
+  private Vertex createVertex(JobConf conf, MapWork mapWork,
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
       Path mrScratchDir, Context ctx) throws Exception {
 
@@ -295,7 +296,7 @@ public class DagUtils {
   /*
    * Helper function to create JobConf for specific ReduceWork.
    */
-  private static JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
+  private JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
     JobConf conf = new JobConf(baseConf);
 
     conf.set("mapred.reducer.class", ExecReducer.class.getName());
@@ -311,7 +312,7 @@ public class DagUtils {
   /*
    * Helper function to create Vertex for given ReduceWork.
    */
-  private static Vertex createVertex(JobConf conf, ReduceWork reduceWork,
+  private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
       Path mrScratchDir, Context ctx) throws Exception {
 
@@ -351,7 +352,7 @@ public class DagUtils {
   /*
    * Helper method to create a yarn local resource.
    */
-  private static LocalResource createLocalResource(FileSystem remoteFs, Path file,
+  private LocalResource createLocalResource(FileSystem remoteFs, Path file,
       LocalResourceType type, LocalResourceVisibility visibility) {
 
     FileStatus fstat = null;
@@ -381,7 +382,7 @@ public class DagUtils {
    * @throws LoginException if we are unable to figure user information
    * @throws IOException when any dfs operation fails.
    */
-  public static Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
+  public Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
     UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
     String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
     String userPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_USER_INSTALL_DIR);
@@ -415,7 +416,7 @@ public class DagUtils {
    * @throws IOException when hdfs operation fails
    * @throws LoginException when getDefaultDestDir fails with the same exception
    */
-  public static List<LocalResource> localizeTempFiles(Configuration conf) throws IOException, LoginException {
+  public List<LocalResource> localizeTempFiles(Configuration conf) throws IOException, LoginException {
     List<LocalResource> tmpResources = new ArrayList<LocalResource>();
 
     String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
@@ -471,7 +472,7 @@ public class DagUtils {
   }
 
   // the api that finds the jar being used by this class on disk
-  public static String getExecJarPathLocal () throws URISyntaxException {
+  public String getExecJarPathLocal () throws URISyntaxException {
       // returns the location on disc of the jar of this class.
     return DagUtils.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString();
   }
@@ -479,7 +480,7 @@ public class DagUtils {
   /*
    * Helper function to retrieve the basename of a local resource
    */
-  public static String getBaseName(LocalResource lr) {
+  public String getBaseName(LocalResource lr) {
     return FilenameUtils.getName(lr.getResource().getFile());
   }
 
@@ -487,7 +488,7 @@ public class DagUtils {
    * @param pathStr - the string from which we try to determine the resource base name
    * @return the name of the resource from a given path string.
    */
-  public static String getResourceBaseName(String pathStr) {
+  public String getResourceBaseName(String pathStr) {
     String[] splits = pathStr.split("/");
     return splits[splits.length - 1];
   }
@@ -499,7 +500,7 @@ public class DagUtils {
    * @return true if the file names match else returns false.
    * @throws IOException when any file system related call fails
    */
-  private static boolean checkPreExisting(Path src, Path dest, Configuration conf)
+  private boolean checkPreExisting(Path src, Path dest, Configuration conf)
       throws IOException {
     FileSystem destFS = dest.getFileSystem(conf);
 
@@ -528,7 +529,7 @@ public class DagUtils {
    * @return localresource from tez localization.
    * @throws IOException when any file system related calls fails.
    */
-  public static LocalResource localizeResource(Path src, Path dest, Configuration conf)
+  public LocalResource localizeResource(Path src, Path dest, Configuration conf)
       throws IOException {
     FileSystem destFS = dest.getFileSystem(conf);
     if (!(destFS instanceof DistributedFileSystem)) {
@@ -557,7 +558,7 @@ public class DagUtils {
    * @return JobConf base configuration for job execution
    * @throws IOException
    */
-  public static JobConf createConfiguration(HiveConf hiveConf) throws IOException {
+  public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
     hiveConf.setBoolean("mapred.mapper.new-api", false);
 
     JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration(hiveConf);
@@ -585,7 +586,7 @@ public class DagUtils {
    * @param work BaseWork will be used to populate the configuration object.
    * @return JobConf new configuration object
    */
-  public static JobConf initializeVertexConf(JobConf conf, BaseWork work) {
+  public JobConf initializeVertexConf(JobConf conf, BaseWork work) {
 
     // simply dispatch the call to the right method for the actual (sub-) type of
     // BaseWork.
@@ -612,7 +613,7 @@ public class DagUtils {
    * @param ctx This query's context
    * @return Vertex
    */
-  public static Vertex createVertex(JobConf conf, BaseWork work,
+  public Vertex createVertex(JobConf conf, BaseWork work,
       Path scratchDir, LocalResource appJarLr, List<LocalResource> additionalLr,
       FileSystem fileSystem, Context ctx, boolean hasChildren) throws Exception {
 
@@ -626,8 +627,8 @@ public class DagUtils {
       v = createVertex(conf, (ReduceWork) work, appJarLr,
           additionalLr, fileSystem, scratchDir, ctx);
     } else {
-      assert false;
-      return null;
+      // something is seriously wrong if this is happening
+      throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
     }
 
     // initialize stats publisher if necessary
@@ -660,7 +661,7 @@ public class DagUtils {
    * createTezDir creates a temporary directory in the scratchDir folder to
    * be used with Tez. Assumes scratchDir exists.
    */
-  public static Path createTezDir(Path scratchDir, Configuration conf)
+  public Path createTezDir(Path scratchDir, Configuration conf)
       throws IOException {
     Path tezDir = getTezDir(scratchDir);
     FileSystem fs = tezDir.getFileSystem(conf);
@@ -671,10 +672,21 @@ public class DagUtils {
   /**
    * Gets the tez dir that belongs to the hive scratch dir
    */
-  public static Path getTezDir(Path scratchDir) {
+  public Path getTezDir(Path scratchDir) {
     return new Path(scratchDir, TEZ_DIR);
   }
 
+  /**
+   * Returns the shared instance, creating it on first use (thread-safe).
+   * @return instance of this class
+   */
+  public static synchronized DagUtils getInstance() {
+    if (instance == null) {
+      instance = new DagUtils();
+    }
+    return instance;
+  }
+
   private DagUtils() {
     // don't instantiate
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Sun Dec 15 23:07:32 2013
@@ -58,6 +58,7 @@ public class TezSessionState {
   private LocalResource appJarLr;
   private TezSession session;
   private String sessionId;
+  private final DagUtils utils;
 
   private static List<TezSessionState> openSessions
     = Collections.synchronizedList(new LinkedList<TezSessionState>());
@@ -66,7 +67,16 @@ public class TezSessionState {
    * Constructor. We do not automatically connect, because we only want to
    * load tez classes when the user has tez installed.
    */
-  public void TezSessionContext() {
+  public TezSessionState(DagUtils utils) {
+    this.utils = utils;
+  }
+
+  /**
+   * Constructor using the shared DagUtils instance. We do not automatically
+   * connect, because we only want to load tez classes when tez is installed.
+   */
+  public TezSessionState() {
+    this(DagUtils.getInstance());
   }
 
   /**
@@ -112,7 +122,7 @@ public class TezSessionState {
 
     // configuration for the application master
     Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
-    commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
+    commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
 
     AMConfiguration amConfig = new AMConfiguration(null, commonLocalResources,
          tezConfig, null);
@@ -211,8 +221,8 @@ public class TezSessionState {
   private LocalResource createHiveExecLocalResource()
       throws IOException, LoginException, URISyntaxException {
     String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
-    String currentVersionPathStr = DagUtils.getExecJarPathLocal();
-    String currentJarName = DagUtils.getResourceBaseName(currentVersionPathStr);
+    String currentVersionPathStr = utils.getExecJarPathLocal();
+    String currentJarName = utils.getResourceBaseName(currentVersionPathStr);
     FileSystem fs = null;
     Path jarPath = null;
     FileStatus dirStatus = null;
@@ -234,18 +244,18 @@ public class TezSessionState {
       if ((dirStatus != null) && (dirStatus.isDir())) {
         FileStatus[] listFileStatus = fs.listStatus(hiveJarDirPath);
         for (FileStatus fstatus : listFileStatus) {
-          String jarName = DagUtils.getResourceBaseName(fstatus.getPath().toString());
+          String jarName = utils.getResourceBaseName(fstatus.getPath().toString());
           if (jarName.equals(currentJarName)) {
             // we have found the jar we need.
             jarPath = fstatus.getPath();
-            return DagUtils.localizeResource(null, jarPath, conf);
+            return utils.localizeResource(null, jarPath, conf);
           }
         }
 
         // jar wasn't in the directory, copy the one in current use
         if (jarPath == null) {
           Path dest = new Path(hiveJarDir + "/" + currentJarName);
-          return DagUtils.localizeResource(new Path(currentVersionPathStr), dest, conf);
+          return utils.localizeResource(new Path(currentVersionPathStr), dest, conf);
         }
       }
     }
@@ -258,12 +268,12 @@ public class TezSessionState {
      */
     if ((hiveJarDir == null) || (dirStatus == null) ||
         ((dirStatus != null) && (!dirStatus.isDir()))) {
-      Path dest = DagUtils.getDefaultDestDir(conf);
+      Path dest = utils.getDefaultDestDir(conf);
       String destPathStr = dest.toString();
       String jarPathStr = destPathStr + "/" + currentJarName;
       dirStatus = fs.getFileStatus(dest);
       if (dirStatus.isDir()) {
-        return DagUtils.localizeResource(new Path(currentVersionPathStr), new Path(jarPathStr), conf);
+        return utils.localizeResource(new Path(currentVersionPathStr), new Path(jarPathStr), conf);
       } else {
         throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString()));
       }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Sun Dec 15 23:07:32 2013
@@ -72,8 +72,15 @@ public class TezTask extends Task<TezWor
 
   private TezCounters counters;
 
+  private final DagUtils utils;
+
   public TezTask() {
+    this(DagUtils.getInstance());
+  }
+
+  public TezTask(DagUtils utils) {
     super();
+    this.utils = utils;
   }
 
   public TezCounters getTezCounters() {
@@ -116,10 +123,10 @@ public class TezTask extends Task<TezWor
       Path scratchDir = new Path(ctx.getMRScratchDir());
 
       // create the tez tmp dir
-      DagUtils.createTezDir(scratchDir, conf);
+      utils.createTezDir(scratchDir, conf);
 
       // jobConf will hold all the configuration for hadoop, tez, and hive
-      JobConf jobConf = DagUtils.createConfiguration(conf);
+      JobConf jobConf = utils.createConfiguration(conf);
 
       // unless already installed on all the cluster nodes, we'll have to
       // localize hive-exec.jar as well.
@@ -129,7 +136,7 @@ public class TezTask extends Task<TezWor
       DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx);
 
       // submit will send the job to the cluster and start executing
-      client = submit(jobConf, dag, scratchDir, appJarLr, session.getSession());
+      client = submit(jobConf, dag, scratchDir, appJarLr, session);
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor();
@@ -161,7 +168,7 @@ public class TezTask extends Task<TezWor
     return rc;
   }
 
-  private DAG build(JobConf conf, TezWork work, Path scratchDir,
+  DAG build(JobConf conf, TezWork work, Path scratchDir,
       LocalResource appJarLr, Context ctx)
       throws Exception {
 
@@ -170,14 +177,14 @@ public class TezTask extends Task<TezWor
     Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>();
 
     // we need to get the user specified local resources for this dag
-    List<LocalResource> additionalLr = DagUtils.localizeTempFiles(conf);
+    List<LocalResource> additionalLr = utils.localizeTempFiles(conf);
 
     // getAllWork returns a topologically sorted list, which we use to make
     // sure that vertices are created before they are used in edges.
     List<BaseWork> ws = work.getAllWork();
     Collections.reverse(ws);
 
-    Path tezDir = DagUtils.getTezDir(scratchDir);
+    Path tezDir = utils.getTezDir(scratchDir);
     FileSystem fs = tezDir.getFileSystem(conf);
 
     // the name of the dag is what is displayed in the AM/Job UI
@@ -191,8 +198,8 @@ public class TezTask extends Task<TezWor
 
       // translate work to vertex
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
-      JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
-      Vertex wx = DagUtils.createVertex(wxConf, w, tezDir,
+      JobConf wxConf = utils.initializeVertexConf(conf, w);
+      Vertex wx = utils.createVertex(wxConf, w, tezDir,
          appJarLr, additionalLr, fs, ctx, !isFinal);
       dag.addVertex(wx);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
@@ -206,7 +213,7 @@ public class TezTask extends Task<TezWor
 
         EdgeType edgeType = work.getEdgeProperty(w, v);
 
-        e = DagUtils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
+        e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
         dag.addEdge(e);
       }
     }
@@ -214,8 +221,8 @@ public class TezTask extends Task<TezWor
     return dag;
   }
 
-  private DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
-      LocalResource appJarLr, TezSession session)
+  DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
+      LocalResource appJarLr, TezSessionState sessionState)
       throws IOException, TezException, InterruptedException,
       LoginException, URISyntaxException, HiveException {
 
@@ -224,25 +231,19 @@ public class TezTask extends Task<TezWor
 
     try {
       // ready to start execution on the cluster
-      dagClient = session.submitDAG(dag);
+      dagClient = sessionState.getSession().submitDAG(dag);
     } catch (SessionNotRunning nr) {
       console.printInfo("Tez session was closed. Reopening...");
 
-      // Need to remove this static hack. But this is the way currently to
-      // get a session.
-      SessionState ss = SessionState.get();
-      TezSessionState tezSession = ss.getTezSession();
-
       // close the old one, but keep the tmp files around
-      tezSession.close(true);
+      sessionState.close(true);
 
       // (re)open the session
-      tezSession.open(ss.getSessionId(), this.conf);
-      session = tezSession.getSession();
+      sessionState.open(sessionState.getSessionId(), this.conf);
 
       console.printInfo("Session re-established.");
 
-      dagClient = session.submitDAG(dag);
+      dagClient = sessionState.getSession().submitDAG(dag);
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
@@ -253,7 +254,7 @@ public class TezTask extends Task<TezWor
    * close will move the temp files into the right place for the fetch
    * task. If the job has failed it will clean up the files.
    */
-  private int close(TezWork work, int rc) {
+  int close(TezWork work, int rc) {
     try {
       List<BaseWork> ws = work.getAllWork();
       for (BaseWork w: ws) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Sun Dec 15 23:07:32 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
@@ -58,6 +59,9 @@ public class GenTezWork implements NodeP
 
     GenTezProcContext context = (GenTezProcContext) procContext;
 
+    assert context != null && context.currentTask != null
+        && context.currentRootOperator != null;
+
     // Operator is a file sink or reduce sink. Something that forces
     // a new vertex.
     Operator<?> operator = (Operator<?>) nd;
@@ -183,7 +187,7 @@ public class GenTezWork implements NodeP
     return null;
   }
 
-  private ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root,
+  protected ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root,
       TezWork tezWork) {
     assert !root.getParentOperators().isEmpty();
     ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
@@ -210,7 +214,7 @@ public class GenTezWork implements NodeP
     return reduceWork;
   }
 
-  private void setupReduceSink(GenTezProcContext context, ReduceWork reduceWork,
+  protected void setupReduceSink(GenTezProcContext context, ReduceWork reduceWork,
       ReduceSinkOperator reduceSink) {
 
     LOG.debug("Setting up reduce sink: " + reduceSink
@@ -227,7 +231,7 @@ public class GenTezWork implements NodeP
     reduceSink.getConf().setOutputName(reduceWork.getName());
   }
 
-  private MapWork createMapWork(GenTezProcContext context, Operator<?> root,
+  protected MapWork createMapWork(GenTezProcContext context, Operator<?> root,
       TezWork tezWork) throws SemanticException {
     assert root.getParentOperators().isEmpty();
     MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
@@ -237,11 +241,19 @@ public class GenTezWork implements NodeP
     assert root instanceof TableScanOperator;
     String alias = ((TableScanOperator)root).getConf().getAlias();
 
-    GenMapRedUtils.setMapWork(mapWork, context.parseContext,
-        context.inputs, null, root, alias, context.conf, false);
+    setupMapWork(mapWork, context, root, alias);
+
+    // add new item to the tez work
     tezWork.add(mapWork);
 
     return mapWork;
   }
 
+  // this method's main use is to help unit testing this class
+  protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
+      Operator<? extends OperatorDesc> root, String alias) throws SemanticException {
+    // All the setup is done in GenMapRedUtils
+    GenMapRedUtils.setMapWork(mapWork, context.parseContext,
+        context.inputs, null, root, alias, context.conf, false);
+  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Sun Dec 15 23:07:32 2013
@@ -91,6 +91,7 @@ public class TezCompiler extends TaskCom
     opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
         ReduceSinkOperator.getOperatorName() + "%"),
         new SetReducerParallelism());
+
     opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
         JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java?rev=1551076&r1=1551075&r2=1551076&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java Sun Dec 15 23:07:32 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -90,6 +91,24 @@ public class TezWork extends AbstractOpe
   }
 
   /**
+   * add all nodes in the collection (of any BaseWork subtype) without any connections
+   */
+  public void addAll(Collection<? extends BaseWork> c) {
+    for (BaseWork w: c) {
+      this.add(w);
+    }
+  }
+
+  /**
+   * add all nodes in the array without any connections
+   */
+  public void addAll(BaseWork[] bws) {
+    for (BaseWork w: bws) {
+      this.add(w);
+    }
+  }
+
+  /**
    * add creates a new node in the graph without any connections
    */
   public void add(BaseWork w) {

Added: hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1551076&view=auto
==============================================================================
--- hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (added)
+++ hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Sun Dec 15 23:07:32 2013
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestTezTask {
+
+  DagUtils utils;
+  MapWork[] mws;
+  ReduceWork[] rws;
+  TezWork work;
+  TezTask task;
+  TezSession session;
+  TezSessionState sessionState;
+  JobConf conf;
+  LocalResource appLr;
+  Operator<?> op;
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setUp() throws Exception {
+    utils = mock(DagUtils.class);
+    when(utils.getTezDir(any(Path.class))).thenReturn(new Path("hdfs://localhost:9000/tez/"));
+    when(utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class), any(LocalResource.class),
+        any(List.class), any(FileSystem.class), any(Context.class), anyBoolean())).thenAnswer(new Answer<Vertex>() {
+
+          @Override
+          public Vertex answer(InvocationOnMock invocation) throws Throwable {
+            Object[] args = invocation.getArguments();
+            return new Vertex(((BaseWork)args[1]).getName(),
+                mock(ProcessorDescriptor.class), 0, mock(Resource.class));
+          }
+        });
+
+    when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(JobConf.class),
+        any(Vertex.class), any(EdgeType.class))).thenAnswer(new Answer<Edge>() {
+
+          @Override
+          public Edge answer(InvocationOnMock invocation) throws Throwable {
+            Object[] args = invocation.getArguments();
+            return new Edge((Vertex)args[1], (Vertex)args[3], mock(EdgeProperty.class));
+          }
+        });
+
+    work = new TezWork();
+
+    mws = new MapWork[] { new MapWork(), new MapWork()};
+    rws = new ReduceWork[] { new ReduceWork(), new ReduceWork() };
+
+    work.addAll(mws);
+    work.addAll(rws);
+
+    int i = 0;
+    for (BaseWork w: work.getAllWork()) {
+      w.setName("Work "+(++i));
+    }
+
+    op = mock(Operator.class);
+
+    LinkedHashMap<String, Operator<? extends OperatorDesc>> map
+      = new LinkedHashMap<String,Operator<? extends OperatorDesc>>();
+    map.put("foo", op);
+    mws[0].setAliasToWork(map);
+    mws[1].setAliasToWork(map);
+
+    LinkedHashMap<String, ArrayList<String>> pathMap
+      = new LinkedHashMap<String, ArrayList<String>>();
+    ArrayList<String> aliasList = new ArrayList<String>();
+    aliasList.add("foo");
+    pathMap.put("foo", aliasList);
+
+    mws[0].setPathToAliases(pathMap);
+    mws[1].setPathToAliases(pathMap);
+
+    rws[0].setReducer(op);
+    rws[1].setReducer(op);
+
+    work.connect(mws[0], rws[0], EdgeType.SIMPLE_EDGE);
+    work.connect(mws[1], rws[0], EdgeType.SIMPLE_EDGE);
+    work.connect(rws[0], rws[1], EdgeType.SIMPLE_EDGE);
+
+    task = new TezTask(utils);
+    task.setWork(work);
+    task.setConsole(mock(LogHelper.class));
+
+    conf = new JobConf();
+    appLr = mock(LocalResource.class);
+
+    session = mock(TezSession.class);
+    sessionState = mock(TezSessionState.class);
+    when(sessionState.getSession()).thenReturn(session);
+    when(session.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning(""))
+      .thenReturn(mock(DAGClient.class));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    utils = null;
+    work = null;
+    task = null;
+  }
+
+  @Test
+  public void testBuildDag() throws IllegalArgumentException, IOException, Exception {
+    DAG dag = task.build(conf, work, new Path("hdfs:///"), appLr, new Context(conf));
+    for (BaseWork w: work.getAllWork()) {
+      Vertex v = dag.getVertex(w.getName());
+      assertNotNull(v);
+      List<Vertex> outs = v.getOutputVertices();
+      for (BaseWork x: work.getChildren(w)) {
+        boolean found = false;
+        for (Vertex u: outs) {
+          if (u.getVertexName().equals(x.getName())) {
+            found = true;
+            break;
+          }
+        }
+        assertTrue(found);
+      }
+    }
+  }
+
+  @Test
+  public void testEmptyWork() throws IllegalArgumentException, IOException, Exception {
+    DAG dag = task.build(conf, new TezWork(), new Path("hdfs:///"), appLr, new Context(conf));
+    assertEquals(0, dag.getVertices().size());
+  }
+
+  @Test
+  public void testSubmit() throws LoginException, IllegalArgumentException,
+  IOException, TezException, InterruptedException, URISyntaxException, HiveException {
+    DAG dag = new DAG("test");
+    task.submit(conf, dag, new Path("hdfs:///"), appLr, sessionState);
+    // validate close/reopen
+    verify(sessionState, times(1)).open(any(String.class), any(HiveConf.class));
+    verify(sessionState, times(1)).close(eq(true));
+    verify(session, times(2)).submitDAG(any(DAG.class));
+  }
+
+  @Test
+  public void testClose() throws HiveException {
+    task.close(work, 0);
+    verify(op, times(4)).jobClose(any(Configuration.class), eq(true));
+  }
+}

Added: hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java?rev=1551076&view=auto
==============================================================================
--- hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java (added)
+++ hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java Sun Dec 15 23:07:32 2013
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for GenTezWork: processing a ReduceSinkOperator and then a
+ * FileSinkOperator over a TS -> RS -> FS chain must produce a MapWork
+ * followed by a connected ReduceWork in the current TezTask.
+ */
+public class TestGenTezWork {
+
+  GenTezProcContext ctx;
+  GenTezWork proc;
+  ReduceSinkOperator rs;
+  FileSinkOperator fs;
+  TableScanOperator ts;
+
+  /**
+   * Creates a fresh processing context, a GenTezWork whose setupMapWork just
+   * registers the root operator under the alias "foo", and the operator chain
+   * TS -> RS -> FS with TS as the current root.
+   *
+   * @throws java.lang.Exception on setup failure
+   */
+  @Before
+  public void setUp() throws Exception {
+    ctx = new GenTezProcContext(
+        new HiveConf(),
+        new ParseContext(),
+        Collections.<Task<MoveWork>>emptyList(),
+        // must be mutable: processing adds the generated task to the root tasks
+        new ArrayList<Task<? extends Serializable>>(),
+        Collections.<ReadEntity>emptySet(),
+        Collections.<WriteEntity>emptySet());
+
+    proc = new GenTezWork() {
+      @Override
+      protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
+          Operator<? extends OperatorDesc> root, String alias) throws SemanticException {
+        // Skip the real setup; only map a fixed alias to the root operator.
+        LinkedHashMap<String, Operator<? extends OperatorDesc>> map
+            = new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+        map.put("foo", root);
+        mapWork.setAliasToWork(map);
+      }
+    };
+
+    fs = new FileSinkOperator();
+    fs.setConf(new FileSinkDesc());
+    rs = new ReduceSinkOperator();
+    rs.setConf(new ReduceSinkDesc());
+    ts = new TableScanOperator();
+    ts.setConf(new TableScanDesc());
+    ts.getChildOperators().add(rs);
+    rs.getParentOperators().add(ts);
+    rs.getChildOperators().add(fs);
+    fs.getParentOperators().add(rs);
+    ctx.preceedingWork = null;
+    ctx.currentRootOperator = ts;
+  }
+
+  /**
+   * Drops all references created by {@link #setUp()}.
+   *
+   * @throws java.lang.Exception never in practice; declared for symmetry with setUp
+   */
+  @After
+  public void tearDown() throws Exception {
+    ctx = null;
+    proc = null;
+    ts = null;
+    rs = null;
+    fs = null;
+  }
+
+  @Test
+  public void testCreateMap() throws SemanticException {
+    proc.process(rs, null, ctx, (Object[]) null);
+
+    assertNotNull(ctx.currentTask);
+    assertTrue(ctx.rootTasks.contains(ctx.currentTask));
+
+    TezWork work = ctx.currentTask.getWork();
+    assertEquals(1, work.getAllWork().size());
+
+    BaseWork w = work.getAllWork().get(0);
+    assertTrue(w instanceof MapWork);
+
+    MapWork mw = (MapWork) w;
+
+    // need to make sure names are set for tez to connect things right
+    assertNotNull(w.getName());
+
+    // map work should start with our ts op
+    assertSame(ts, mw.getAliasToWork().entrySet().iterator().next().getValue());
+
+    // preceding work must be set to the newly generated map
+    assertSame(mw, ctx.preceedingWork);
+
+    // should have a new root now
+    assertSame(fs, ctx.currentRootOperator);
+  }
+
+  @Test
+  public void testCreateReduce() throws SemanticException {
+    // create map
+    proc.process(rs, null, ctx, (Object[]) null);
+
+    // create reduce
+    proc.process(fs, null, ctx, (Object[]) null);
+
+    TezWork work = ctx.currentTask.getWork();
+    assertEquals(2, work.getAllWork().size());
+
+    BaseWork w = work.getAllWork().get(1);
+    assertTrue(w instanceof ReduceWork);
+    assertTrue(work.getParents(w).contains(work.getAllWork().get(0)));
+
+    ReduceWork rw = (ReduceWork) w;
+
+    // need to make sure names are set for tez to connect things right
+    assertNotNull(w.getName());
+
+    // reduce work should start with our fs op
+    assertSame(fs, rw.getReducer());
+
+    // should have severed the ties
+    assertEquals(0, fs.getParentOperators().size());
+  }
+}


Reply via email to