Author: hashutosh
Date: Mon Jul 28 23:09:48 2014
New Revision: 1614212
URL: http://svn.apache.org/r1614212
Log:
HIVE-7524 : Enable auto conversion of SMBjoin in presence of constant propagate
optimization (Ashutosh Chauhan via Vikram Dixit)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java
hive/trunk/ql/src/test/results/clientpositive/join_nullsafe.q.out
hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_25.q.out
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java?rev=1614212&r1=1614211&r2=1614212&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
Mon Jul 28 23:09:48 2014
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Stack;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hive.ql.parse.Q
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -456,10 +458,19 @@ abstract public class AbstractBucketJoin
public static List<String> toColumns(List<ExprNodeDesc> keys) {
List<String> columns = new ArrayList<String>();
for (ExprNodeDesc key : keys) {
- if (!(key instanceof ExprNodeColumnDesc)) {
+ if (key instanceof ExprNodeColumnDesc) {
+ columns.add(((ExprNodeColumnDesc) key).getColumn());
+ } else if ((key instanceof ExprNodeConstantDesc)) {
+ ExprNodeConstantDesc constant = (ExprNodeConstantDesc) key;
+ String colName = constant.getFoldedFromCol();
+ if (colName == null){
+ return null;
+ } else {
+ columns.add(colName);
+ }
+ } else {
return null;
}
- columns.add(((ExprNodeColumnDesc) key).getColumn());
}
return columns;
}
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java?rev=1614212&r1=1614211&r2=1614212&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
Mon Jul 28 23:09:48 2014
@@ -82,9 +82,7 @@ public class ConstantPropagate implement
// if the later is enabled.
return pactx;
}
- if (pactx.getConf().getBoolVar(ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
- return pactx;
- }
+
pGraphContext = pactx;
opToParseCtxMap = pGraphContext.getOpParseCtx();
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1614212&r1=1614211&r2=1614212&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
Mon Jul 28 23:09:48 2014
@@ -552,6 +552,7 @@ public final class ConstantPropagateProc
* conditional expressions and extract assignment expressions and propagate them.
*/
public static class ConstantPropagateFilterProc implements NodeProcessor {
+ @Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs)
throws SemanticException {
FilterOperator op = (FilterOperator) nd;
@@ -594,6 +595,7 @@ public final class ConstantPropagateProc
* Node Processor for Constant Propagate for Group By Operators.
*/
public static class ConstantPropagateGroupByProc implements NodeProcessor {
+ @Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs)
throws SemanticException {
GroupByOperator op = (GroupByOperator) nd;
@@ -630,6 +632,7 @@ public final class ConstantPropagateProc
* The Default Node Processor for Constant Propagation.
*/
public static class ConstantPropagateDefaultProc implements NodeProcessor {
+ @Override
@SuppressWarnings("unchecked")
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs)
throws SemanticException {
@@ -658,6 +661,7 @@ public final class ConstantPropagateProc
* The Node Processor for Constant Propagation for Select Operators.
*/
public static class ConstantPropagateSelectProc implements NodeProcessor {
+ @Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs)
throws SemanticException {
SelectOperator op = (SelectOperator) nd;
@@ -691,6 +695,7 @@ public final class ConstantPropagateProc
* propagation, this processor also prunes dynamic partitions to static partitions if possible.
*/
public static class ConstantPropagateFileSinkProc implements NodeProcessor {
+ @Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs)
throws SemanticException {
FileSinkOperator op = (FileSinkOperator) nd;
@@ -743,6 +748,7 @@ public final class ConstantPropagateProc
* Currently these kinds of Operators include UnionOperator and ScriptOperator.
*/
public static class ConstantPropagateStopProc implements NodeProcessor {
+ @Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs)
throws SemanticException {
Operator<?> op = (Operator<?>) nd;
@@ -763,6 +769,7 @@ public final class ConstantPropagateProc
* join (left table for left outer join and vice versa) can be propagated.
*/
public static class ConstantPropagateReduceSinkProc implements NodeProcessor
{
+ @Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs)
throws SemanticException {
ReduceSinkOperator op = (ReduceSinkOperator) nd;
@@ -795,7 +802,11 @@ public final class ConstantPropagateProc
// key columns
ArrayList<ExprNodeDesc> newKeyEpxrs = new ArrayList<ExprNodeDesc>();
for (ExprNodeDesc desc : rsDesc.getKeyCols()) {
- newKeyEpxrs.add(foldExpr(desc, constants, cppCtx, op, 0, false));
+ ExprNodeDesc newDesc = foldExpr(desc, constants, cppCtx, op, 0, false);
+ if (newDesc != desc && desc instanceof ExprNodeColumnDesc && newDesc
instanceof ExprNodeConstantDesc) {
+
((ExprNodeConstantDesc)newDesc).setFoldedFromCol(((ExprNodeColumnDesc)desc).getColumn());
+ }
+ newKeyEpxrs.add(newDesc);
}
rsDesc.setKeyCols(newKeyEpxrs);
@@ -854,6 +865,7 @@ public final class ConstantPropagateProc
* The Node Processor for Constant Propagation for Join Operators.
*/
public static class ConstantPropagateJoinProc implements NodeProcessor {
+ @Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs)
throws SemanticException {
JoinOperator op = (JoinOperator) nd;
@@ -916,6 +928,7 @@ public final class ConstantPropagateProc
* The Node Processor for Constant Propagation for Table Scan Operators.
*/
public static class ConstantPropagateTableScanProc implements NodeProcessor {
+ @Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs)
throws SemanticException {
TableScanOperator op = (TableScanOperator) nd;
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java?rev=1614212&r1=1614211&r2=1614212&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
Mon Jul 28 23:09:48 2014
@@ -18,17 +18,13 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
public class SortedMergeJoinProc extends AbstractSMBJoinProc implements
NodeProcessor {
@@ -46,12 +42,6 @@ public class SortedMergeJoinProc extends
JoinOperator joinOp = (JoinOperator) nd;
SortBucketJoinProcCtx smbJoinContext = (SortBucketJoinProcCtx) procCtx;
- Map<MapJoinOperator, QBJoinTree> mapJoinMap =
pGraphContext.getMapJoinContext();
- if (mapJoinMap == null) {
- mapJoinMap = new HashMap<MapJoinOperator, QBJoinTree>();
- pGraphContext.setMapJoinContext(mapJoinMap);
- }
-
boolean convert =
canConvertJoinToSMBJoin(
joinOp, smbJoinContext, pGraphContext);
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java?rev=1614212&r1=1614211&r2=1614212&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java
Mon Jul 28 23:09:48 2014
@@ -36,6 +36,17 @@ public class ExprNodeConstantDesc extend
private static final long serialVersionUID = 1L;
final protected transient static char[] hexArray =
"0123456789ABCDEF".toCharArray();
private Object value;
+ // If this constant was created while doing constant folding, foldedFromCol holds the name of
+ // original column from which it was folded.
+ private transient String foldedFromCol;
+
+ public String getFoldedFromCol() {
+ return foldedFromCol;
+ }
+
+ public void setFoldedFromCol(String foldedFromCol) {
+ this.foldedFromCol = foldedFromCol;
+ }
public ExprNodeConstantDesc() {
}
Modified: hive/trunk/ql/src/test/results/clientpositive/join_nullsafe.q.out
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/join_nullsafe.q.out?rev=1614212&r1=1614211&r2=1614212&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/join_nullsafe.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/join_nullsafe.q.out Mon Jul
28 23:09:48 2014
@@ -1519,9 +1519,8 @@ STAGE PLANS:
predicate: value is null (type: boolean)
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: NONE
Reduce Output Operator
- key expressions: value (type: int)
+ key expressions: null (type: void)
sort order: +
- Map-reduce partition columns: value (type: int)
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: NONE
value expressions: key (type: int)
TableScan
@@ -1531,9 +1530,8 @@ STAGE PLANS:
predicate: key is null (type: boolean)
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: NONE
Reduce Output Operator
- key expressions: key (type: int)
+ key expressions: null (type: void)
sort order: +
- Map-reduce partition columns: key (type: int)
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: NONE
value expressions: value (type: int)
Reduce Operator Tree:
@@ -1541,13 +1539,13 @@ STAGE PLANS:
condition map:
Inner Join 0 to 1
condition expressions:
- 0 {KEY.reducesinkkey0} {VALUE._col0}
- 1 {VALUE._col0} {KEY.reducesinkkey0}
+ 0 {VALUE._col0}
+ 1 {VALUE._col0}
nullSafes: [true]
- outputColumnNames: _col0, _col1, _col4, _col5
+ outputColumnNames: _col1, _col4
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column
stats: NONE
Select Operator
- expressions: _col0 (type: int), _col1 (type: int), _col4 (type:
int), _col5 (type: int)
+ expressions: null (type: void), _col1 (type: int), _col4 (type:
int), null (type: void)
outputColumnNames: _col0, _col1, _col2, _col3
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column
stats: NONE
File Output Operator
Modified: hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_25.q.out
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_25.q.out?rev=1614212&r1=1614211&r2=1614212&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_25.q.out
(original)
+++ hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_25.q.out Mon Jul
28 23:09:48 2014
@@ -219,14 +219,13 @@ STAGE PLANS:
condition map:
Inner Join 0 to 1
condition expressions:
- 0 {key}
+ 0
1
keys:
- 0 key (type: int)
- 1 key (type: int)
- outputColumnNames: _col0
+ 0 5 (type: int)
+ 1 5 (type: int)
Select Operator
- expressions: _col0 (type: int)
+ expressions: 5 (type: int)
outputColumnNames: _col0
Reduce Output Operator
key expressions: _col0 (type: int)
@@ -242,14 +241,13 @@ STAGE PLANS:
condition map:
Inner Join 0 to 1
condition expressions:
- 0 {key}
+ 0
1
keys:
- 0 key (type: int)
- 1 key (type: int)
- outputColumnNames: _col0
+ 0 5 (type: int)
+ 1 5 (type: int)
Select Operator
- expressions: _col0 (type: int)
+ expressions: 5 (type: int)
outputColumnNames: _col0
Reduce Output Operator
key expressions: _col0 (type: int)
@@ -266,7 +264,7 @@ STAGE PLANS:
Filter Operator
predicate: (_col1 = 5) (type: boolean)
Select Operator
- expressions: _col0 (type: int), _col1 (type: int)
+ expressions: _col0 (type: int), 5 (type: int)
outputColumnNames: _col0, _col1
File Output Operator
compressed: false