Repository: incubator-impala Updated Branches: refs/heads/master 157da298d -> 13837262b
IMPALA-2328: Address additional comments - test_parquet_stats.py was missing and the tests weren't run during GVO. - The tests in parquet_stats.test assume that the queries were executed in a single fragment, so they now run with 'num_nodes = 1'. - Parquet columns are now resolved correctly. - Parquet files with missing columns are now handled correctly. - Predicates with implicit casts can now be evaluated against parquet::Statistics. - This change also cleans up some old friend declarations I came across. Change-Id: I54c205fad7afc4a0b0a7d0f654859de76db29a02 Reviewed-on: http://gerrit.cloudera.org:8080/6147 Reviewed-by: Lars Volker <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/996fb5ea Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/996fb5ea Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/996fb5ea Branch: refs/heads/master Commit: 996fb5eab3e9cefc738afbc8171b901ad116dfb8 Parents: 157da29 Author: Lars Volker <[email protected]> Authored: Thu Feb 23 22:26:32 2017 -0800 Committer: Impala Public Jenkins <[email protected]> Committed: Fri Mar 3 02:34:10 2017 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-parquet-scanner.cc | 41 ++++++++++++---- be/src/exprs/case-expr.h | 1 - be/src/exprs/expr.h | 4 -- .../org/apache/impala/planner/HdfsScanNode.java | 9 ++-- .../queries/QueryTest/parquet_stats.test | 50 ++++++++++++++++++++ tests/query_test/test_parquet_stats.py | 41 ++++++++++++++++ 6 files changed, 129 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/996fb5ea/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 2b0454e..ab40713 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -193,7 +193,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) { // Allocate tuple buffer to evaluate conjuncts on parquet::Statistics. const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc(); - if (min_max_tuple_desc) { + if (min_max_tuple_desc != nullptr) { int64_t tuple_size = min_max_tuple_desc->byte_size(); if (!min_max_tuple_buffer_.TryAllocate(tuple_size)) { return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet " @@ -484,16 +484,39 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(const parquet::RowGroup& row_g Tuple* min_max_tuple = reinterpret_cast<Tuple*>(min_max_tuple_buffer_.buffer()); min_max_tuple->Init(tuple_size); - DCHECK(min_max_tuple_desc->slots().size() == min_max_conjuncts_ctxs_.size()); + DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjuncts_ctxs_.size()); min_max_conjuncts_ctxs_to_eval_.clear(); for (int i = 0; i < min_max_conjuncts_ctxs_.size(); ++i) { SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i]; ExprContext* conjunct = min_max_conjuncts_ctxs_[i]; - Expr* e = conjunct->root(); - DCHECK(e->GetChild(0)->is_slotref()); - int col_idx = slot_desc->col_pos() - scan_node_->num_partition_keys(); + // Resolve column path to determine col idx. + SchemaNode* node = nullptr; + bool pos_field; + bool missing_field; + RETURN_IF_ERROR(schema_resolver_->ResolvePath(slot_desc->col_path(), + &node, &pos_field, &missing_field)); + + if (missing_field) { + // We are selecting a column that is not in the file. We would set its slot to NULL + // during the scan, so any predicate would evaluate to false. Return early. NULL + // comparisons cannot happen here, since predicates with NULL literals are filtered + // in the frontend. + *skip_row_group = true; + return Status::OK(); + } + + if (pos_field) { + // The planner should not send predicates with 'pos' for stats filtering to the BE. + // In case there is a bug, we return an error, which will abort the query. + stringstream err; + err << "Statistics not supported for pos fields: " << slot_desc->DebugString(); + DCHECK(false) << err.str(); + return Status(err.str()); + } + + int col_idx = node->col_idx; DCHECK(col_idx < row_group.columns.size()); if (!ParquetMetadataUtils::HasRowGroupStats(row_group, col_idx)) continue; @@ -503,17 +526,17 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(const parquet::RowGroup& row_g void* slot = min_max_tuple->GetSlot(slot_desc->tuple_offset()); const ColumnType& col_type = slot_desc->type(); - if (e->function_name() == "lt" || e->function_name() == "le") { + const string& fn_name = conjunct->root()->function_name(); + if (fn_name == "lt" || fn_name == "le") { // We need to get min stats. stats_read = ColumnStatsBase::ReadFromThrift(stats, col_type, ColumnStatsBase::StatsField::MIN, slot); - } else if (e->function_name() == "gt" || e->function_name() == "ge") { + } else if (fn_name == "gt" || fn_name == "ge") { // We need to get max stats. stats_read = ColumnStatsBase::ReadFromThrift(stats, col_type, ColumnStatsBase::StatsField::MAX, slot); } else { - DCHECK(false) << "Unsupported function name for statistics evaluation: " - << e->function_name(); + DCHECK(false) << "Unsupported function name for statistics evaluation: " << fn_name; } if (stats_read) min_max_conjuncts_ctxs_to_eval_.push_back(conjunct); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/996fb5ea/be/src/exprs/case-expr.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/case-expr.h b/be/src/exprs/case-expr.h index 87bab76..a59f5c2 100644 --- a/be/src/exprs/case-expr.h +++ b/be/src/exprs/case-expr.h @@ -45,7 +45,6 @@ class CaseExpr: public Expr { protected: friend class Expr; - friend class ComputeFunctions; friend class ConditionalFunctions; friend class DecimalOperators; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/996fb5ea/be/src/exprs/expr.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/expr.h b/be/src/exprs/expr.h index 8f14d6d..8919f59 100644 --- a/be/src/exprs/expr.h +++ b/be/src/exprs/expr.h @@ -255,10 +255,7 @@ class Expr { protected: friend class AggFnEvaluator; - friend class CastExpr; - friend class ComputeFunctions; friend class DecimalFunctions; - friend class DecimalLliteral; friend class DecimalOperators; friend class MathFunctions; friend class StringFunctions; @@ -267,7 +264,6 @@ class Expr { friend class UtilityFunctions; friend class CaseExpr; friend class InPredicate; - friend class FunctionCall; friend class ScalarFnCall; Expr(const ColumnType& type, bool is_constant, bool is_slotref); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/996fb5ea/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index c5c552b..b1cd86a 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -336,12 +336,15 @@ public class HdfsScanNode extends ScanNode { BinaryPredicate binaryPred = (BinaryPredicate) pred; // We only support slot refs on the left hand side of the predicate, a rewriting - // rule makes sure that all compatible exprs are rewritten into this form. - if (!(binaryPred.getChild(0) instanceof SlotRef)) continue; - SlotRef slot = (SlotRef) binaryPred.getChild(0); + // rule makes sure that all compatible exprs are rewritten into this form. Only + // implicit casts are supported. + SlotRef slot = binaryPred.getChild(0).unwrapSlotRef(true); + if (slot == null) continue; // This node is a table scan, so this must be a scanning slot. Preconditions.checkState(slot.getDesc().isScanSlot()); + // If the column is null, then this can be a 'pos' scanning slot of a nested type. + if (slot.getDesc().getColumn() == null) continue; Expr constExpr = binaryPred.getChild(1); // Only constant exprs can be evaluated against parquet::Statistics. This includes http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/996fb5ea/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test b/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test index 327fba6..6f9393d 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test @@ -229,3 +229,53 @@ select id, bool_col from functional_parquet.alltypessmall where 5 + 5 < int_col row_regex: .*NumRowGroups: 4 .* row_regex: .*NumStatsFilteredRowGroups: 4 .* ==== +---- QUERY +# Test name based column resolution +create table name_resolve stored as parquet as select * from functional_parquet.alltypessmall; +alter table name_resolve replace columns (int_col int, bool_col boolean, tinyint_col tinyint, smallint_col smallint, id int); +set parquet_fallback_schema_resolution=NAME; +# If this picks up the stats from int_col, then it will filter all row groups and return +# an incorrect result. +select count(*) from name_resolve where id > 10; +---- RESULTS +89 +---- RUNTIME_PROFILE +row_regex: .*NumRowGroups: 1 .* +row_regex: .*NumStatsFilteredRowGroups: 0 .* +==== +---- QUERY +# Query that has an implicit cast to a larger integer type +select count(*) from functional_parquet.alltypessmall where tinyint_col > 1000000000000 +---- RESULTS +0 +---- RUNTIME_PROFILE +row_regex: .*NumRowGroups: 4 .* +row_regex: .*NumStatsFilteredRowGroups: 4 .* +==== +---- QUERY +# Predicates with explicit casts are not supported when evaluating parquet::Statistics. +select count(*) from functional_parquet.alltypessmall where '0' > cast(tinyint_col as string) +---- RESULTS +0 +---- RUNTIME_PROFILE +row_regex: .*NumRowGroups: 4 .* +row_regex: .*NumStatsFilteredRowGroups: 0 .* +==== +---- QUERY +# Explicit casts between numerical types can violate the transitivity of "min()", so they +# are not supported when evaluating parquet::Statistics. +select count(*) from functional_parquet.alltypes where cast(id as tinyint) < 10; +---- RESULTS +3878 +---- RUNTIME_PROFILE +row_regex: .*NumRowGroups: 24 .* +row_regex: .*NumStatsFilteredRowGroups: 0 .* +==== +---- QUERY +select count(*) from functional_parquet.complextypestbl.int_array where pos < 5; +---- RESULTS +9 +---- RUNTIME_PROFILE +row_regex: .*NumRowGroups: 2 .* +row_regex: .*NumStatsFilteredRowGroups: 0 .* +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/996fb5ea/tests/query_test/test_parquet_stats.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_parquet_stats.py b/tests/query_test/test_parquet_stats.py new file mode 100644 index 0000000..502301a --- /dev/null +++ b/tests/query_test/test_parquet_stats.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest + +from tests.common.impala_test_suite import ImpalaTestSuite + +class TestParquetStats(ImpalaTestSuite): + """ + This suite tests runtime optimizations based on Parquet statistics. + """ + + @classmethod + def get_workload(cls): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestParquetStats, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_constraint( + lambda v: v.get_value('table_format').file_format == 'parquet') + + def test_parquet_stats(self, vector, unique_database): + # The test makes assumptions about the number of row groups that are processed and + # skipped inside a fragment, so we ensure that the tests run in a single fragment. + vector.get_value('exec_option')['num_nodes'] = 1 + self.run_test_case('QueryTest/parquet_stats', vector, use_db=unique_database)
