IMPALA-3857: KuduScanNode race on returning "optional" threads The KuduScanNode could return all of its active scanner threads when no more "optional" thread tokens were available. In that case, no thread was left to pick up the remaining scan ranges, and the query would produce incorrect results. This change fixes the issue by cleaning up the ScannerThread code and making sure that the last active scanner thread does not exit early when the number of optional threads has been exceeded.
This was tested by running the tpch workload repeatedly under load. Work to incorporate tpch data loading for Kudu is still in progress, so this fix was tested manually. Change-Id: I22adf2109b43b1b37d9a597de85e063431dff155 Reviewed-on: http://gerrit.cloudera.org:8080/3798 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/857b94d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/857b94d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/857b94d0 Branch: refs/heads/master Commit: 857b94d03cf719da37d6bb95695c57944f004d36 Parents: c77fb62 Author: Matthew Jacobs <[email protected]> Authored: Mon Jul 25 16:59:56 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Fri Jul 29 21:36:50 2016 +0000 ---------------------------------------------------------------------- be/src/exec/kudu-scan-node.cc | 98 +++++++++++++++++++++++--------------- be/src/exec/kudu-scan-node.h | 10 ++-- 2 files changed, 66 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/857b94d0/be/src/exec/kudu-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc index a08034f..c2ed16c 100644 --- a/be/src/exec/kudu-scan-node.cc +++ b/be/src/exec/kudu-scan-node.cc @@ -408,60 +408,80 @@ void KuduScanNode::ThreadTokenAvailableCb( } } -void KuduScanNode::ScannerThread(const string& name, const TKuduKeyRange* key_range) { +Status KuduScanNode::ProcessRange(KuduScanner* scanner, const TKuduKeyRange* key_range) { + RETURN_IF_ERROR(scanner->OpenNextRange(*key_range)); + bool eos = false; + while (!eos) { + gscoped_ptr<RowBatch> row_batch(new RowBatch( + row_desc(), 
runtime_state_->batch_size(), mem_tracker())); + RETURN_IF_ERROR(scanner->GetNext(row_batch.get(), &eos)); + while (!done_) { + scanner->KeepKuduScannerAlive(); + if (materialized_row_batches_->AddBatchWithTimeout(row_batch.get(), 1000000)) { + ignore_result(row_batch.release()); + break; + } + } + } + // Mark the current scan range as complete. + if (eos) scan_ranges_complete_counter()->Add(1); + return Status::OK(); +} + +void KuduScanNode::ScannerThread(const string& name, const TKuduKeyRange* initial_range) { + DCHECK(initial_range != NULL); SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters()); SCOPED_TIMER(runtime_state_->total_cpu_timer()); + // Set to true if this thread observes that the number of optional threads has been + // exceeded and is exiting early. + bool optional_thread_exiting = false; KuduScanner scanner(this, runtime_state_); Status status = scanner.Open(); - if (!status.ok()) goto done; - - while (!done_) { - status = scanner.OpenNextRange(*key_range); - if (!status.ok()) goto done; - // Keep looping through all the ranges. - bool eos = false; - while (!eos) { - // Keep looping through all the rows. - gscoped_ptr<RowBatch> row_batch(new RowBatch( - row_desc(), runtime_state_->batch_size(), mem_tracker())); - status = scanner.GetNext(row_batch.get(), &eos); - if (!status.ok()) goto done; - while (true) { - if (done_) goto done; - scanner.KeepKuduScannerAlive(); - if (materialized_row_batches_->AddBatchWithTimeout(row_batch.get(), 1000000)) { - ignore_result(row_batch.release()); + if (status.ok()) { + const TKuduKeyRange* key_range = initial_range; + while (!done_ && key_range != NULL) { + status = ProcessRange(&scanner, key_range); + if (!status.ok()) break; + + // Check if the number of optional threads has been exceeded. + if (runtime_state_->resource_pool()->optional_exceeded()) { + unique_lock<mutex> l(lock_); + // Don't exit if this is the last thread. 
Otherwise, the scan will indicate it's + // done before all ranges have been processed. + if (num_active_scanners_ > 1) { + --num_active_scanners_; + optional_thread_exiting = true; break; } } + key_range = GetNextKeyRange(); } - // Mark the current scan range as complete. - scan_ranges_complete_counter()->Add(1); - if (runtime_state_->resource_pool()->optional_exceeded()) goto done; - key_range = GetNextKeyRange(); - if (key_range == NULL) goto done; + scanner.Close(); } -done: - VLOG(1) << "Thread done: " << name; - scanner.Close(); - runtime_state_->resource_pool()->ReleaseThreadToken(false); - - unique_lock<mutex> l(lock_); - if (!status.ok()) { - if (status_.ok()) { - status_ = status; + { + unique_lock<mutex> l(lock_); + if (!status.ok()) { + if (status_.ok()) { + status_ = status; + done_ = true; + } + } + // Decrement num_active_scanners_ unless handling the case of an early exit when + // optional threads have been exceeded, in which case it already was decremented. + if (!optional_thread_exiting) --num_active_scanners_; + if (num_active_scanners_ == 0) { done_ = true; + materialized_row_batches_->Shutdown(); } } - --num_active_scanners_; - if (num_active_scanners_ == 0) { - // If we got here and we are the last thread, we're all done. - done_ = true; - materialized_row_batches_->Shutdown(); - } + + // lock_ is released before calling ThreadResourceMgr::ReleaseThreadToken() which + // invokes ThreadTokenAvailableCb() which attempts to take the same lock. 
+ VLOG(1) << "Thread done: " << name; + runtime_state_->resource_pool()->ReleaseThreadToken(false); } } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/857b94d0/be/src/exec/kudu-scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h index 329f52a..5dfb309 100644 --- a/be/src/exec/kudu-scan-node.h +++ b/be/src/exec/kudu-scan-node.h @@ -163,11 +163,15 @@ class KuduScanNode : public ScanNode { void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool); /// Main function for scanner thread which executes a KuduScanner. Begins by processing - /// 'initial_range', once that range is completed it fetches more ranges with 'GetNextKeyRange()' - /// until there are no more ranges to fetch, an error occurred or the limit has been reached. - /// Scanned batches are enqueued in 'materialized_row_batches_'. + /// 'initial_range', and continues processing ranges returned by 'GetNextKeyRange()' + /// until there are no more ranges, an error occurs, or the limit is reached. void ScannerThread(const string& name, const TKuduKeyRange* initial_range); + /// Processes a single scan range. Row batches are fetched using 'scanner' and enqueued + /// in 'materialized_row_batches_' until the scanner reports eos for 'key_range', an + /// error occurs, or the limit is reached. + Status ProcessRange(KuduScanner* scanner, const TKuduKeyRange* key_range); + /// Returns the next partition key range to read. Thread safe. Returns NULL if there are /// no more ranges. TKuduKeyRange* GetNextKeyRange();
