Repository: hive
Updated Branches:
  refs/heads/hive-14535 ccea0d6ff -> 187eb760d


http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
index a694cf8..c81131e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
@@ -26,7 +26,6 @@ import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.EnumSet;
@@ -51,7 +50,6 @@ import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -106,12 +104,20 @@ public class LlapDecider implements PhysicalPlanResolver {
   }
 
   private LlapMode mode;
+  private final LlapClusterStateForCompile clusterState;
+
+  public LlapDecider(LlapClusterStateForCompile clusterState) {
+    this.clusterState = clusterState;
+  }
+
 
   class LlapDecisionDispatcher implements Dispatcher {
     private final HiveConf conf;
     private final boolean doSkipUdfCheck;
     private final boolean arePermanentFnsAllowed;
     private final boolean shouldUber;
+    private final float minReducersPerExec;
+    private final int executorsPerNode;
     private List<MapJoinOperator> mapJoinOpList;
     private final Map<Rule, NodeProcessor> rules;
 
@@ -121,6 +127,9 @@ public class LlapDecider implements PhysicalPlanResolver {
      arePermanentFnsAllowed = HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOW_PERMANENT_FNS);
      // Don't use uber in "all" mode - everything can go into LLAP, which is better than uber.
      shouldUber = HiveConf.getBoolVar(conf, ConfVars.LLAP_AUTO_ALLOW_UBER) && (mode != all);
+      minReducersPerExec = HiveConf.getFloatVar(
+          conf, ConfVars.TEZ_LLAP_MIN_REDUCER_PER_EXECUTOR);
+      executorsPerNode = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); // TODO# hmm
       mapJoinOpList = new ArrayList<MapJoinOperator>();
       rules = getRules();
     }
@@ -139,22 +148,57 @@ public class LlapDecider implements PhysicalPlanResolver {
       return null;
     }
 
-    private void handleWork(TezWork tezWork, BaseWork work)
-      throws SemanticException {
+    private void handleWork(TezWork tezWork, BaseWork work) throws SemanticException {
       boolean workCanBeDoneInLlap = evaluateWork(tezWork, work);
       LOG.debug(
           "Work " + work + " " + (workCanBeDoneInLlap ? "can" : "cannot") + " 
be done in LLAP");
       if (workCanBeDoneInLlap) {
         for (MapJoinOperator graceMapJoinOp : mapJoinOpList) {
-          LOG.debug(
-              "Disabling hybrid grace hash join in case of LLAP and 
non-dynamic partition hash join.");
+          LOG.debug("Disabling hybrid grace hash join in case of LLAP "
+              + "and non-dynamic partition hash join.");
           graceMapJoinOp.getConf().setHybridHashJoin(false);
         }
+        adjustAutoParallelism(work);
+
         convertWork(tezWork, work);
       }
       mapJoinOpList.clear();
     }
 
+    private void adjustAutoParallelism(BaseWork work) {
+      if (minReducersPerExec <= 0 || !(work instanceof ReduceWork)) return;
+      ReduceWork reduceWork = (ReduceWork)work;
+      if (!reduceWork.isAutoReduceParallelism() && !reduceWork.isUniformDistribution()) {
+        return; // Not based on ARP and cannot assume uniform distribution, bail.
+      }
+      clusterState.initClusterInfo();
+      int targetCount = 0;
+      if (!clusterState.hasClusterInfo()) {
+        LOG.warn("Cannot determine LLAP cluster information");
+        targetCount = (int)Math.ceil(minReducersPerExec * 1 * executorsPerNode);
+      } else {
+        targetCount = (int)Math.ceil(minReducersPerExec * (clusterState.getKnownExecutorCount()
+            + clusterState.getNodeCountWithUnknownExecutors() * executorsPerNode));
+      }
+      // We only increase the targets here.
+      if (reduceWork.isAutoReduceParallelism()) {
+        int newMin = Math.max(reduceWork.getMinReduceTasks(), targetCount);
+        if (newMin < reduceWork.getMaxReduceTasks()) {
+          reduceWork.setMinReduceTasks(newMin);
+          reduceWork.getEdgePropRef().setAutoReduce(conf, true, newMin,
+              reduceWork.getMaxReduceTasks(), conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER));
+        } else {
+          reduceWork.setAutoReduceParallelism(false);
+          reduceWork.setNumReduceTasks(newMin);
+          // TODO: is this correct? based on the same logic as HIVE-14200
+          reduceWork.getEdgePropRef().setAutoReduce(null, false, 0, 0, 0);
+        }
+      } else {
+        // UNIFORM || AUTOPARALLEL (maxed out)
+        reduceWork.setNumReduceTasks(Math.max(reduceWork.getNumReduceTasks(), targetCount));
+      }
+    }
+
 
     private void convertWork(TezWork tezWork, BaseWork work)
       throws SemanticException {

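For context, the reducer-target arithmetic introduced by adjustAutoParallelism() above boils
down to the following minimal standalone sketch. It is not part of the patch; the class, method,
and parameter names are illustrative, mirroring the LlapClusterStateForCompile getters used in
the diff:

// Sketch of the target-count math from adjustAutoParallelism(); all names
// here are illustrative, not part of the patch.
public class ReducerTargetSketch {

  // ceil(minReducersPerExec * totalExecutors); executors on nodes that have
  // not reported a count yet are estimated as nodes * executorsPerNode.
  static int targetReducerCount(float minReducersPerExec, boolean hasClusterInfo,
      int knownExecutors, int nodesWithUnknownExecutors, int executorsPerNode) {
    if (!hasClusterInfo) {
      // Mirrors the fallback in the diff: assume a single node's worth of executors.
      return (int) Math.ceil(minReducersPerExec * 1 * executorsPerNode);
    }
    return (int) Math.ceil(minReducersPerExec
        * (knownExecutors + nodesWithUnknownExecutors * executorsPerNode));
  }

  public static void main(String[] args) {
    // Example: 8 executors reported, 1 node not yet reporting, 4 executors per
    // node, and at least 1.5 reducers per executor requested:
    // ceil(1.5 * (8 + 1 * 4)) = 18.
    System.out.println(targetReducerCount(1.5f, true, 8, 1, 4));
  }
}

Note that the decider only ever raises parallelism with this target: for auto-reduce work it
bumps the minimum task count (or pins the count once the maximum is reached), and for
uniform-distribution work it takes the max of the current and target reducer counts.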