Author: gunther
Date: Tue Feb 25 06:43:15 2014
New Revision: 1571588

URL: http://svn.apache.org/r1571588
Log:
HIVE-6498: Add config vars to be able to set different defaults for tez and mapred (Gunther Hagleitner)

Modified:
    hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/tez/conf/hive-default.xml.template
    hive/branches/tez/data/conf/tez/hive-site.xml
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java

Modified: hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1571588&r1=1571587&r2=1571588&view=diff
==============================================================================
--- hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
(original)
+++ hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
Tue Feb 25 06:43:15 2014
@@ -499,6 +499,7 @@ public class HiveConf extends Configurat
 
     HIVEMERGEMAPFILES("hive.merge.mapfiles", true),
     HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false),
+    HIVEMERGETEZFILES("hive.merge.tezfiles", false),
     HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long) (256 * 1000 * 
1000)),
     HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long) (16 * 
1000 * 1000)),
     HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true),
@@ -560,6 +561,10 @@ public class HiveConf extends Configurat
     HIVEDEBUGLOCALTASK("hive.debug.localtask",false),
 
     HIVEINPUTFORMAT("hive.input.format", 
"org.apache.hadoop.hive.ql.io.CombineHiveInputFormat"),
+    HIVETEZINPUTFORMAT("hive.tez.input.format", 
"org.apache.hadoop.hive.ql.io.HiveInputFormat"),
+
+    HIVETEZCONTAINERSIZE("hive.tez.container.size", -1),
+    HIVETEZJAVAOPTS("hive.tez.java.opts", null),
 
     HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
     HIVEENFORCESORTING("hive.enforce.sorting", false),

Modified: hive/branches/tez/conf/hive-default.xml.template
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/conf/hive-default.xml.template?rev=1571588&r1=1571587&r2=1571588&view=diff
==============================================================================
--- hive/branches/tez/conf/hive-default.xml.template (original)
+++ hive/branches/tez/conf/hive-default.xml.template Tue Feb 25 06:43:15 2014
@@ -794,6 +794,12 @@
 </property>
 
 <property>
+  <name>hive.merge.tezfiles</name>
+  <value>false</value>
+  <description>Merge small files at the end of a Tez DAG</description>
+</property>
+
+<property>
   <name>hive.heartbeat.interval</name>
   <value>1000</value>
   <description>Send a heartbeat after this interval - used by mapjoin and 
filter operators</description>
@@ -960,6 +966,12 @@
 </property>
 
 <property>
+  <name>hive.tez.input.format</name>
+  <value>org.apache.hadoop.hive.ql.io.HiveInputFormat</value>
+  <description>The default input format for tez. Tez groups splits in the 
AM.</description>
+</property>
+
+<property>
   <name>hive.udtf.auto.progress</name>
   <value>false</value>
   <description>Whether Hive should automatically send progress information to 
TaskTracker when using UDTF's to prevent the task getting killed because of 
inactivity.  Users should be cautious because this may prevent TaskTracker from 
killing tasks with infinite loops.  </description>
@@ -2360,4 +2372,16 @@
   </description>
 </property>
 
+<property>
+  <name>hive.tez.container.size</name>
+  <value>-1</value>
+  <description>By default tez will spawn containers of the size of a mapper. 
This can be used to overwrite.</description>
+</property>
+
+<property>
+  <name>hive.tez.java.opts</name>
+  <value></value>
+  <description>By default tez will use the java opts from map tasks. This can 
be used to overwrite.</description>
+</property>
+
 </configuration>

Modified: hive/branches/tez/data/conf/tez/hive-site.xml
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/data/conf/tez/hive-site.xml?rev=1571588&r1=1571587&r2=1571588&view=diff
==============================================================================
Binary files - no diff available.

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1571588&r1=1571587&r2=1571588&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
(original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
Tue Feb 25 06:43:15 2014
@@ -3144,8 +3144,10 @@ public final class Utilities {
    * Set hive input format, and input format file if necessary.
    */
   public static void setInputAttributes(Configuration conf, MapWork mWork) {
+    HiveConf.ConfVars var = HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ?
+      HiveConf.ConfVars.HIVETEZINPUTFORMAT : HiveConf.ConfVars.HIVEINPUTFORMAT;
     if (mWork.getInputformat() != null) {
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, 
mWork.getInputformat());
+      HiveConf.setVar(conf, var, mWork.getInputformat());
     }
     if (mWork.getIndexIntermediateFile() != null) {
       conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile());

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1571588&r1=1571587&r2=1571588&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java 
(original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java 
Tue Feb 25 06:43:15 2014
@@ -74,6 +74,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
@@ -178,7 +179,7 @@ public class DagUtils {
 
     Utilities.setInputAttributes(conf, mapWork);
 
-    String inpFormat = HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVEINPUTFORMAT);
+    String inpFormat = HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVETEZINPUTFORMAT);
     if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat))) {
       inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
     }
@@ -294,6 +295,35 @@ public class DagUtils {
   }
 
   /*
+   * Helper to determine the size of the container requested
+   * from yarn. Falls back to Map-reduce's map size if tez
+   * container size isn't set.
+   */
+  private Resource getContainerResource(Configuration conf) {
+    Resource containerResource;
+    int memory = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
+      HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
+      conf.getInt(MRJobConfig.MAP_MEMORY_MB, 
MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+    int cpus = conf.getInt(MRJobConfig.MAP_CPU_VCORES,
+                           MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+    return Resource.newInstance(memory, cpus);
+  }
+
+  /*
+   * Helper to determine what java options to use for the containers
+   * Falls back to Map-reduces map java opts if no tez specific options
+   * are set
+   */
+  private String getContainerJavaOpts(Configuration conf) {
+    String javaOpts = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZJAVAOPTS);
+    if (javaOpts != null && !javaOpts.isEmpty()) {
+      return javaOpts;
+    }
+    return MRHelpers.getMapJavaOpts(conf);
+  }
+
+
+  /*
    * Helper function to create Vertex from MapWork.
    */
   private Vertex createVertex(JobConf conf, MapWork mapWork,
@@ -344,12 +374,11 @@ public class DagUtils {
     byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
     map = new Vertex(mapWork.getName(),
         new ProcessorDescriptor(MapTezProcessor.class.getName()).
-             setUserPayload(serializedConf), numTasks,
-        MRHelpers.getMapResource(conf));
+             setUserPayload(serializedConf), numTasks, 
getContainerResource(conf));
     Map<String, String> environment = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
     map.setTaskEnvironment(environment);
-    map.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
+    map.setJavaOpts(getContainerJavaOpts(conf));
 
     assert mapWork.getAliasToWork().keySet().size() == 1;
 
@@ -419,14 +448,14 @@ public class DagUtils {
     Vertex reducer = new Vertex(reduceWork.getName(),
         new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
              setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
-        reduceWork.getNumReduceTasks(), MRHelpers.getReduceResource(conf));
+        reduceWork.getNumReduceTasks(), getContainerResource(conf));
 
     Map<String, String> environment = new HashMap<String, String>();
 
     MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
     reducer.setTaskEnvironment(environment);
 
-    reducer.setJavaOpts(MRHelpers.getReduceJavaOpts(conf));
+    reducer.setJavaOpts(getContainerJavaOpts(conf));
 
     Map<String, LocalResource> localResources = new HashMap<String, 
LocalResource>();
     localResources.put(getBaseName(appJarLr), appJarLr);
@@ -479,7 +508,7 @@ public class DagUtils {
     ProcessorDescriptor prewarmProcDescriptor = new 
ProcessorDescriptor(HivePreWarmProcessor.class.getName());
     
prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
 
-    PreWarmContext context = new PreWarmContext(prewarmProcDescriptor, 
MRHelpers.getMapResource(conf),
+    PreWarmContext context = new PreWarmContext(prewarmProcDescriptor, 
getContainerResource(conf),
         numContainers, new VertexLocationHint(null));
 
     Map<String, LocalResource> combinedResources = new HashMap<String, 
LocalResource>();
@@ -504,7 +533,7 @@ public class DagUtils {
     Map<String, String> environment = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
     context.setEnvironment(environment);
-    context.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
+    context.setJavaOpts(getContainerJavaOpts(conf));
     return context;
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java?rev=1571588&r1=1571587&r2=1571588&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
 Tue Feb 25 06:43:15 2014
@@ -152,6 +152,7 @@ public class AggregateIndexHandler exten
       HiveConf builderConf = new HiveConf(getConf(), 
AggregateIndexHandler.class);
       builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES, false);
       builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
+      builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false);
       Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, 
outputs,
           command, (LinkedHashMap<String, String>) partSpec, indexTableName, 
dbName);
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java?rev=1571588&r1=1571587&r2=1571588&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
 Tue Feb 25 06:43:15 2014
@@ -144,6 +144,7 @@ public class CompactIndexHandler extends
     HiveConf builderConf = new HiveConf(getConf(), CompactIndexHandler.class);
     builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES, false);
     builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
+    builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false);
     Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
         command, partSpec, indexTableName, dbName);
     return rootTask;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1571588&r1=1571587&r2=1571588&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
 Tue Feb 25 06:43:15 2014
@@ -1619,6 +1619,13 @@ public final class GenMapRedUtils {
       }
 
       if ((mvTask != null) && !mvTask.isLocal() && 
fsOp.getConf().canBeMerged()) {
+
+        if (currTask.getWork() instanceof TezWork) {
+          // tez blurs the boundary between map and reduce, thus it has it's 
own
+          // config
+          return hconf.getBoolVar(ConfVars.HIVEMERGETEZFILES);
+        }
+
         if (fsOp.getConf().isLinkedFileSink()) {
           // If the user has HIVEMERGEMAPREDFILES set to false, the idea was 
the
           // number of reducers are few, so the number of files anyway are 
small.
@@ -1632,16 +1639,13 @@ public final class GenMapRedUtils {
           // There are separate configuration parameters to control whether to
           // merge for a map-only job
           // or for a map-reduce job
-          if (currTask.getWork() instanceof TezWork) {
-            return hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) || 
-                hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES);
-          } else if (currTask.getWork() instanceof MapredWork) {
+          if (currTask.getWork() instanceof MapredWork) {  
             ReduceWork reduceWork = ((MapredWork) 
currTask.getWork()).getReduceWork();
             boolean mergeMapOnly =
-                hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == 
null;
+              hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == 
null;
             boolean mergeMapRed =
-                hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
-                reduceWork != null;
+              hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
+              reduceWork != null;
             if (mergeMapOnly || mergeMapRed) {
               return true;
             }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1571588&r1=1571587&r2=1571588&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
 Tue Feb 25 06:43:15 2014
@@ -643,7 +643,9 @@ public class SemanticAnalyzer extends Ba
   }
 
   private void assertCombineInputFormat(Tree numerator, String message) throws 
SemanticException {
-    String inputFormat = HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVEINPUTFORMAT);
+    String inputFormat = 
conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ?
+      HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZINPUTFORMAT):
+      HiveConf.getVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT);
     if (!inputFormat.equals(CombineHiveInputFormat.class.getName())) {
       throw new SemanticException(generateErrorMessage((ASTNode) numerator,
           message + " sampling is not supported in " + inputFormat));

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1571588&r1=1571587&r2=1571588&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java 
(original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java 
Tue Feb 25 06:43:15 2014
@@ -88,10 +88,6 @@ public class TezCompiler extends TaskCom
     // We require the use of recursive input dirs for union processing
     conf.setBoolean("mapred.input.dir.recursive", true);
     HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, 
true);
-
-    // Don't auto-merge files in tez
-    HiveConf.setBoolVar(conf, ConfVars.HIVEMERGEMAPFILES, false);
-    HiveConf.setBoolVar(conf, ConfVars.HIVEMERGEMAPREDFILES, false);
   }
 
   @Override


Reply via email to