Author: gunther
Date: Tue Feb 24 03:13:38 2015
New Revision: 1661833

URL: http://svn.apache.org/r1661833
Log:
HIVE-9782: LLAP: hook up decider + dag utils (Gunther Hagleitner)

Modified:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
    hive/branches/llap/ql/src/test/queries/clientpositive/llapdecider.q

Modified: 
hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1661833&r1=1661832&r2=1661833&view=diff
==============================================================================
--- 
hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
(original)
+++ 
hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
Tue Feb 24 03:13:38 2015
@@ -2022,7 +2022,7 @@ public class HiveConf extends Configurat
        "Check input size, before considering vertex (-1 disables check)"),
     LLAP_AUTO_MAX_OUTPUT("hive.llap.auto.max.output.size", 1*1024*1024*1024L,
        "Check output size, before considering vertex (-1 disables check)"),
-    LLAP_EXECUTION_MODE("hive.llap.execution.mode", "auto",
+    LLAP_EXECUTION_MODE("hive.llap.execution.mode", "none",
        new StringSet("auto", "none", "all", "map"),
         "Chooses whether query fragments will run in container or in llap"),
 

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1661833&r1=1661832&r2=1661833&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java 
(original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java 
Tue Feb 24 03:13:38 2015
@@ -623,8 +623,13 @@ public class DagUtils {
     if (mapWork instanceof MergeFileWork) {
       procClassName = MergeFileTezProcessor.class.getName();
     }
+
+    String serviceName = findServiceName(mapWork);
+
     map = Vertex.create(mapWork.getName(), 
ProcessorDescriptor.create(procClassName)
-        .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+        .setUserPayload(serializedConf), numTasks, getContainerResource(conf))
+      .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName)
+      .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, 
serviceName);
 
     map.setTaskEnvironment(getContainerEnvironment(conf, true));
     map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
@@ -664,6 +669,13 @@ public class DagUtils {
     return conf;
   }
 
+  private String findServiceName(BaseWork work) {
+    String serviceName = TezSessionState.DEFAULT_SERVICE;
+    if (work.getLlapMode()) serviceName = TezSessionState.LLAP_SERVICE;
+    if (work.getUberMode()) serviceName = TezSessionState.LOCAL_SERVICE;
+    return serviceName;
+  }
+
   /*
    * Helper function to create Vertex for given ReduceWork.
    */
@@ -678,12 +690,17 @@ public class DagUtils {
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, reduceWork);
 
+    String serviceName = findServiceName(reduceWork);
+
     // create the vertex
     Vertex reducer = Vertex.create(reduceWork.getName(),
         ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
         setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
-            reduceWork.isAutoReduceParallelism() ? 
reduceWork.getMaxReduceTasks() : reduceWork
-                .getNumReduceTasks(), getContainerResource(conf));
+            reduceWork.isAutoReduceParallelism()?
+             reduceWork.getMaxReduceTasks():
+             reduceWork.getNumReduceTasks(), getContainerResource(conf))
+      .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName)
+      .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, 
serviceName);
 
     reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
     reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1661833&r1=1661832&r2=1661833&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
 Tue Feb 24 03:13:38 2015
@@ -56,6 +56,8 @@ import org.apache.tez.dag.api.SessionNot
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
 
 /**
  * Holds session state related to Tez
@@ -64,6 +66,13 @@ public class TezSessionState {
 
   private static final Log LOG = 
LogFactory.getLog(TezSessionState.class.getName());
   private static final String TEZ_DIR = "_tez_session_dir";
+  public static final String LLAP_SERVICE = "LLAP";
+  public static final String DEFAULT_SERVICE = 
TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
+  public static final String LOCAL_SERVICE = 
TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT;
+  private static final String LLAP_SCHEDULER = 
"org.apache.tez.dag.app.rm.DaemonTaskSchedulerService";
+  private static final String LLAP_LAUNCHER = 
"org.apache.tez.dag.app.launcher.DaemonContainerLauncher";
+  private static final String LLAP_SERVICE_SCHEDULER = LLAP_SERVICE + ":" + 
LLAP_SCHEDULER;
+  private static final String LLAP_SERVICE_LAUNCHER = LLAP_SERVICE + ":" + 
LLAP_LAUNCHER;
 
   private HiveConf conf;
   private Path tezScratchDir;
@@ -171,8 +180,18 @@ public class TezSessionState {
     // and finally we're ready to create and start the session
     // generate basic tez config
     TezConfiguration tezConfig = new TezConfiguration(conf);
+
+    // set up the staging directory to use
     tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, 
tezScratchDir.toUri().toString());
 
+    // we need plugins to handle llap and uber mode
+    tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
+        DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_SCHEDULER);
+
+    tezConfig.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
+       DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_LAUNCHER);
+
+    // container prewarming. tell the am how many containers we need
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
       int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
       n = Math.max(tezConfig.getInt(

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1661833&r1=1661832&r2=1661833&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java 
(original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java 
Tue Feb 24 03:13:38 2015
@@ -62,6 +62,7 @@ public abstract class BaseWork extends A
   protected Map<String, Map<String, Integer>> allColumnVectorMaps = null;
 
   protected boolean llapMode = false;
+  protected boolean uberMode = false;
   protected boolean vectorMode = false;
 
   public void setGatheringStats(boolean gatherStats) {
@@ -186,6 +187,14 @@ public abstract class BaseWork extends A
     return vectorMode;
   }
 
+  public void setUberMode(boolean uberMode) {
+    this.uberMode = uberMode;
+  }
+
+  public boolean getUberMode() {
+    return uberMode;
+  }
+
   public void setLlapMode(boolean llapMode) {
     this.llapMode = llapMode;
   }

Modified: hive/branches/llap/ql/src/test/queries/clientpositive/llapdecider.q
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/queries/clientpositive/llapdecider.q?rev=1661833&r1=1661832&r2=1661833&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/queries/clientpositive/llapdecider.q 
(original)
+++ hive/branches/llap/ql/src/test/queries/clientpositive/llapdecider.q Tue Feb 
24 03:13:38 2015
@@ -1,4 +1,5 @@
 set hive.stats.fetch.column.stats=true;
+set hive.llap.execution.mode=auto;
 
 -- simple query with multiple reduce stages
 EXPLAIN SELECT key, count(value) as cnt FROM src GROUP BY key ORDER BY cnt;
@@ -50,10 +51,10 @@ set hive.llap.execution.mode=all;
 
 EXPLAIN SELECT * from src_orc s1 join src_orc s2 on (s1.key = s2.key) order by 
s2.value;
 
-set hive.llap.execution.mode=auto;
-
 CREATE TEMPORARY FUNCTION test_udf_get_java_string AS 
'org.apache.hadoop.hive.ql.udf.generic.GenericUDFTestGetJavaString';
 
+set hive.llap.execution.mode=auto;
+
 EXPLAIN SELECT sum(cast(key as int) + 1) from src_orc where cast(key as int) > 
1;
 EXPLAIN SELECT sum(cast(test_udf_get_java_string(cast(key as string)) as int) 
+ 1) from src_orc where cast(key as int) > 1;
 EXPLAIN SELECT sum(cast(key as int) + 1) from src_orc where 
cast(test_udf_get_java_string(cast(key as string)) as int) > 1;


Reply via email to