Author: gunther
Date: Tue Feb 24 03:13:38 2015
New Revision: 1661833
URL: http://svn.apache.org/r1661833
Log:
HIVE-9782: LLAP: hook up decider + dag utils (Gunther Hagleitner)
Modified:
hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
hive/branches/llap/ql/src/test/queries/clientpositive/llapdecider.q
Modified:
hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1661833&r1=1661832&r2=1661833&view=diff
==============================================================================
---
hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
(original)
+++
hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Tue Feb 24 03:13:38 2015
@@ -2022,7 +2022,7 @@ public class HiveConf extends Configurat
"Check input size, before considering vertex (-1 disables check)"),
LLAP_AUTO_MAX_OUTPUT("hive.llap.auto.max.output.size", 1*1024*1024*1024L,
"Check output size, before considering vertex (-1 disables check)"),
- LLAP_EXECUTION_MODE("hive.llap.execution.mode", "auto",
+ LLAP_EXECUTION_MODE("hive.llap.execution.mode", "none",
new StringSet("auto", "none", "all", "map"),
"Chooses whether query fragments will run in container or in llap"),
Modified:
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1661833&r1=1661832&r2=1661833&view=diff
==============================================================================
---
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
(original)
+++
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
Tue Feb 24 03:13:38 2015
@@ -623,8 +623,13 @@ public class DagUtils {
if (mapWork instanceof MergeFileWork) {
procClassName = MergeFileTezProcessor.class.getName();
}
+
+ String serviceName = findServiceName(mapWork);
+
map = Vertex.create(mapWork.getName(),
ProcessorDescriptor.create(procClassName)
- .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+ .setUserPayload(serializedConf), numTasks, getContainerResource(conf))
+ .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName)
+ .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
serviceName);
map.setTaskEnvironment(getContainerEnvironment(conf, true));
map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
@@ -664,6 +669,13 @@ public class DagUtils {
return conf;
}
+ private String findServiceName(BaseWork work) { // picks the service name set as the vertex's task scheduler and container launcher
+ String serviceName = TezSessionState.DEFAULT_SERVICE; // fallback when neither mode flag is set on the work
+ if (work.getLlapMode()) serviceName = TezSessionState.LLAP_SERVICE;
+ if (work.getUberMode()) serviceName = TezSessionState.LOCAL_SERVICE; // evaluated last, so uber mode overrides llap mode when both flags are set
+ return serviceName;
+ }
+
/*
* Helper function to create Vertex for given ReduceWork.
*/
@@ -678,12 +690,17 @@ public class DagUtils {
// create the directories FileSinkOperators need
Utilities.createTmpDirs(conf, reduceWork);
+ String serviceName = findServiceName(reduceWork);
+
// create the vertex
Vertex reducer = Vertex.create(reduceWork.getName(),
ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
- reduceWork.isAutoReduceParallelism() ?
reduceWork.getMaxReduceTasks() : reduceWork
- .getNumReduceTasks(), getContainerResource(conf));
+ reduceWork.isAutoReduceParallelism()?
+ reduceWork.getMaxReduceTasks():
+ reduceWork.getNumReduceTasks(), getContainerResource(conf))
+ .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName)
+ .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
serviceName);
reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
Modified:
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1661833&r1=1661832&r2=1661833&view=diff
==============================================================================
---
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
(original)
+++
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
Tue Feb 24 03:13:38 2015
@@ -56,6 +56,8 @@ import org.apache.tez.dag.api.SessionNot
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
/**
* Holds session state related to Tez
@@ -64,6 +66,13 @@ public class TezSessionState {
private static final Log LOG =
LogFactory.getLog(TezSessionState.class.getName());
private static final String TEZ_DIR = "_tez_session_dir";
+ public static final String LLAP_SERVICE = "LLAP";
+ public static final String DEFAULT_SERVICE =
TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
+ public static final String LOCAL_SERVICE =
TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT;
+ private static final String LLAP_SCHEDULER =
"org.apache.tez.dag.app.rm.DaemonTaskSchedulerService";
+ private static final String LLAP_LAUNCHER =
"org.apache.tez.dag.app.launcher.DaemonContainerLauncher";
+ private static final String LLAP_SERVICE_SCHEDULER = LLAP_SERVICE + ":" +
LLAP_SCHEDULER;
+ private static final String LLAP_SERVICE_LAUNCHER = LLAP_SERVICE + ":" +
LLAP_LAUNCHER;
private HiveConf conf;
private Path tezScratchDir;
@@ -171,8 +180,18 @@ public class TezSessionState {
// and finally we're ready to create and start the session
// generate basic tez config
TezConfiguration tezConfig = new TezConfiguration(conf);
+
+ // set up the staging directory to use
tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR,
tezScratchDir.toUri().toString());
+ // we need plugins to handle llap and uber mode
+ tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
+ DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_SCHEDULER);
+
+ tezConfig.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
+ DEFAULT_SERVICE, LOCAL_SERVICE, LLAP_SERVICE_LAUNCHER);
+
+ // container prewarming. tell the am how many containers we need
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
n = Math.max(tezConfig.getInt(
Modified:
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1661833&r1=1661832&r2=1661833&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
(original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
Tue Feb 24 03:13:38 2015
@@ -62,6 +62,7 @@ public abstract class BaseWork extends A
protected Map<String, Map<String, Integer>> allColumnVectorMaps = null;
protected boolean llapMode = false;
+ protected boolean uberMode = false;
protected boolean vectorMode = false;
public void setGatheringStats(boolean gatherStats) {
@@ -186,6 +187,14 @@ public abstract class BaseWork extends A
return vectorMode;
}
+ public void setUberMode(boolean uberMode) { // records the uber-mode flag for this work
+ this.uberMode = uberMode;
+ }
+
+ public boolean getUberMode() { // returns the uber-mode flag; the field defaults to false
+ return uberMode;
+ }
+
public void setLlapMode(boolean llapMode) { // records the llap-mode flag for this work
this.llapMode = llapMode;
}
Modified: hive/branches/llap/ql/src/test/queries/clientpositive/llapdecider.q
URL:
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/queries/clientpositive/llapdecider.q?rev=1661833&r1=1661832&r2=1661833&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/queries/clientpositive/llapdecider.q
(original)
+++ hive/branches/llap/ql/src/test/queries/clientpositive/llapdecider.q Tue Feb
24 03:13:38 2015
@@ -1,4 +1,5 @@
set hive.stats.fetch.column.stats=true;
+set hive.llap.execution.mode=auto;
-- simple query with multiple reduce stages
EXPLAIN SELECT key, count(value) as cnt FROM src GROUP BY key ORDER BY cnt;
@@ -50,10 +51,10 @@ set hive.llap.execution.mode=all;
EXPLAIN SELECT * from src_orc s1 join src_orc s2 on (s1.key = s2.key) order by
s2.value;
-set hive.llap.execution.mode=auto;
-
CREATE TEMPORARY FUNCTION test_udf_get_java_string AS
'org.apache.hadoop.hive.ql.udf.generic.GenericUDFTestGetJavaString';
+set hive.llap.execution.mode=auto;
+
EXPLAIN SELECT sum(cast(key as int) + 1) from src_orc where cast(key as int) >
1;
EXPLAIN SELECT sum(cast(test_udf_get_java_string(cast(key as string)) as int)
+ 1) from src_orc where cast(key as int) > 1;
EXPLAIN SELECT sum(cast(key as int) + 1) from src_orc where
cast(test_udf_get_java_string(cast(key as string)) as int) > 1;