Author: gunther
Date: Mon Sep 30 22:53:35 2013
New Revision: 1527812
URL: http://svn.apache.org/r1527812
Log:
HIVE-5378: Need to move SetReducerParallelism to the optimize package. (Vikram
Dixit K via Gunther Hagleitner)
Added:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
Removed:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/SetReducerParallelism.java
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
Added:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java?rev=1527812&view=auto
==============================================================================
---
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
(added)
+++
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
Mon Sep 30 22:53:35 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+/**
+ * SetReducerParallelism determines how many reducers should
+ * be run for a given reduce sink.
+ */
+public class SetReducerParallelism implements NodeProcessor {
+
+ static final private Log LOG =
LogFactory.getLog(SetReducerParallelism.class.getName());
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procContext, Object... nodeOutputs)
+ throws SemanticException {
+
+ OptimizeTezProcContext context = (OptimizeTezProcContext) procContext;
+
+ ReduceSinkOperator sink = (ReduceSinkOperator) nd;
+ ReduceSinkDesc desc = sink.getConf();
+
+ long bytesPerReducer =
context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+ int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+ int constantReducers =
context.conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+
+ if (context.visitedReduceSinks.contains(sink)) {
+ // skip walking the children
+ LOG.debug("Already processed reduce sink: " + sink.getName());
+ return true;
+ }
+
+ context.visitedReduceSinks.add(sink);
+
+ try {
+ if (desc.getNumReducers() <= 0) {
+ if (constantReducers > 0) {
+ LOG.info("Parallelism for reduce sink "+sink+" set by user to
"+constantReducers);
+ desc.setNumReducers(constantReducers);
+ } else {
+ long numberOfBytes = 0;
+
+ // we need to add up all the estimates from the siblings of this
reduce sink
+ for (Operator<? extends OperatorDesc> sibling:
+ sink.getChildOperators().get(0).getParentOperators()) {
+ numberOfBytes +=
sibling.getStatistics(context.conf).getNumberOfBytes();
+ }
+
+ int numReducers = Utilities.estimateReducers(numberOfBytes,
bytesPerReducer,
+ maxReducers, false);
+ LOG.info("Set parallelism for reduce sink "+sink+" to:
"+numReducers);
+ desc.setNumReducers(numReducers);
+ }
+ } else {
+ LOG.info("Number of reducers determined to be:
"+desc.getNumReducers());
+ }
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ }
+
+ return false;
+ }
+
+}
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1527812&r1=1527811&r2=1527812&view=diff
==============================================================================
---
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
(original)
+++
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
Mon Sep 30 22:53:35 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.lib.Rul
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
+import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;