Repository: hive
Updated Branches:
  refs/heads/master a28f6cd84 -> 3401efce6


HIVE-12762: Common join on parquet tables returns incorrect result when 
hive.optimize.index.filter set to true (reviewed by Sergio Pena)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3401efce
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3401efce
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3401efce

Branch: refs/heads/master
Commit: 3401efce64be84f2605e51341bbb41dc1ba11899
Parents: a28f6cd
Author: Aihua Xu <aihu...@apache.org>
Authored: Wed Dec 30 19:32:33 2015 -0500
Committer: Aihua Xu <aihu...@apache.org>
Committed: Fri Jan 8 09:37:08 2016 -0500

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 30 +++++--
 .../hive/ql/io/parquet/ProjectionPusher.java    | 87 ++++++++++++++------
 .../test/queries/clientpositive/parquet_join2.q | 14 ++++
 .../results/clientpositive/parquet_join2.q.out  | 62 ++++++++++++++
 .../hadoop/hive/ql/io/sarg/ExpressionTree.java  |  6 +-
 .../hive/ql/io/sarg/SearchArgumentImpl.java     | 31 +++++--
 6 files changed, 189 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3401efce/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 9a7d990..2d317a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2398,12 +2398,11 @@ public final class Utilities {
     return builder.toString();
   }
 
-  public static void setColumnNameList(JobConf jobConf, Operator op) {
-    setColumnNameList(jobConf, op, false);
+  public static void setColumnNameList(JobConf jobConf, RowSchema rowSchema) {
+    setColumnNameList(jobConf, rowSchema, false);
   }
 
-  public static void setColumnNameList(JobConf jobConf, Operator op, boolean excludeVCs) {
-    RowSchema rowSchema = op.getSchema();
+  public static void setColumnNameList(JobConf jobConf, RowSchema rowSchema, boolean excludeVCs) {
     if (rowSchema == null) {
       return;
     }
@@ -2421,12 +2420,20 @@ public final class Utilities {
     jobConf.set(serdeConstants.LIST_COLUMNS, columnNamesString);
   }
 
-  public static void setColumnTypeList(JobConf jobConf, Operator op) {
-    setColumnTypeList(jobConf, op, false);
+  public static void setColumnNameList(JobConf jobConf, Operator op) {
+    setColumnNameList(jobConf, op, false);
   }
 
-  public static void setColumnTypeList(JobConf jobConf, Operator op, boolean excludeVCs) {
+  public static void setColumnNameList(JobConf jobConf, Operator op, boolean excludeVCs) {
     RowSchema rowSchema = op.getSchema();
+    setColumnNameList(jobConf, rowSchema, excludeVCs);
+  }
+
+  public static void setColumnTypeList(JobConf jobConf, RowSchema rowSchema) {
+    setColumnTypeList(jobConf, rowSchema, false);
+  }
+
+  public static void setColumnTypeList(JobConf jobConf, RowSchema rowSchema, boolean excludeVCs) {
     if (rowSchema == null) {
       return;
     }
@@ -2444,6 +2451,15 @@ public final class Utilities {
     jobConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypesString);
   }
 
+  public static void setColumnTypeList(JobConf jobConf, Operator op) {
+    setColumnTypeList(jobConf, op, false);
+  }
+
+  public static void setColumnTypeList(JobConf jobConf, Operator op, boolean excludeVCs) {
+    RowSchema rowSchema = op.getSchema();
+    setColumnTypeList(jobConf, rowSchema, excludeVCs);
+  }
+
   public static String suffix = ".hashtable";
 
   public static Path generatePath(Path basePath, String dumpFilePrefix,
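
For context, the Utilities change above is a pure refactoring: the
Operator-based setColumnNameList/setColumnTypeList now delegate to new
RowSchema-based overloads, so a caller that has a schema but no Operator
(like the reworked ProjectionPusher below) can still populate the column
metadata in the JobConf. A minimal caller sketch; the ColumnListExample
class and configure method are illustrative, not part of the patch:

import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.mapred.JobConf;

public class ColumnListExample {
  // Fills serdeConstants.LIST_COLUMNS and LIST_COLUMN_TYPES in the JobConf
  // from a RowSchema, without needing an Operator.
  static void configure(JobConf jobConf, RowSchema rowSchema) {
    Utilities.setColumnNameList(jobConf, rowSchema);  // excludeVCs defaults to false
    Utilities.setColumnTypeList(jobConf, rowSchema);
  }
}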

http://git-wip-us.apache.org/repos/asf/hive/blob/3401efce/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
index 017676b..db923fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
@@ -16,11 +16,14 @@ package org.apache.hadoop.hive.ql.io.parquet;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.slf4j.Logger;
@@ -28,12 +31,16 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -68,7 +75,8 @@ public class ProjectionPusher {
 
   @Deprecated  // Uses deprecated methods on ColumnProjectionUtils
   private void pushProjectionsAndFilters(final JobConf jobConf,
-      final String splitPath, final String splitPathWithNoSchema) {
+      final String splitPath,
+      final String splitPathWithNoSchema) {
 
     if (mapWork == null) {
       return;
@@ -76,53 +84,80 @@ public class ProjectionPusher {
       return;
     }
 
-    final ArrayList<String> aliases = new ArrayList<String>();
-    final Iterator<Entry<String, ArrayList<String>>> iterator = mapWork.getPathToAliases().entrySet().iterator();
+    final Set<String> aliases = new HashSet<String>();
+    final Iterator<Entry<String, ArrayList<String>>> iterator =
+        mapWork.getPathToAliases().entrySet().iterator();
 
     while (iterator.hasNext()) {
       final Entry<String, ArrayList<String>> entry = iterator.next();
       final String key = new Path(entry.getKey()).toUri().getPath();
       if (splitPath.equals(key) || splitPathWithNoSchema.equals(key)) {
-        final ArrayList<String> list = entry.getValue();
-        for (final String val : list) {
-          aliases.add(val);
-        }
+        aliases.addAll(entry.getValue());
       }
     }
 
-    for (final String alias : aliases) {
-      final Operator<? extends Serializable> op = mapWork.getAliasToWork().get(
-          alias);
+    // Collect the needed columns from all the aliases and create ORed filter
+    // expression for the table.
+    boolean allColumnsNeeded = false;
+    boolean noFilters = false;
+    Set<Integer> neededColumnIDs = new HashSet<Integer>();
+    List<ExprNodeGenericFuncDesc> filterExprs = new ArrayList<ExprNodeGenericFuncDesc>();
+    RowSchema rowSchema = null;
+
+    for(String alias : aliases) {
+      final Operator<? extends Serializable> op =
+          mapWork.getAliasToWork().get(alias);
       if (op != null && op instanceof TableScanOperator) {
-        final TableScanOperator tableScan = (TableScanOperator) op;
-
-        // push down projections
-        final List<Integer> list = tableScan.getNeededColumnIDs();
+        final TableScanOperator ts = (TableScanOperator) op;
 
-        if (list != null) {
-          ColumnProjectionUtils.appendReadColumnIDs(jobConf, list);
+        if (ts.getNeededColumnIDs() == null) {
+          allColumnsNeeded = true;
         } else {
-          ColumnProjectionUtils.setFullyReadColumns(jobConf);
+          neededColumnIDs.addAll(ts.getNeededColumnIDs());
         }
 
-        pushFilters(jobConf, tableScan);
+        rowSchema = ts.getSchema();
+        ExprNodeGenericFuncDesc filterExpr =
+            ts.getConf() == null ? null : ts.getConf().getFilterExpr();
+        noFilters = filterExpr == null; // No filter if any TS has no filter expression
+        filterExprs.add(filterExpr);
       }
     }
-  }
 
-  private void pushFilters(final JobConf jobConf, final TableScanOperator tableScan) {
+    ExprNodeGenericFuncDesc tableFilterExpr = null;
+    if (!noFilters) {
+      try {
+        for (ExprNodeGenericFuncDesc filterExpr : filterExprs) {
+          if (tableFilterExpr == null ) {
+            tableFilterExpr = filterExpr;
+          } else {
+            tableFilterExpr = ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPOr(),
+                Arrays.<ExprNodeDesc>asList(tableFilterExpr, filterExpr));
+          }
+        }
+      } catch(UDFArgumentException ex) {
+        LOG.debug("Turn off filtering due to " + ex);
+        tableFilterExpr = null;
+      }
+    }
 
-    final TableScanDesc scanDesc = tableScan.getConf();
-    if (scanDesc == null) {
-      LOG.debug("Not pushing filters because TableScanDesc is null");
-      return;
+    // push down projections
+    if (!allColumnsNeeded) {
+      if (!neededColumnIDs.isEmpty()) {
+        ColumnProjectionUtils.appendReadColumnIDs(jobConf, new ArrayList<Integer>(neededColumnIDs));
+      }
+    } else {
+      ColumnProjectionUtils.setFullyReadColumns(jobConf);
     }
 
+    pushFilters(jobConf, rowSchema, tableFilterExpr);
+  }
+
+  private void pushFilters(final JobConf jobConf, RowSchema rowSchema, ExprNodeGenericFuncDesc filterExpr) {
     // construct column name list for reference by filter push down
-    Utilities.setColumnNameList(jobConf, tableScan);
+    Utilities.setColumnNameList(jobConf, rowSchema);
 
     // push down filters
-    final ExprNodeGenericFuncDesc filterExpr = scanDesc.getFilterExpr();
     if (filterExpr == null) {
       LOG.debug("Not pushing filters because FilterExpr is null");
       return;
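
The behavioral fix is in the hunk above: when several aliases map to the
same split path (e.g. the same Parquet table scanned twice in one join),
ProjectionPusher now unions the needed column IDs and ORs the per-alias
filter expressions into a single table-level filter, instead of letting the
last alias win and silently drop rows the other aliases need. A standalone
sketch of the OR-combination step, mirroring the loop in the patch; the
FilterOrExample class and orFilters method are illustrative only:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;

public class FilterOrExample {
  // Folds per-alias filters into a single ORed expression; returns null
  // ("don't filter") if the expressions cannot be combined.
  static ExprNodeGenericFuncDesc orFilters(List<ExprNodeGenericFuncDesc> filterExprs) {
    ExprNodeGenericFuncDesc combined = null;
    try {
      for (ExprNodeGenericFuncDesc filterExpr : filterExprs) {
        combined = (combined == null)
            ? filterExpr
            : ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPOr(),
                  Arrays.<ExprNodeDesc>asList(combined, filterExpr));
      }
    } catch (UDFArgumentException ex) {
      combined = null;  // same fallback as the patch: disable filtering
    }
    return combined;
  }
}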

http://git-wip-us.apache.org/repos/asf/hive/blob/3401efce/ql/src/test/queries/clientpositive/parquet_join2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_join2.q b/ql/src/test/queries/clientpositive/parquet_join2.q
new file mode 100644
index 0000000..9d107c7
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/parquet_join2.q
@@ -0,0 +1,14 @@
+set hive.optimize.index.filter = true;
+set hive.auto.convert.join=false;
+
+CREATE TABLE tbl1(id INT) STORED AS PARQUET;
+INSERT INTO tbl1 VALUES(1), (2);
+
+CREATE TABLE tbl2(id INT, value STRING) STORED AS PARQUET;
+INSERT INTO tbl2 VALUES(1, 'value1');
+INSERT INTO tbl2 VALUES(1, 'value2');
+
+select tbl1.id, t1.value, t2.value
+FROM tbl1
+JOIN (SELECT * FROM tbl2 WHERE value='value1') t1 ON tbl1.id=t1.id
+JOIN (SELECT * FROM tbl2 WHERE value='value2') t2 ON tbl1.id=t2.id;

http://git-wip-us.apache.org/repos/asf/hive/blob/3401efce/ql/src/test/results/clientpositive/parquet_join2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/parquet_join2.q.out b/ql/src/test/results/clientpositive/parquet_join2.q.out
new file mode 100644
index 0000000..f25dcd8
--- /dev/null
+++ b/ql/src/test/results/clientpositive/parquet_join2.q.out
@@ -0,0 +1,62 @@
+PREHOOK: query: CREATE TABLE tbl1(id INT) STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl1
+POSTHOOK: query: CREATE TABLE tbl1(id INT) STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl1
+PREHOOK: query: INSERT INTO tbl1 VALUES(1), (2)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@tbl1
+POSTHOOK: query: INSERT INTO tbl1 VALUES(1), (2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@tbl1
+POSTHOOK: Lineage: tbl1.id EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: CREATE TABLE tbl2(id INT, value STRING) STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl2
+POSTHOOK: query: CREATE TABLE tbl2(id INT, value STRING) STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl2
+PREHOOK: query: INSERT INTO tbl2 VALUES(1, 'value1')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__2
+PREHOOK: Output: default@tbl2
+POSTHOOK: query: INSERT INTO tbl2 VALUES(1, 'value1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__2
+POSTHOOK: Output: default@tbl2
+POSTHOOK: Lineage: tbl2.id EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: tbl2.value SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: INSERT INTO tbl2 VALUES(1, 'value2')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__3
+PREHOOK: Output: default@tbl2
+POSTHOOK: query: INSERT INTO tbl2 VALUES(1, 'value2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__3
+POSTHOOK: Output: default@tbl2
+POSTHOOK: Lineage: tbl2.id EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: tbl2.value SIMPLE [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: select tbl1.id, t1.value, t2.value
+FROM tbl1
+JOIN (SELECT * FROM tbl2 WHERE value='value1') t1 ON tbl1.id=t1.id
+JOIN (SELECT * FROM tbl2 WHERE value='value2') t2 ON tbl1.id=t2.id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl1
+PREHOOK: Input: default@tbl2
+#### A masked pattern was here ####
+POSTHOOK: query: select tbl1.id, t1.value, t2.value
+FROM tbl1
+JOIN (SELECT * FROM tbl2 WHERE value='value1') t1 ON tbl1.id=t1.id
+JOIN (SELECT * FROM tbl2 WHERE value='value2') t2 ON tbl1.id=t2.id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl1
+POSTHOOK: Input: default@tbl2
+#### A masked pattern was here ####
+1      value1  value2

http://git-wip-us.apache.org/repos/asf/hive/blob/3401efce/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java
index 577d95d..443083d 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/ExpressionTree.java
@@ -31,7 +31,7 @@ public class ExpressionTree {
   public enum Operator {OR, AND, NOT, LEAF, CONSTANT}
   private final Operator operator;
   private final List<ExpressionTree> children;
-  private final int leaf;
+  private int leaf;
   private final SearchArgument.TruthValue constant;
 
   ExpressionTree() {
@@ -153,4 +153,8 @@ public class ExpressionTree {
   public int getLeaf() {
     return leaf;
   }
+
+  public void setLeaf(int leaf) {
+    this.leaf = leaf;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3401efce/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
index eeff131..be5e67b 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
@@ -24,8 +24,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
 
 /**
  * The implementation of SearchArguments.
@@ -429,15 +433,28 @@ final class SearchArgumentImpl implements SearchArgument {
      * @return the fixed root
      */
     static ExpressionTree rewriteLeaves(ExpressionTree root,
-                                        int[] leafReorder) {
-      if (root.getOperator() == ExpressionTree.Operator.LEAF) {
-        return new ExpressionTree(leafReorder[root.getLeaf()]);
-      } else if (root.getChildren() != null){
-        List<ExpressionTree> children = root.getChildren();
-        for(int i=0; i < children.size(); ++i) {
-          children.set(i, rewriteLeaves(children.get(i), leafReorder));
+                              int[] leafReorder) {
+      // The leaves could be shared in the tree. Use Set to remove the duplicates.
+      Set<ExpressionTree> leaves = new HashSet<ExpressionTree>();
+      Queue<ExpressionTree> nodes = new LinkedList<ExpressionTree>();
+      nodes.add(root);
+
+      while(!nodes.isEmpty()) {
+        ExpressionTree node = nodes.remove();
+        if (node.getOperator() == ExpressionTree.Operator.LEAF) {
+          leaves.add(node);
+        } else {
+          if (node.getChildren() != null){
+            nodes.addAll(node.getChildren());
+          }
         }
       }
+
+      // Update the leaf in place
+      for(ExpressionTree leaf : leaves) {
+        leaf.setLeaf(leafReorder[leaf.getLeaf()]);
+      }
+
       return root;
     }
 

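The rewriteLeaves rewrite above (together with making ExpressionTree.leaf
mutable) exists because leaf nodes can be shared by several parents in the
tree; visiting each reference independently risks applying the leafReorder
mapping to the same node more than once. Collecting the unique nodes into a
Set first guarantees exactly one in-place update per leaf. A simplified,
self-contained sketch of that collect-then-update pattern, using a
hypothetical stand-in Node class rather than Hive's ExpressionTree:

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

public class SharedLeafExample {
  static class Node {                       // stand-in for ExpressionTree
    List<Node> children;                    // null for leaves
    int leaf = -1;
    Node(Node... kids) { children = Arrays.asList(kids); }
    Node(int leaf)     { this.leaf = leaf; }
  }

  static void rewriteLeaves(Node root, int[] leafReorder) {
    Set<Node> leaves = new HashSet<>();     // identity-based: dedups shared nodes
    Queue<Node> nodes = new ArrayDeque<>();
    nodes.add(root);
    while (!nodes.isEmpty()) {
      Node n = nodes.remove();
      if (n.children == null) {
        leaves.add(n);
      } else {
        nodes.addAll(n.children);
      }
    }
    for (Node leaf : leaves) {
      leaf.leaf = leafReorder[leaf.leaf];   // applied exactly once per node
    }
  }

  public static void main(String[] args) {
    Node shared = new Node(0);
    Node root = new Node(shared, shared);   // same leaf under two slots
    rewriteLeaves(root, new int[]{1, 0});
    System.out.println(shared.leaf);        // prints 1; a double visit would give 0
  }
}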