Repository: incubator-impala Updated Branches: refs/heads/master b222d90bd -> fc4ee65f9
IMPALA-4654: KuduScanner must return when ReachedLimit() Fixes a bug in the KuduScanner where the scan node's limit was not respected, so the scanner thread would continue executing until the scan range was fully consumed. This could result in completed queries leaving fragments running, and those threads could use significant CPU and memory. For example, the query 'select * from tpch_kudu.lineitem limit 90', when run in the minicluster with lineitem partitioned into 3 hash partitions, would leave a scanner thread running for ~60 seconds. In real-world scenarios this can cause unexpected resource consumption, which could build up over time and lead to query failures if such queries are submitted frequently. The fix is to ensure KuduScanner::GetNext() returns with eos=true when it finds that ReachedLimit() is true. The helper function DecodeRowsIntoRowBatch was returning an unnecessary and somewhat confusing flag, 'batch_done'; it was removed to make it clearer how the code in GetNext() should behave. 
Change-Id: Iaddd51111a1b2647995d68e6d37d0500b3a322de Reviewed-on: http://gerrit.cloudera.org:8080/5493 Reviewed-by: Alex Behm <[email protected]> Reviewed-by: Tim Armstrong <[email protected]> Reviewed-by: Dan Hecht <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/652e7d56 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/652e7d56 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/652e7d56 Branch: refs/heads/master Commit: 652e7d56d9ac52a8c3d36ca4b04298d4b89897aa Parents: b222d90 Author: Matthew Jacobs <[email protected]> Authored: Tue Dec 13 14:57:01 2016 -0800 Committer: Internal Jenkins <[email protected]> Committed: Wed Dec 14 23:24:47 2016 +0000 ---------------------------------------------------------------------- be/src/exec/kudu-scanner.cc | 25 +++++++------------------ be/src/exec/kudu-scanner.h | 10 ++++++---- tests/query_test/test_kudu.py | 12 ++++++------ 3 files changed, 19 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/652e7d56/be/src/exec/kudu-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc index 55975e0..9ec5201 100644 --- a/be/src/exec/kudu-scanner.cc +++ b/be/src/exec/kudu-scanner.cc @@ -105,12 +105,11 @@ Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) { RETURN_IF_CANCELLED(state_); if (cur_kudu_batch_num_read_ < cur_kudu_batch_.NumRows()) { - bool batch_done; - RETURN_IF_ERROR(DecodeRowsIntoRowBatch(row_batch, &tuple, &batch_done)); - if (batch_done) break; + RETURN_IF_ERROR(DecodeRowsIntoRowBatch(row_batch, &tuple)); + if (row_batch->AtCapacity()) break; } - if (scanner_->HasMoreRows()) { + if (scanner_->HasMoreRows() && 
!scan_node_->ReachedLimit()) { RETURN_IF_ERROR(GetNextScannerBatch()); continue; } @@ -161,26 +160,19 @@ void KuduScanner::CloseCurrentClientScanner() { scanner_.reset(); } -Status KuduScanner::HandleEmptyProjection(RowBatch* row_batch, bool* batch_done) { +Status KuduScanner::HandleEmptyProjection(RowBatch* row_batch) { int num_rows_remaining = cur_kudu_batch_.NumRows() - cur_kudu_batch_num_read_; int rows_to_add = std::min(row_batch->capacity() - row_batch->num_rows(), num_rows_remaining); cur_kudu_batch_num_read_ += rows_to_add; row_batch->CommitRows(rows_to_add); - // If we've reached the capacity, or the LIMIT for the scan, return. - if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) { - *batch_done = true; - } return Status::OK(); } -Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_mem, - bool* batch_done) { - *batch_done = false; - +Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_mem) { // Short-circuit the count(*) case. if (scan_node_->tuple_desc_->slots().empty()) { - return HandleEmptyProjection(row_batch, batch_done); + return HandleEmptyProjection(row_batch); } // Iterate through the Kudu rows, evaluate conjuncts and deep-copy survivors into @@ -205,10 +197,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me row->SetTuple(0, *tuple_mem); row_batch->CommitLastRow(); // If we've reached the capacity, or the LIMIT for the scan, return. - if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) { - *batch_done = true; - break; - } + if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break; // Move to the next tuple in the tuple buffer. 
*tuple_mem = next_tuple(*tuple_mem); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/652e7d56/be/src/exec/kudu-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h index bf84b08..8c8c663 100644 --- a/be/src/exec/kudu-scanner.h +++ b/be/src/exec/kudu-scanner.h @@ -61,14 +61,16 @@ class KuduScanner { private: /// Handles the case where the projection is empty (e.g. count(*)). /// Does this by adding sets of rows to 'row_batch' instead of adding one-by-one. - Status HandleEmptyProjection(RowBatch* row_batch, bool* batch_done); + Status HandleEmptyProjection(RowBatch* row_batch); /// Decodes rows previously fetched from kudu, now in 'cur_rows_' into a RowBatch. /// - 'batch' is the batch that will point to the new tuples. /// - *tuple_mem should be the location to output tuples. - /// - Sets 'batch_done' to true to indicate that the batch was filled to capacity or - /// the limit was reached. - Status DecodeRowsIntoRowBatch(RowBatch* batch, Tuple** tuple_mem, bool* batch_done); + /// Returns OK when one of the following conditions occur: + /// - cur_kudu_batch_ is fully consumed + /// - batch is full + /// - scan_node_ limit has been reached + Status DecodeRowsIntoRowBatch(RowBatch* batch, Tuple** tuple_mem); /// Fetches the next batch of rows from the current kudu::client::KuduScanner. 
Status GetNextScannerBatch(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/652e7d56/tests/query_test/test_kudu.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py index 17769bd..5b28120 100644 --- a/tests/query_test/test_kudu.py +++ b/tests/query_test/test_kudu.py @@ -617,11 +617,11 @@ class TestKuduMemLimits(KuduTestSuite): raise assert "Memory limit exceeded" in str(e) - # IMPALA-4645: Wait for fragments to complete; in some tests KuduScanNodes took some - # time to Close() after the query returned all rows. This is necessary to ensure - # these queries do not impact other tests. - # TODO: Scan nodes shouldn't take so long to shutdown; remove when this is - # fixed (IMPALA-4654). + # IMPALA-4654: Validate the fix for a bug where LimitReached() wasn't respected in + # the KuduScanner and the limit query above would result in a fragment running an + # additional minute. This ensures that the num fragments 'in flight' reaches 0 in + # less time than IMPALA-4654 was reproducing (~60sec) but yet still enough time that + # this test won't be flaky. verifiers = [ MetricVerifier(i.service) for i in ImpalaCluster().impalads ] for v in verifiers: - v.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=120) + v.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=30)
