IMPALA-3654: Parquet stats filtering for IN predicate

This generates min/max predicates for InPredicates that have only constant values in the IN list. These min/max predicates are only used for statistics filtering on Parquet files.
Change-Id: I4a88963a7206f40a867e49eceeaf03fdd4f71997 Reviewed-on: http://gerrit.cloudera.org:8080/6810 Reviewed-by: Alex Behm <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/aa05c649 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/aa05c649 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/aa05c649 Branch: refs/heads/master Commit: aa05c6493b0ff8bbf422a4c38cf780bde34d51c7 Parents: c26a485 Author: Joe McDonnell <[email protected]> Authored: Fri Apr 14 11:59:08 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Sat May 6 03:40:57 2017 +0000 ---------------------------------------------------------------------- .../org/apache/impala/planner/HdfsScanNode.java | 110 +++++++++++++------ .../queries/PlannerTest/parquet-filtering.test | 39 ++++++- .../queries/QueryTest/parquet_stats.test | 17 +++ 3 files changed, 130 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa05c649/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 2a9c84c..bd260e0 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -32,6 +32,10 @@ import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.BinaryPredicate; import org.apache.impala.analysis.DescriptorTable; import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.FunctionCallExpr; +import org.apache.impala.analysis.InPredicate; +import org.apache.impala.analysis.LiteralExpr; +import 
org.apache.impala.analysis.NullLiteral; import org.apache.impala.analysis.SlotDescriptor; import org.apache.impala.analysis.SlotId; import org.apache.impala.analysis.SlotRef; @@ -326,6 +330,77 @@ public class HdfsScanNode extends ScanNode { minMaxConjuncts_.add(statsPred); } + private void tryComputeBinaryMinMaxPredicate(Analyzer analyzer, + BinaryPredicate binaryPred) { + // We only support slot refs on the left hand side of the predicate, a rewriting + // rule makes sure that all compatible exprs are rewritten into this form. Only + // implicit casts are supported. + SlotRef slot = binaryPred.getChild(0).unwrapSlotRef(true); + if (slot == null) return; + + // This node is a table scan, so this must be a scanning slot. + Preconditions.checkState(slot.getDesc().isScanSlot()); + // If the column is null, then this can be a 'pos' scanning slot of a nested type. + if (slot.getDesc().getColumn() == null) return; + + Expr constExpr = binaryPred.getChild(1); + // Only constant exprs can be evaluated against parquet::Statistics. This includes + // LiteralExpr, but can also be an expr like "1 + 2". + if (!constExpr.isConstant()) return; + if (constExpr.isNullLiteral()) return; + + BinaryPredicate.Operator op = binaryPred.getOp(); + if (op == BinaryPredicate.Operator.LT || op == BinaryPredicate.Operator.LE || + op == BinaryPredicate.Operator.GE || op == BinaryPredicate.Operator.GT) { + minMaxOriginalConjuncts_.add(binaryPred); + buildStatsPredicate(analyzer, slot, binaryPred, op); + } else if (op == BinaryPredicate.Operator.EQ) { + minMaxOriginalConjuncts_.add(binaryPred); + // TODO: this could be optimized for boolean columns. + buildStatsPredicate(analyzer, slot, binaryPred, BinaryPredicate.Operator.LE); + buildStatsPredicate(analyzer, slot, binaryPred, BinaryPredicate.Operator.GE); + } + } + + private void tryComputeInListMinMaxPredicate(Analyzer analyzer, InPredicate inPred) { + // Retrieve the left side of the IN predicate. It must be a simple slot to + // proceed. 
+ SlotRef slot = inPred.getBoundSlot(); + if (slot == null) return; + // This node is a table scan, so this must be a scanning slot. + Preconditions.checkState(slot.getDesc().isScanSlot()); + // If the column is null, then this can be a 'pos' scanning slot of a nested type. + if (slot.getDesc().getColumn() == null) return; + if (inPred.isNotIn()) return; + + ArrayList<Expr> children = inPred.getChildren(); + LiteralExpr min = null; + LiteralExpr max = null; + for (int i = 1; i < children.size(); ++i) { + Expr child = children.get(i); + + // If any child is not a literal, then nothing can be done + if (!child.isLiteral()) return; + LiteralExpr literalChild = (LiteralExpr) child; + // If any child is NULL, then there is not a valid min/max. Nothing can be done. + if (literalChild instanceof NullLiteral) return; + + if (min == null || literalChild.compareTo(min) < 0) min = literalChild; + if (max == null || literalChild.compareTo(max) > 0) max = literalChild; + } + Preconditions.checkState(min != null); + Preconditions.checkState(max != null); + + BinaryPredicate minBound = new BinaryPredicate(BinaryPredicate.Operator.GE, + children.get(0).clone(), min.clone()); + BinaryPredicate maxBound = new BinaryPredicate(BinaryPredicate.Operator.LE, + children.get(0).clone(), max.clone()); + + minMaxOriginalConjuncts_.add(inPred); + buildStatsPredicate(analyzer, slot, minBound, minBound.getOp()); + buildStatsPredicate(analyzer, slot, maxBound, maxBound.getOp()); + } + /** * Analyzes 'conjuncts_', populates 'minMaxTuple_' with slots for statistics values, and * populates 'minMaxConjuncts_' with conjuncts pointing into the 'minMaxTuple_'. 
Only @@ -340,38 +415,11 @@ public class HdfsScanNode extends ScanNode { minMaxTuple_.setPath(desc_.getPath()); for (Expr pred: conjuncts_) { - if (!(pred instanceof BinaryPredicate)) continue; - BinaryPredicate binaryPred = (BinaryPredicate) pred; - - // We only support slot refs on the left hand side of the predicate, a rewriting - // rule makes sure that all compatible exprs are rewritten into this form. Only - // implicit casts are supported. - SlotRef slot = binaryPred.getChild(0).unwrapSlotRef(true); - if (slot == null) continue; - - // This node is a table scan, so this must be a scanning slot. - Preconditions.checkState(slot.getDesc().isScanSlot()); - // If the column is null, then this can be a 'pos' scanning slot of a nested type. - if (slot.getDesc().getColumn() == null) continue; - - Expr constExpr = binaryPred.getChild(1); - // Only constant exprs can be evaluated against parquet::Statistics. This includes - // LiteralExpr, but can also be an expr like "1 + 2". - if (!constExpr.isConstant()) continue; - if (constExpr.isNullLiteral()) continue; - - BinaryPredicate.Operator op = binaryPred.getOp(); - if (op == BinaryPredicate.Operator.LT || op == BinaryPredicate.Operator.LE || - op == BinaryPredicate.Operator.GE || op == BinaryPredicate.Operator.GT) { - minMaxOriginalConjuncts_.add(pred); - buildStatsPredicate(analyzer, slot, binaryPred, op); - } else if (op == BinaryPredicate.Operator.EQ) { - minMaxOriginalConjuncts_.add(pred); - // TODO: this could be optimized for boolean columns. 
- buildStatsPredicate(analyzer, slot, binaryPred, BinaryPredicate.Operator.LE); - buildStatsPredicate(analyzer, slot, binaryPred, BinaryPredicate.Operator.GE); + if (pred instanceof BinaryPredicate) { + tryComputeBinaryMinMaxPredicate(analyzer, (BinaryPredicate) pred); + } else if (pred instanceof InPredicate) { + tryComputeInListMinMaxPredicate(analyzer, (InPredicate) pred); } - } minMaxTuple_.computeMemLayout(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa05c649/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test index df5f99d..31451aa 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test @@ -28,9 +28,10 @@ PLAN-ROOT SINK ==== # Test a variety of types select count(*) from functional_parquet.alltypes -where id = 1 and bool_col and tinyint_col < 50 and smallint_col > 50 +where id = 1 and bool_col and tinyint_col < 50 and smallint_col in (1,2,3,4,5) and mod(int_col,2) = 1 and bigint_col < 5000 and float_col > 50.00 -and double_col > 100.00 and date_string_col > '1993-10-01' and string_col > 'aaaa' +and double_col > 100.00 and date_string_col > '1993-10-01' +and string_col in ('aaaa', 'bbbb', 'cccc') and timestamp_cmp(timestamp_col, '2016-11-20 00:00:00') = 1 and year > 2000 and month < 12; ---- PLAN @@ -45,11 +46,39 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.alltypes] partitions=22/24 files=22 size=143.36KB - predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, smallint_col > 50, tinyint_col < 50, string_col > 'aaaa', mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 
1, date_string_col > '1993-10-01' + predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01' table stats: unavailable columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col - parquet statistics predicates: bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, smallint_col > 50, tinyint_col < 50, string_col > 'aaaa', date_string_col > '1993-10-01' - parquet dictionary predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, smallint_col > 50, tinyint_col < 50, string_col > 'aaaa', mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01' + parquet statistics predicates: bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), date_string_col > '1993-10-01' + parquet dictionary predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01' mem-estimate=128.00MB mem-reservation=0B tuple-ids=0 row-size=80B cardinality=unavailable ==== +# Test negative cases for IN predicate min/max filtering +# - NOT IN +# - IN list with NULL +# - IN list contains non-Literals +# - complex expression on left side of IN +select count(*) from functional_parquet.alltypes +where id NOT IN (0,1,2) and string_col IN ('aaaa', 'bbbb', 'cccc', NULL) +and mod(int_col,50) IN (0,1) +and id IN (int_col); +---- PLAN +F00:PLAN 
FRAGMENT [UNPARTITIONED] hosts=1 instances=1 + PLAN-ROOT SINK + | mem-estimate=0B mem-reservation=0B + | + 01:AGGREGATE [FINALIZE] + | output: count(*) + | mem-estimate=10.00MB mem-reservation=0B + | tuple-ids=1 row-size=8B cardinality=1 + | + 00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=173.09KB + predicates: id IN (int_col), id NOT IN (0, 1, 2), string_col IN ('aaaa', 'bbbb', 'cccc', NULL), mod(int_col, 50) IN (0, 1) + table stats: unavailable + column stats: unavailable + parquet dictionary predicates: id NOT IN (0, 1, 2), string_col IN ('aaaa', 'bbbb', 'cccc', NULL), mod(int_col, 50) IN (0, 1) + mem-estimate=48.00MB mem-reservation=0B + tuple-ids=0 row-size=24B cardinality=unavailable +==== \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa05c649/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test b/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test index 6f9393d..f88d1df 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test @@ -279,3 +279,20 @@ select count(*) from functional_parquet.complextypestbl.int_array where pos < 5; row_regex: .*NumRowGroups: 2 .* row_regex: .*NumStatsFilteredRowGroups: 0 .* ==== +---- QUERY +# Test the conversion of constant IN lists to min/max predicates +select count(*) from functional_parquet.alltypes where int_col in (-1,-2,-3,-4); +---- RESULTS +0 +---- RUNTIME_PROFILE +aggregation(SUM, NumRowGroups): 24 +aggregation(SUM, NumStatsFilteredRowGroups): 24 +==== +---- QUERY +select count(*) from functional_parquet.alltypes where id IN (1,25,49); +---- RESULTS +3 +---- RUNTIME_PROFILE +aggregation(SUM, NumRowGroups): 24 +aggregation(SUM,
NumStatsFilteredRowGroups): 23 +==== \ No newline at end of file
