HIVE-11401: Predicate push down does not work with Parquet when partitions are in the expression (Sergio Pena, reviewed by Szehon Ho)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/724b3193 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/724b3193 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/724b3193 Branch: refs/heads/llap Commit: 724b31930718eea606dfe6d95eda7385209caa5f Parents: 7df9d7a Author: Sergio Pena <[email protected]> Authored: Fri Jul 31 09:48:28 2015 -0500 Committer: Sergio Pena <[email protected]> Committed: Fri Jul 31 09:48:28 2015 -0500 ---------------------------------------------------------------------- .../read/ParquetFilterPredicateConverter.java | 148 +++++++++++++++++++ .../read/ParquetRecordReaderWrapper.java | 122 ++------------- .../parquet/TestParquetRecordReaderWrapper.java | 14 +- .../read/TestParquetFilterPredicate.java | 51 +++++++ .../ql/io/sarg/TestConvertAstToSearchArg.java | 25 ++-- .../clientpositive/parquet_predicate_pushdown.q | 9 ++ .../parquet_predicate_pushdown.q.out | 47 ++++++ 7 files changed, 283 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetFilterPredicateConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetFilterPredicateConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetFilterPredicateConverter.java new file mode 100644 index 0000000..f170026 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetFilterPredicateConverter.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.parquet.read; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.io.parquet.FilterPredicateLeafBuilder; +import org.apache.hadoop.hive.ql.io.parquet.LeafFilterFactory; +import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class ParquetFilterPredicateConverter { + private static final Log LOG = LogFactory.getLog(ParquetFilterPredicateConverter.class); + + /** + * Translate the search argument to the filter predicate parquet uses + * @return translate the sarg into a filter predicate + */ + public static FilterPredicate toFilterPredicate(SearchArgument sarg) { + return toFilterPredicate(sarg, null); + } + + /** + * Translate the search argument to the filter predicate parquet uses. It includes + * only the columns from the passed schema. + * @return translate the sarg into a filter predicate + */ + public static FilterPredicate toFilterPredicate(SearchArgument sarg, MessageType schema) { + Set<String> columns = null; + if (schema != null) { + columns = new HashSet<String>(); + for (Type field : schema.getFields()) { + columns.add(field.getName()); + } + } + + return translate(sarg.getExpression(), sarg.getLeaves(), columns); + } + + private static FilterPredicate translate(ExpressionTree root, List<PredicateLeaf> leaves, Set<String> columns) { + FilterPredicate p = null; + switch (root.getOperator()) { + case OR: + for(ExpressionTree child: root.getChildren()) { + if (p == null) { + p = translate(child, leaves, columns); + } else { + FilterPredicate right = translate(child, leaves, columns); + // constant means no filter, ignore it when it is null + if(right != null){ + p = FilterApi.or(p, right); + } + } + } + return p; + case AND: + for(ExpressionTree child: root.getChildren()) { + if (p == null) { + p = translate(child, leaves, columns); + } else { + FilterPredicate right = translate(child, leaves, columns); + // constant means no filter, ignore it when it is null + if(right != null){ + p = FilterApi.and(p, right); + } + } + } + return p; + case NOT: + FilterPredicate op = translate(root.getChildren().get(0), leaves, columns); + if (op != null) { + return FilterApi.not(op); + } else { + return null; + } + case LEAF: + PredicateLeaf leaf = leaves.get(root.getLeaf()); + + // If columns is null, then we need to create the leaf + if (columns == null || columns.contains(leaf.getColumnName())) { + return buildFilterPredicateFromPredicateLeaf(leaf); + } else { + // Do not create predicate if the leaf is not on the passed schema. 
+ return null; + } + case CONSTANT: + return null;// no filter will be executed for constant + default: + throw new IllegalStateException("Unknown operator: " + + root.getOperator()); + } + } + + private static FilterPredicate buildFilterPredicateFromPredicateLeaf + (PredicateLeaf leaf) { + LeafFilterFactory leafFilterFactory = new LeafFilterFactory(); + FilterPredicateLeafBuilder builder; + try { + builder = leafFilterFactory + .getLeafFilterBuilderByType(leaf.getType()); + if (builder == null) { + return null; + } + if (isMultiLiteralsOperator(leaf.getOperator())) { + return builder.buildPredicate(leaf.getOperator(), + leaf.getLiteralList(), + leaf.getColumnName()); + } else { + return builder + .buildPredict(leaf.getOperator(), + leaf.getLiteral(), + leaf.getColumnName()); + } + } catch (Exception e) { + LOG.error("fail to build predicate filter leaf with errors" + e, e); + return null; + } + } + + private static boolean isMultiLiteralsOperator(PredicateLeaf.Operator op) { + return (op == PredicateLeaf.Operator.IN) || + (op == PredicateLeaf.Operator.BETWEEN); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java index 49e52da..f689b90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java @@ -22,17 +22,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.IOConstants; -import org.apache.hadoop.hive.ql.io.parquet.FilterPredicateLeafBuilder; -import org.apache.hadoop.hive.ql.io.parquet.LeafFilterFactory; import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; -import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; -import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; -import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; @@ -46,7 +39,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.RowGroupFilter; -import org.apache.parquet.filter2.predicate.FilterApi; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetInputFormat; @@ -57,6 +49,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.ContextUtil; +import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; import com.google.common.base.Strings; @@ -139,26 +132,23 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, A } } - 
public FilterCompat.Filter setFilter(final JobConf conf) { - String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR); - String columnNamesString = - conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); - if (serializedPushdown == null || columnNamesString == null || serializedPushdown.isEmpty() || - columnNamesString.isEmpty()) { + public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { + SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf); + if (sarg == null) { return null; } - SearchArgument sarg = - ConvertAstToSearchArg.create(Utilities.deserializeExpression - (serializedPushdown)); - FilterPredicate p = toFilterPredicate(sarg); + // Create the Parquet FilterPredicate without including columns that do not exist + // on the shema (such as partition columns). + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema); if (p != null) { - LOG.debug("Predicate filter for parquet is " + p.toString()); + // Filter may have sensitive information. Do not send to debug. + LOG.debug("PARQUET predicate push down generated."); ParquetInputFormat.setFilterPredicate(conf, p); return FilterCompat.get(p); } else { - LOG.debug("No predicate filter can be generated for " + TableScanDesc.FILTER_EXPR_CONF_STR + - " with the value of " + serializedPushdown); + // Filter may have sensitive information. Do not send to debug. + LOG.debug("No PARQUET predicate push down is generated."); return null; } } @@ -250,7 +240,6 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, A if (oldSplit instanceof FileSplit) { final Path finalPath = ((FileSplit) oldSplit).getPath(); jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent()); - FilterCompat.Filter filter = setFilter(jobConf); final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath); final List<BlockMetaData> blocks = parquetMetadata.getBlocks(); @@ -274,6 +263,7 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, A return null; } + FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema()); if (filter != null) { filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema()); if (filtedBlocks.isEmpty()) { @@ -310,92 +300,4 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, A public List<BlockMetaData> getFiltedBlocks() { return filtedBlocks; } - - /** - * Translate the search argument to the filter predicate parquet used - * @return translate the sarg into a filter predicate - */ - public static FilterPredicate toFilterPredicate(SearchArgument sarg) { - return translate(sarg.getExpression(), sarg.getLeaves()); - } - - private static boolean isMultiLiteralsOperator(PredicateLeaf.Operator op) { - return (op == PredicateLeaf.Operator.IN) || - (op == PredicateLeaf.Operator.BETWEEN); - } - - private static FilterPredicate translate(ExpressionTree root, - List<PredicateLeaf> leafs){ - FilterPredicate p = null; - switch (root.getOperator()) { - case OR: - for(ExpressionTree child: root.getChildren()) { - if (p == null) { - p = translate(child, leafs); - } else { - FilterPredicate right = translate(child, leafs); - // constant means no filter, ignore it when it is null - if(right != null){ - p = FilterApi.or(p, right); - } - } - } - return p; - case AND: - for(ExpressionTree child: root.getChildren()) { - if (p == null) { - p = translate(child, leafs); - } else { - FilterPredicate right = 
translate(child, leafs); - // constant means no filter, ignore it when it is null - if(right != null){ - p = FilterApi.and(p, right); - } - } - } - return p; - case NOT: - FilterPredicate op = translate(root.getChildren().get(0), leafs); - if (op != null) { - return FilterApi.not(op); - } else { - return null; - } - case LEAF: - return buildFilterPredicateFromPredicateLeaf(leafs.get(root.getLeaf())); - case CONSTANT: - return null;// no filter will be executed for constant - default: - throw new IllegalStateException("Unknown operator: " + - root.getOperator()); - } - } - - private static FilterPredicate buildFilterPredicateFromPredicateLeaf - (PredicateLeaf leaf) { - LeafFilterFactory leafFilterFactory = new LeafFilterFactory(); - FilterPredicateLeafBuilder builder; - try { - builder = leafFilterFactory - .getLeafFilterBuilderByType(leaf.getType()); - if (builder == null) { - return null; - } - if (isMultiLiteralsOperator(leaf.getOperator())) { - return builder.buildPredicate(leaf.getOperator(), - leaf.getLiteralList(), - leaf.getColumnName()); - } else { - return builder - .buildPredict(leaf.getOperator(), - leaf.getLiteral(), - leaf.getColumnName()); - } - } catch (Exception e) { - LOG.error("fail to build predicate filter leaf with errors" + e, e); - return null; - } - } - - } http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRecordReaderWrapper.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRecordReaderWrapper.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRecordReaderWrapper.java index 87dd344..f9ca528 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRecordReaderWrapper.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRecordReaderWrapper.java @@ -22,7 +22,7 @@ import static junit.framework.Assert.assertEquals; import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveVarchar; -import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; @@ -56,7 +56,7 @@ public class TestParquetRecordReaderWrapper { .end() .build(); - FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg); String expected = "and(and(and(not(eq(x, null)), not(and(lt(y, 20), not(lteq(y, 10))))), not(or(or(eq(z, 1), " + "eq(z, 2)), eq(z, 3)))), not(eq(a, Binary{\"stinger\"})))"; @@ -76,7 +76,7 @@ public class TestParquetRecordReaderWrapper { .end() .build(); assertEquals("lteq(y, Binary{\"hi \"})", - ParquetRecordReaderWrapper.toFilterPredicate(sarg).toString()); + ParquetFilterPredicateConverter.toFilterPredicate(sarg).toString()); sarg = SearchArgumentFactory.newBuilder() .startNot() @@ -91,7 +91,7 @@ public class TestParquetRecordReaderWrapper { .end() .build(); - FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg); String expected = "and(and(not(eq(x, null)), not(or(or(eq(z, 1), eq(z, 2)), eq(z, 3)))), " + "not(eq(a, Binary{\"stinger\"})))"; @@ -111,7 +111,7 @@ public class 
TestParquetRecordReaderWrapper { .end() .build(); assertEquals("lteq(y, Binary{\"hi \"})", - ParquetRecordReaderWrapper.toFilterPredicate(sarg).toString()); + ParquetFilterPredicateConverter.toFilterPredicate(sarg).toString()); sarg = SearchArgumentFactory.newBuilder() .startNot() @@ -126,7 +126,7 @@ public class TestParquetRecordReaderWrapper { .end() .build(); - FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg); String expected = "and(and(not(eq(x, null)), not(or(or(eq(z, 1), eq(z, 2)), eq(z, 3)))), " + "not(eq(a, Binary{\"stinger\"})))"; assertEquals(expected, p.toString()); @@ -146,7 +146,7 @@ public class TestParquetRecordReaderWrapper { .end() .build(); - FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg); String expected = "and(and(and(and(lt(x, 22), lt(x1, 22))," + " lteq(y, Binary{\"hi \"})), eq(z, " + "0.22)), eq(z1, 0.22))"; http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestParquetFilterPredicate.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestParquetFilterPredicate.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestParquetFilterPredicate.java new file mode 100644 index 0000000..847a02b --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestParquetFilterPredicate.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.parquet.read; + +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Test; + +import static junit.framework.Assert.assertEquals; + +public class TestParquetFilterPredicate { + @Test + public void testFilterColumnsThatDoNoExistOnSchema() { + MessageType schema = MessageTypeParser.parseMessageType("message test { required int32 a; required binary stinger; }"); + SearchArgument sarg = SearchArgumentFactory.newBuilder() + .startNot() + .startOr() + .isNull("a", PredicateLeaf.Type.INTEGER) + .between("y", PredicateLeaf.Type.INTEGER, 10, 20) // Column will be removed from filter + .in("z", PredicateLeaf.Type.INTEGER, 1, 2, 3) // Column will be removed from filter + .nullSafeEquals("a", PredicateLeaf.Type.STRING, "stinger") + .end() + .end() + .build(); + + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema); + + String expected = "and(not(eq(a, null)), not(eq(a, Binary{\"stinger\"})))"; + assertEquals(expected, p.toString()); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java index 85e952f..9e8425a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java @@ -24,22 +24,15 @@ import static junit.framework.Assert.assertTrue; import com.google.common.collect.Sets; -import org.apache.hadoop.hive.common.type.HiveChar; -import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl.PredicateLeafImpl; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.junit.Test; import java.beans.XMLDecoder; import java.io.ByteArrayInputStream; import java.io.UnsupportedEncodingException; -import java.lang.reflect.Field; -import java.sql.Date; -import java.sql.Timestamp; import java.util.List; import java.util.Set; @@ -557,7 +550,7 @@ public class TestConvertAstToSearchArg { List<PredicateLeaf> leaves = sarg.getLeaves(); assertEquals(9, leaves.size()); - FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg); String[] conditions = new String[]{ "eq(first_name, Binary{\"john\"})", /* first_name = 'john' */ "not(lteq(first_name, Binary{\"greg\"}))", /* 'greg' < first_name */ @@ -849,7 +842,7 @@ public class TestConvertAstToSearchArg { "lteq(id, 4)" /* id <= 4 */ }; - FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg); + FilterPredicate p = 
ParquetFilterPredicateConverter.toFilterPredicate(sarg); String expected = String.format("or(or(or(%1$s, %2$s), %3$s), %4$s)", conditions); assertEquals(expected, p.toString()); @@ -1279,7 +1272,7 @@ public class TestConvertAstToSearchArg { "eq(last_name, Binary{\"smith\"})" /* 'smith' = last_name */ }; - FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg); String expected = String.format("and(and(and(%1$s, %2$s), %3$s), %4$s)", conditions); assertEquals(expected, p.toString()); @@ -1500,7 +1493,7 @@ public class TestConvertAstToSearchArg { "or(eq(id, 34), eq(id, 50))" /* id in (34,50) */ }; - FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg); String expected = String.format("and(and(%1$s, %2$s), %3$s)", conditions); assertEquals(expected, p.toString()); @@ -1759,7 +1752,7 @@ public class TestConvertAstToSearchArg { List<PredicateLeaf> leaves = sarg.getLeaves(); assertEquals(1, leaves.size()); - FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg); String expected = "and(lt(first_name, Binary{\"greg\"}), not(lteq(first_name, Binary{\"david\"})))"; assertEquals(p.toString(), expected); @@ -2239,7 +2232,7 @@ public class TestConvertAstToSearchArg { List<PredicateLeaf> leaves = sarg.getLeaves(); assertEquals(9, leaves.size()); - FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg); String expected = "and(and(and(and(and(and(and(and(and(and(and(and(and(and(and(and(and(" + "or(or(or(lt(id, 18), lt(id, 10)), lt(id, 13)), lt(id, 16)), " + "or(or(or(lt(id, 18), lt(id, 11)), lt(id, 13)), lt(id, 16))), " + @@ -2395,7 +2388,7 @@ public class TestConvertAstToSearchArg { List<PredicateLeaf> leaves = sarg.getLeaves(); assertEquals(0, leaves.size()); - FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg); assertNull(p); assertEquals("YES_NO_NULL", @@ -2650,7 +2643,7 @@ public class TestConvertAstToSearchArg { List<PredicateLeaf> leaves = sarg.getLeaves(); assertEquals(1, leaves.size()); - FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg); + FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg); String expected = "and(not(lt(id, 10)), not(lt(id, 10)))"; assertEquals(expected, p.toString()); http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q b/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q new file mode 100644 index 0000000..08af84f --- /dev/null +++ b/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q @@ -0,0 +1,9 @@ +SET hive.optimize.index.filter=true; +SET hive.optimize.ppd=true; + +-- Test predicate with partitioned columns +CREATE TABLE part1 (id int, content string) PARTITIONED BY (p string) STORED AS PARQUET; +ALTER TABLE part1 ADD PARTITION (p='p1'); +INSERT INTO TABLE part1 PARTITION (p='p1') VALUES (1, 'a'), (2, 'b'); +SELECT * FROM part1 WHERE p='p1'; +DROP TABLE part1 PURGE; \ No newline at end of file 
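A note on the q-file test above: its WHERE clause filters only on the partition column p, which is never stored in the Parquet data files. With this change the converter finds no matching column in the file schema and produces no Parquet filter at all, so row-group filtering is simply skipped for that query. The sketch below illustrates that case; it is not part of the commit, and the class name, schema string, and literals are made up for illustration, following the pattern of the new TestParquetFilterPredicate.

import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.junit.Test;

import static junit.framework.Assert.assertNull;

public class TestPartitionOnlyPredicateSketch {
  @Test
  public void testPartitionOnlyPredicateProducesNoParquetFilter() {
    // Hypothetical file schema of the part1 data files: the partition column "p"
    // is not stored in them, only the data columns are.
    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message part1 { required int32 id; required binary content; }");

    // SARG roughly equivalent to the test query's predicate: WHERE p = 'p1'
    SearchArgument sarg = SearchArgumentFactory.newBuilder()
        .startAnd()
          .equals("p", PredicateLeaf.Type.STRING, "p1")
        .end()
        .build();

    // The only leaf references a column missing from the file schema, so the
    // converter drops it and returns null; setFilter() then applies no
    // Parquet row-group filter for this query.
    assertNull(ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema));
  }
}
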
http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/test/results/clientpositive/parquet_predicate_pushdown.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/parquet_predicate_pushdown.q.out b/ql/src/test/results/clientpositive/parquet_predicate_pushdown.q.out new file mode 100644 index 0000000..4186618 --- /dev/null +++ b/ql/src/test/results/clientpositive/parquet_predicate_pushdown.q.out @@ -0,0 +1,47 @@ +PREHOOK: query: -- Test predicate with partitioned columns +CREATE TABLE part1 (id int, content string) PARTITIONED BY (p string) STORED AS PARQUET +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@part1 +POSTHOOK: query: -- Test predicate with partitioned columns +CREATE TABLE part1 (id int, content string) PARTITIONED BY (p string) STORED AS PARQUET +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@part1 +PREHOOK: query: ALTER TABLE part1 ADD PARTITION (p='p1') +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@part1 +POSTHOOK: query: ALTER TABLE part1 ADD PARTITION (p='p1') +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@part1 +POSTHOOK: Output: default@part1@p=p1 +PREHOOK: query: INSERT INTO TABLE part1 PARTITION (p='p1') VALUES (1, 'a'), (2, 'b') +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Output: default@part1@p=p1 +POSTHOOK: query: INSERT INTO TABLE part1 PARTITION (p='p1') VALUES (1, 'a'), (2, 'b') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@part1@p=p1 +POSTHOOK: Lineage: part1 PARTITION(p=p1).content SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: part1 PARTITION(p=p1).id EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: SELECT * FROM part1 WHERE p='p1' +PREHOOK: type: QUERY +PREHOOK: Input: default@part1 +PREHOOK: Input: default@part1@p=p1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM part1 WHERE p='p1' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@part1 +POSTHOOK: Input: default@part1@p=p1 +#### A masked pattern was here #### +1 a p1 +2 b p1 +PREHOOK: query: DROP TABLE part1 PURGE +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@part1 +PREHOOK: Output: default@part1 +POSTHOOK: query: DROP TABLE part1 PURGE +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@part1 +POSTHOOK: Output: default@part1
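
For the more common case where a query mixes conditions on data columns and partition columns, the schema-aware toFilterPredicate(sarg, schema) keeps only the leaves whose columns exist in the Parquet file schema and drops the rest. A hedged sketch of that behavior follows; the class name, schema string, and literals are illustrative and not taken from the commit. Dropping a leaf only weakens the pushed-down filter, which is safe because Hive still re-applies the full WHERE clause to the rows returned by the reader; the Parquet filter only has to avoid discarding matching row groups.

import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class MixedPredicateSketch {
  public static void main(String[] args) {
    // Hypothetical file schema: "p" is a partition column and is absent from the files.
    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message part1 { required int32 id; required binary content; }");

    // SARG roughly equivalent to: WHERE id < 2 AND p = 'p1'
    SearchArgument sarg = SearchArgumentFactory.newBuilder()
        .startAnd()
          .lessThan("id", PredicateLeaf.Type.INTEGER, 2)
          .equals("p", PredicateLeaf.Type.STRING, "p1")
        .end()
        .build();

    // The leaf on "p" is dropped because that column is not in the file schema;
    // only the leaf on "id" is translated into a Parquet FilterPredicate.
    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
    System.out.println(p);  // expected to print something like: lt(id, 2)
  }
}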
