IMPALA-4285/IMPALA-4286: Fixes for Parquet scanner with MT_DOP > 0.

IMPALA-4285: The problem was that there was a reference to
HdfsScanner::batch_ hidden inside WriteEmptyTuples(). The batch_
reference is NULL when the scanner is run with MT_DOP > 1.
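(For reference, the hidden batch_ reference lived in the next_row() helper
that WriteEmptyTuples() used to advance between rows; simplified excerpt of
the pre-patch code, which the hdfs-scanner.h hunk below switches to
sizeof(Tuple*):)

  inline TupleRow* next_row(TupleRow* r) const {
    uint8_t* mem = reinterpret_cast<uint8_t*>(r);
    // batch_ is NULL when the scanner runs with MT_DOP > 1, so this
    // dereference crashes under HdfsScanNodeMt.
    return reinterpret_cast<TupleRow*>(mem + batch_->row_byte_size());
  }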
IMPALA-4286: When there are no scan ranges, HdfsScanNodeBase::Open()
exits early without initializing the reader context. This led to a
DCHECK in IoMgr::GetNextRange() called from HdfsScanNodeMt. The fix
is to remove that unnecessary short-circuit in Open().
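(The offending early return, removed by the hdfs-scan-node-base.cc hunk
below:)

  Status HdfsScanNodeBase::Open(RuntimeState* state) {
    RETURN_IF_ERROR(ExecNode::Open(state));
    // With no files, this returned before the reader context was set up,
    // so HdfsScanNodeMt later hit the DCHECK in IoMgr::GetNextRange().
    if (file_descs_.empty()) return Status::OK();
    ...
  }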
I combined these two bugfixes because the new basic test covers both cases.

Testing: Added a new test_mt_dop.py test. A private core/hdfs run passed.

Change-Id: I79c0f6fd2aeb4bc6fa5f87219a485194fef2db1b
Reviewed-on: http://gerrit.cloudera.org:8080/4767
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ff6b450a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ff6b450a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ff6b450a

Branch: refs/heads/master
Commit: ff6b450ad380ce840e18875a89d9cf98058277a3
Parents: 51268c0
Author: Alex Behm <[email protected]>
Authored: Wed Oct 19 23:27:14 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Sat Oct 22 10:24:24 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-avro-scanner.cc     |  2 +-
 be/src/exec/hdfs-parquet-scanner.cc  |  2 +-
 be/src/exec/hdfs-rcfile-scanner.cc   |  2 +-
 be/src/exec/hdfs-scan-node-base.cc   |  2 -
 be/src/exec/hdfs-scanner.cc          | 64 ++------------------
 be/src/exec/hdfs-scanner.h           | 14 ++---
 be/src/exec/hdfs-sequence-scanner.cc |  4 +-
 be/src/exec/hdfs-text-scanner.cc     |  2 +-
 .../queries/QueryTest/mt-dop.test    |  9 +++
 tests/query_test/test_mt_dop.py      | 47 ++++++++++++++
 10 files changed, 73 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 88d6d3a..91a9d03 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -538,7 +538,7 @@ Status HdfsAvroScanner::ProcessRange() {
     int num_to_commit;
     if (scan_node_->materialized_slots().empty()) {
       // No slots to materialize (e.g. count(*)), no need to decode data
-      num_to_commit = WriteEmptyTuples(context_, tuple_row, max_tuples);
+      num_to_commit = WriteTemplateTuples(tuple_row, max_tuples);
     } else {
       if (codegend_decode_avro_data_ != NULL) {
         num_to_commit = codegend_decode_avro_data_(this, max_tuples, pool, &data,


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index e91a7ec..542f4cc 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -339,7 +339,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
     int rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
     int max_tuples = min(row_batch->capacity(), rows_remaining);
     TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
-    int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples);
+    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
     Status status = CommitRows(row_batch, num_to_commit);
     assemble_rows_timer_.Stop();
     RETURN_IF_ERROR(status);


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index f43b2aa..012a424 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -485,7 +485,7 @@ Status HdfsRCFileScanner::ProcessRange() {
       // If there are no materialized slots (e.g. count(*) or just partition cols)
       // we can shortcircuit the parse loop
       row_pos_ += max_tuples;
-      int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples);
+      int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
       COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
       RETURN_IF_ERROR(CommitRows(num_to_commit));
       continue;


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index cf6708c..957338d 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -351,8 +351,6 @@ void HdfsScanNodeBase::Codegen(RuntimeState* state) {
 Status HdfsScanNodeBase::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Open(state));
 
-  if (file_descs_.empty()) return Status::OK();
-
   // Open collection conjuncts
   for (const auto& entry: conjuncts_map_) {
     // conjuncts_ are already opened in ExecNode::Open()


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 0b6e8c5..3885522 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -211,66 +211,14 @@ Status HdfsScanner::CommitRows(int num_rows) {
   return Status::OK();
 }
 
-// In this code path, no slots were materialized from the input files. The only
-// slots are from partition keys. This lets us simplify writing out the batches.
-// 1. template_tuple_ is the complete tuple.
-// 2. Eval conjuncts against the tuple.
-// 3. If it passes, stamp out 'num_tuples' copies of it into the row_batch.
-int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) {
-  DCHECK_GT(num_tuples, 0);
-
-  if (template_tuple_ == NULL) {
-    // No slots from partitions keys or slots. This is count(*). Just add the
-    // number of rows to the batch.
-    row_batch->AddRows(num_tuples);
-    row_batch->CommitRows(num_tuples);
-  } else {
-    // Make a row and evaluate the row
-    int row_idx = row_batch->AddRow();
-
-    TupleRow* current_row = row_batch->GetRow(row_idx);
-    current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
-    if (!EvalConjuncts(current_row)) return 0;
-    // Add first tuple
-    row_batch->CommitLastRow();
-    --num_tuples;
-
-    DCHECK_LE(num_tuples, row_batch->capacity() - row_batch->num_rows());
-
-    for (int n = 0; n < num_tuples; ++n) {
-      DCHECK(!row_batch->AtCapacity());
-      TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
-      current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
-      row_batch->CommitLastRow();
-    }
-  }
-  return num_tuples;
-}
-
-// In this code path, no slots were materialized from the input files. The only
-// slots are from partition keys. This lets us simplify writing out the batches.
-// 1. template_tuple_ is the complete tuple.
-// 2. Eval conjuncts against the tuple.
-// 3. If it passes, stamp out 'num_tuples' copies of it into the row_batch.
-int HdfsScanner::WriteEmptyTuples(ScannerContext* context,
-    TupleRow* row, int num_tuples) {
+int HdfsScanner::WriteTemplateTuples(TupleRow* row, int num_tuples) {
   DCHECK_GE(num_tuples, 0);
-  if (num_tuples == 0) return 0;
-
-  if (template_tuple_ == NULL) {
-    // Must be conjuncts on constant exprs.
-    if (!EvalConjuncts(row)) return 0;
-    return num_tuples;
-  } else {
-    row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
-    if (!EvalConjuncts(row)) return 0;
-    row = next_row(row);
+  DCHECK_EQ(scan_node_->tuple_idx(), 0);
+  DCHECK_EQ(scanner_conjunct_ctxs_->size(), 0);
+  if (num_tuples == 0 || template_tuple_ == NULL) return num_tuples;
 
-    for (int n = 1; n < num_tuples; ++n) {
-      row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
-      row = next_row(row);
-    }
-  }
+  Tuple** row_tuple = reinterpret_cast<Tuple**>(row);
+  for (int i = 0; i < num_tuples; ++i) row_tuple[i] = template_tuple_;
   return num_tuples;
 }


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 4a4d366..71efd5a 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -347,14 +347,9 @@ class HdfsScanner {
         scanner_conjunct_ctxs_->size(), row);
   }
 
-  /// Utility method to write out tuples when there are no materialized
-  /// fields (e.g. select count(*) or only partition keys).
-  ///   num_tuples - Total number of tuples to write out.
-  /// Returns the number of tuples added to the row batch.
-  int WriteEmptyTuples(RowBatch* row_batch, int num_tuples);
-
-  /// Write empty tuples and commit them to the context object
-  int WriteEmptyTuples(ScannerContext* context, TupleRow* tuple_row, int num_tuples);
+  /// Sets 'num_tuples' template tuples in the batch that 'row' points to. Assumes the
+  /// 'tuple_row' only has a single tuple. Returns the number of tuples set.
+  int WriteTemplateTuples(TupleRow* row, int num_tuples);
 
   /// Processes batches of fields and writes them out to tuple_row_mem.
   /// - 'pool' mempool to allocate from for auxiliary tuple memory
@@ -455,9 +450,10 @@ class HdfsScanner {
     return reinterpret_cast<Tuple*>(mem + tuple_byte_size);
   }
 
+  /// Assumes the row only has a single tuple.
   inline TupleRow* next_row(TupleRow* r) const {
     uint8_t* mem = reinterpret_cast<uint8_t*>(r);
-    return reinterpret_cast<TupleRow*>(mem + batch_->row_byte_size());
+    return reinterpret_cast<TupleRow*>(mem + sizeof(Tuple*));
   }
 
   /// Simple wrapper around scanner_conjunct_ctxs_. Used in the codegen'd version of


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index fd552be..33be362 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -214,7 +214,7 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
 
   if (scan_node_->materialized_slots().empty()) {
     // Handle case where there are no slots to materialize (e.g. count(*))
-    num_to_process = WriteEmptyTuples(context_, tuple_row, num_to_process);
+    num_to_process = WriteTemplateTuples(tuple_row, num_to_process);
     COUNTER_ADD(scan_node_->rows_read_counter(), num_to_process);
     RETURN_IF_ERROR(CommitRows(num_to_process));
     return Status::OK();
@@ -334,7 +334,7 @@ Status HdfsSequenceScanner::ProcessRange() {
         RETURN_IF_ERROR(parse_status_);
       }
     } else {
-      add_row = WriteEmptyTuples(context_, tuple_row_mem, 1);
+      add_row = WriteTemplateTuples(tuple_row_mem, 1);
     }
 
     COUNTER_ADD(scan_node_->rows_read_counter(), 1);


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index cc63408..0b048f4 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -400,7 +400,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
       SCOPED_TIMER(scan_node_->materialize_tuple_timer());
       // If we are doing count(*) then we return tuples only containing partition keys
       boundary_row_.Clear();
-      num_tuples_materialized = WriteEmptyTuples(context_, tuple_row_mem, *num_tuples);
+      num_tuples_materialized = WriteTemplateTuples(tuple_row_mem, *num_tuples);
     }
 
     // Save contents that are split across buffers if we are going to return this column


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
new file mode 100644
index 0000000..ac453ca
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
@@ -0,0 +1,9 @@
+====
+---- QUERY
+# IMPALA-4285: Test scan with no materialized slots.
+select count(*) from alltypes
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+====


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/tests/query_test/test_mt_dop.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
new file mode 100644
index 0000000..1cd6d31
--- /dev/null
+++ b/tests/query_test/test_mt_dop.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests queries with the MT_DOP query option.
+
+import pytest
+
+from copy import deepcopy
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_vector import TestDimension
+from tests.common.test_vector import TestVector
+
+MT_DOP_VALUES = [1, 2, 8]
+
+class TestMtDop(ImpalaTestSuite):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestMtDop, cls).add_test_dimensions()
+    cls.TestMatrix.add_dimension(TestDimension('mt_dop', *MT_DOP_VALUES))
+    # IMPALA-4332: The MT scheduler does not work for Kudu or HBase tables.
+    cls.TestMatrix.add_constraint(\
+        lambda v: v.get_value('table_format').file_format != 'hbase')
+    cls.TestMatrix.add_constraint(\
+        lambda v: v.get_value('table_format').file_format != 'kudu')
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  def test_mt_dop(self, vector):
+    new_vector = deepcopy(vector)
+    new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
+    self.run_test_case('QueryTest/mt-dop', new_vector)
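(Aside: the new WriteTemplateTuples() leans on the layout asserted by its
DCHECKs (tuple index 0, single-tuple rows), under which an array of
TupleRows is interchangeable with an array of Tuple pointers. A minimal
standalone sketch of that stamping pattern, using simplified stand-in types
rather than the real Impala classes:)

  #include <cstdint>
  #include <cstdio>

  struct Tuple { int64_t count; };        // stand-in for impala::Tuple
  struct TupleRow { Tuple* tuples[1]; };  // rows hold exactly one tuple

  // Stamp the same template tuple pointer into 'num_tuples' consecutive rows.
  int WriteTemplateTuples(TupleRow* row, int num_tuples, Tuple* template_tuple) {
    if (num_tuples == 0 || template_tuple == nullptr) return num_tuples;
    Tuple** row_tuple = reinterpret_cast<Tuple**>(row);
    for (int i = 0; i < num_tuples; ++i) row_tuple[i] = template_tuple;
    return num_tuples;
  }

  int main() {
    Tuple tmpl = { 7300 };
    TupleRow rows[3];
    WriteTemplateTuples(rows, 3, &tmpl);
    // Each row now points at the shared template tuple; prints 7300 three times.
    for (int i = 0; i < 3; ++i) printf("%lld\n", (long long)rows[i].tuples[0]->count);
    return 0;
  }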
