Repository: hive Updated Branches: refs/heads/branch-2.0 a72ece6ec -> b36ec222b
HIVE-12762: Common join on parquet tables returns incorrect result when hive.optimize.index.filter set to true (reviewed by Sergio Pena) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b36ec222 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b36ec222 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b36ec222 Branch: refs/heads/branch-2.0 Commit: b36ec222b3b9136a1e48a12408d0cf61de33403e Parents: a72ece6 Author: Aihua Xu <aihu...@apache.org> Authored: Wed Dec 30 19:32:33 2015 -0500 Committer: Aihua Xu <aihu...@apache.org> Committed: Fri Jan 8 09:44:50 2016 -0500 ---------------------------------------------------------------------- .../apache/hadoop/hive/ql/exec/Utilities.java | 30 +++++-- .../hive/ql/io/parquet/ProjectionPusher.java | 87 ++++++++++++++------ .../test/queries/clientpositive/parquet_join2.q | 14 ++++ .../results/clientpositive/parquet_join2.q.out | 62 ++++++++++++++ .../hadoop/hive/ql/io/sarg/ExpressionTree.java | 6 +- .../hive/ql/io/sarg/SearchArgumentImpl.java | 31 +++++-- 6 files changed, 189 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b36ec222/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 9a7d990..2d317a0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -2398,12 +2398,11 @@ public final class Utilities { return builder.toString(); } - public static void setColumnNameList(JobConf jobConf, Operator op) { - setColumnNameList(jobConf, op, false); + public static void setColumnNameList(JobConf jobConf, RowSchema rowSchema) { + setColumnNameList(jobConf, rowSchema, 
false); } - public static void setColumnNameList(JobConf jobConf, Operator op, boolean excludeVCs) { - RowSchema rowSchema = op.getSchema(); + public static void setColumnNameList(JobConf jobConf, RowSchema rowSchema, boolean excludeVCs) { if (rowSchema == null) { return; } @@ -2421,12 +2420,20 @@ public final class Utilities { jobConf.set(serdeConstants.LIST_COLUMNS, columnNamesString); } - public static void setColumnTypeList(JobConf jobConf, Operator op) { - setColumnTypeList(jobConf, op, false); + public static void setColumnNameList(JobConf jobConf, Operator op) { + setColumnNameList(jobConf, op, false); } - public static void setColumnTypeList(JobConf jobConf, Operator op, boolean excludeVCs) { + public static void setColumnNameList(JobConf jobConf, Operator op, boolean excludeVCs) { RowSchema rowSchema = op.getSchema(); + setColumnNameList(jobConf, rowSchema, excludeVCs); + } + + public static void setColumnTypeList(JobConf jobConf, RowSchema rowSchema) { + setColumnTypeList(jobConf, rowSchema, false); + } + + public static void setColumnTypeList(JobConf jobConf, RowSchema rowSchema, boolean excludeVCs) { if (rowSchema == null) { return; } @@ -2444,6 +2451,15 @@ public final class Utilities { jobConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypesString); } + public static void setColumnTypeList(JobConf jobConf, Operator op) { + setColumnTypeList(jobConf, op, false); + } + + public static void setColumnTypeList(JobConf jobConf, Operator op, boolean excludeVCs) { + RowSchema rowSchema = op.getSchema(); + setColumnTypeList(jobConf, rowSchema, excludeVCs); + } + public static String suffix = ".hashtable"; public static Path generatePath(Path basePath, String dumpFilePrefix, http://git-wip-us.apache.org/repos/asf/hive/blob/b36ec222/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java index 017676b..db923fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java @@ -16,11 +16,14 @@ package org.apache.hadoop.hive.ql.io.parquet; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.slf4j.Logger; @@ -28,12 +31,16 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.mapred.JobConf; @@ -68,7 +75,8 @@ public class ProjectionPusher { @Deprecated // Uses deprecated methods on ColumnProjectionUtils private void pushProjectionsAndFilters(final JobConf jobConf, - final String splitPath, final String splitPathWithNoSchema) { + final String splitPath, + final String splitPathWithNoSchema) { if (mapWork == null) { return; @@ -76,53 +84,80 @@ public class ProjectionPusher { return; } - final ArrayList<String> aliases = new ArrayList<String>(); - final 
Iterator<Entry<String, ArrayList<String>>> iterator = mapWork.getPathToAliases().entrySet().iterator(); + final Set<String> aliases = new HashSet<String>(); + final Iterator<Entry<String, ArrayList<String>>> iterator = + mapWork.getPathToAliases().entrySet().iterator(); while (iterator.hasNext()) { final Entry<String, ArrayList<String>> entry = iterator.next(); final String key = new Path(entry.getKey()).toUri().getPath(); if (splitPath.equals(key) || splitPathWithNoSchema.equals(key)) { - final ArrayList<String> list = entry.getValue(); - for (final String val : list) { - aliases.add(val); - } + aliases.addAll(entry.getValue()); } } - for (final String alias : aliases) { - final Operator<? extends Serializable> op = mapWork.getAliasToWork().get( - alias); + // Collect the needed columns from all the aliases and create ORed filter + // expression for the table. + boolean allColumnsNeeded = false; + boolean noFilters = false; + Set<Integer> neededColumnIDs = new HashSet<Integer>(); + List<ExprNodeGenericFuncDesc> filterExprs = new ArrayList<ExprNodeGenericFuncDesc>(); + RowSchema rowSchema = null; + + for(String alias : aliases) { + final Operator<? extends Serializable> op = + mapWork.getAliasToWork().get(alias); if (op != null && op instanceof TableScanOperator) { - final TableScanOperator tableScan = (TableScanOperator) op; - - // push down projections - final List<Integer> list = tableScan.getNeededColumnIDs(); + final TableScanOperator ts = (TableScanOperator) op; - if (list != null) { - ColumnProjectionUtils.appendReadColumnIDs(jobConf, list); + if (ts.getNeededColumnIDs() == null) { + allColumnsNeeded = true; } else { - ColumnProjectionUtils.setFullyReadColumns(jobConf); + neededColumnIDs.addAll(ts.getNeededColumnIDs()); } - pushFilters(jobConf, tableScan); + rowSchema = ts.getSchema(); + ExprNodeGenericFuncDesc filterExpr = + ts.getConf() == null ? 
null : ts.getConf().getFilterExpr(); + noFilters = noFilters || filterExpr == null; // No filter if any TS has no filter expression; sticky so a later filtered TS cannot re-enable pushdown and drop rows an unfiltered TS needs + filterExprs.add(filterExpr); } } - } - private void pushFilters(final JobConf jobConf, final TableScanOperator tableScan) { + ExprNodeGenericFuncDesc tableFilterExpr = null; + if (!noFilters) { + try { + for (ExprNodeGenericFuncDesc filterExpr : filterExprs) { + if (tableFilterExpr == null ) { + tableFilterExpr = filterExpr; + } else { + tableFilterExpr = ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPOr(), + Arrays.<ExprNodeDesc>asList(tableFilterExpr, filterExpr)); + } + } + } catch(UDFArgumentException ex) { + LOG.debug("Turn off filtering due to " + ex); + tableFilterExpr = null; + } + } - final TableScanDesc scanDesc = tableScan.getConf(); - if (scanDesc == null) { - LOG.debug("Not pushing filters because TableScanDesc is null"); - return; + // push down projections + if (!allColumnsNeeded) { + if (!neededColumnIDs.isEmpty()) { + ColumnProjectionUtils.appendReadColumnIDs(jobConf, new ArrayList<Integer>(neededColumnIDs)); + } + } else { + ColumnProjectionUtils.setFullyReadColumns(jobConf); } + pushFilters(jobConf, rowSchema, tableFilterExpr); + } + + private void pushFilters(final JobConf jobConf, RowSchema rowSchema, ExprNodeGenericFuncDesc filterExpr) { // construct column name list for reference by filter push down - Utilities.setColumnNameList(jobConf, tableScan); + Utilities.setColumnNameList(jobConf, rowSchema); // push down filters - final ExprNodeGenericFuncDesc filterExpr = scanDesc.getFilterExpr(); if (filterExpr == null) { LOG.debug("Not pushing filters because FilterExpr is null"); return; http://git-wip-us.apache.org/repos/asf/hive/blob/b36ec222/ql/src/test/queries/clientpositive/parquet_join2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/parquet_join2.q b/ql/src/test/queries/clientpositive/parquet_join2.q new file mode 100644 index 0000000..9d107c7
--- /dev/null +++ b/ql/src/test/queries/clientpositive/parquet_join2.q @@ -0,0 +1,14 @@ +set hive.optimize.index.filter = true; +set hive.auto.convert.join=false; + +CREATE TABLE tbl1(id INT) STORED AS PARQUET; +INSERT INTO tbl1 VALUES(1), (2); + +CREATE TABLE tbl2(id INT, value STRING) STORED AS PARQUET; +INSERT INTO tbl2 VALUES(1, 'value1'); +INSERT INTO tbl2 VALUES(1, 'value2'); + +select tbl1.id, t1.value, t2.value +FROM tbl1 +JOIN (SELECT * FROM tbl2 WHERE value='value1') t1 ON tbl1.id=t1.id +JOIN (SELECT * FROM tbl2 WHERE value='value2') t2 ON tbl1.id=t2.id; http://git-wip-us.apache.org/repos/asf/hive/blob/b36ec222/ql/src/test/results/clientpositive/parquet_join2.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/parquet_join2.q.out b/ql/src/test/results/clientpositive/parquet_join2.q.out new file mode 100644 index 0000000..f25dcd8 --- /dev/null +++ b/ql/src/test/results/clientpositive/parquet_join2.q.out @@ -0,0 +1,62 @@ +PREHOOK: query: CREATE TABLE tbl1(id INT) STORED AS PARQUET +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl1 +POSTHOOK: query: CREATE TABLE tbl1(id INT) STORED AS PARQUET +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl1 +PREHOOK: query: INSERT INTO tbl1 VALUES(1), (2) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Output: default@tbl1 +POSTHOOK: query: INSERT INTO tbl1 VALUES(1), (2) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@tbl1 +POSTHOOK: Lineage: tbl1.id EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: CREATE TABLE tbl2(id INT, value STRING) STORED AS PARQUET +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl2 +POSTHOOK: query: CREATE TABLE tbl2(id INT, value 
STRING) STORED AS PARQUET +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl2 +PREHOOK: query: INSERT INTO tbl2 VALUES(1, 'value1') +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__2 +PREHOOK: Output: default@tbl2 +POSTHOOK: query: INSERT INTO tbl2 VALUES(1, 'value1') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__2 +POSTHOOK: Output: default@tbl2 +POSTHOOK: Lineage: tbl2.id EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: tbl2.value SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: INSERT INTO tbl2 VALUES(1, 'value2') +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__3 +PREHOOK: Output: default@tbl2 +POSTHOOK: query: INSERT INTO tbl2 VALUES(1, 'value2') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__3 +POSTHOOK: Output: default@tbl2 +POSTHOOK: Lineage: tbl2.id EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: tbl2.value SIMPLE [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select tbl1.id, t1.value, t2.value +FROM tbl1 +JOIN (SELECT * FROM tbl2 WHERE value='value1') t1 ON tbl1.id=t1.id +JOIN (SELECT * FROM tbl2 WHERE value='value2') t2 ON tbl1.id=t2.id +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl1 +PREHOOK: Input: default@tbl2 +#### A masked pattern was here #### +POSTHOOK: query: select tbl1.id, t1.value, t2.value +FROM tbl1 +JOIN (SELECT * FROM tbl2 WHERE value='value1') t1 ON tbl1.id=t1.id +JOIN (SELECT * FROM tbl2 WHERE value='value2') t2 ON tbl1.id=t2.id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl1 +POSTHOOK: Input: default@tbl2 +#### A masked pattern was here #### +1 value1 value2 
http://git-wip-us.apache.org/repos/asf/hive/blob/b36ec222/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java ---------------------------------------------------------------------- diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java index 577d95d..443083d 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java +++ b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java @@ -31,7 +31,7 @@ public class ExpressionTree { public enum Operator {OR, AND, NOT, LEAF, CONSTANT} private final Operator operator; private final List<ExpressionTree> children; - private final int leaf; + private int leaf; private final SearchArgument.TruthValue constant; ExpressionTree() { @@ -153,4 +153,8 @@ public class ExpressionTree { public int getLeaf() { return leaf; } + + public void setLeaf(int leaf) { + this.leaf = leaf; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/b36ec222/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java ---------------------------------------------------------------------- diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java index eeff131..be5e67b 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java +++ b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java @@ -24,8 +24,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Deque; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.Set; /** * The implementation of SearchArguments. 
@@ -429,15 +433,28 @@ final class SearchArgumentImpl implements SearchArgument { * @return the fixed root */ static ExpressionTree rewriteLeaves(ExpressionTree root, - int[] leafReorder) { - if (root.getOperator() == ExpressionTree.Operator.LEAF) { - return new ExpressionTree(leafReorder[root.getLeaf()]); - } else if (root.getChildren() != null){ - List<ExpressionTree> children = root.getChildren(); - for(int i=0; i < children.size(); ++i) { - children.set(i, rewriteLeaves(children.get(i), leafReorder)); + int[] leafReorder) { + // The leaves could be shared in the tree. Use Set to remove the duplicates. + Set<ExpressionTree> leaves = new HashSet<ExpressionTree>(); + Queue<ExpressionTree> nodes = new LinkedList<ExpressionTree>(); + nodes.add(root); + + while(!nodes.isEmpty()) { + ExpressionTree node = nodes.remove(); + if (node.getOperator() == ExpressionTree.Operator.LEAF) { + leaves.add(node); + } else { + if (node.getChildren() != null){ + nodes.addAll(node.getChildren()); + } } } + + // Update the leaf in place + for(ExpressionTree leaf : leaves) { + leaf.setLeaf(leafReorder[leaf.getLeaf()]); + } + return root; }