IMPALA-4180: Synchronize accesses to RuntimeState::reader_contexts_

HdfsScanNodeBase::Close() may add its outstanding DiskIO context to RuntimeState::reader_contexts_ so that the context is unregistered later, when the fragment is closed. In a plan fragment with multiple HDFS scan nodes, HdfsScanNodeBase::Close() may be called concurrently on different scan nodes. To allow safe concurrent access, this change adds a SpinLock to RuntimeState that synchronizes all accesses to 'reader_contexts_'.
Change-Id: I911fda526a99514b12f88a3e9fb5952ea4fe1973 Reviewed-on: http://gerrit.cloudera.org:8080/4558 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2a31fbdb Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2a31fbdb Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2a31fbdb Branch: refs/heads/master Commit: 2a31fbdbfac9a7092c96e4ab9894e0db0e4ce9ca Parents: f640b3a Author: Michael Ho <[email protected]> Authored: Wed Sep 28 14:32:55 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Fri Sep 30 01:21:05 2016 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-scan-node-base.cc | 2 +- be/src/runtime/plan-fragment-executor.cc | 4 +--- be/src/runtime/runtime-state.cc | 13 +++++++++++++ be/src/runtime/runtime-state.h | 12 +++++++++++- .../queries/QueryTest/single-node-nlj.test | 15 +++++++++++++-- 5 files changed, 39 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2a31fbdb/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index c03817b..4acf3f5 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -446,7 +446,7 @@ void HdfsScanNodeBase::Close(RuntimeState* state) { // There may still be io buffers used by parent nodes so we can't unregister the // reader context yet. The runtime state keeps a list of all the reader contexts and // they are unregistered when the fragment is closed. 
- state->reader_contexts()->push_back(reader_context_); + state->AcquireReaderContext(reader_context_); // Need to wait for all the active scanner threads to finish to ensure there is no // more memory tracked by this scan node's mem tracker. state->io_mgr()->CancelContext(reader_context_, true); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2a31fbdb/be/src/runtime/plan-fragment-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc index 7300f44..e0d314b 100644 --- a/be/src/runtime/plan-fragment-executor.cc +++ b/be/src/runtime/plan-fragment-executor.cc @@ -508,9 +508,7 @@ void PlanFragmentExecutor::Close() { // Prepare may not have been called, which sets runtime_state_ if (runtime_state_.get() != NULL) { if (plan_ != NULL) plan_->Close(runtime_state_.get()); - for (DiskIoRequestContext* context: *runtime_state_->reader_contexts()) { - runtime_state_->io_mgr()->UnregisterContext(context); - } + runtime_state_->UnregisterReaderContexts(); exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool()); runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get()); runtime_state_->filter_bank()->Close(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2a31fbdb/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 5249076..a05b3ef 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -292,4 +292,17 @@ Status RuntimeState::GetCodegen(LlvmCodeGen** codegen, bool initialize) { return Status::OK(); } +void RuntimeState::AcquireReaderContext(DiskIoRequestContext* reader_context) { + boost::lock_guard<SpinLock> l(reader_contexts_lock_); + reader_contexts_.push_back(reader_context); +} + +void RuntimeState::UnregisterReaderContexts() { + 
boost::lock_guard<SpinLock> l(reader_contexts_lock_); + for (DiskIoRequestContext* context : reader_contexts_) { + io_mgr()->UnregisterContext(context); + } + reader_contexts_.clear(); +} + } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2a31fbdb/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 0bf9db5..3496d9c 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -138,7 +138,6 @@ class RuntimeState { ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; } FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; } - std::vector<DiskIoRequestContext*>* reader_contexts() { return &reader_contexts_; } void set_fragment_root_id(PlanNodeId id) { DCHECK_EQ(root_node_id_, -1) << "Should not set this twice."; @@ -163,6 +162,14 @@ class RuntimeState { /// even when codegen is enabled if nothing has been codegen'd. bool codegen_created() const { return codegen_.get() != NULL; } + /// Takes ownership of a scan node's reader context and plan fragment executor will call + /// UnregisterReaderContexts() to unregister it when the fragment is closed. The IO + /// buffers may still be in use and thus the deferred unregistration. + void AcquireReaderContext(DiskIoRequestContext* reader_context); + + /// Unregisters all reader contexts acquired through AcquireReaderContext(). + void UnregisterReaderContexts(); + /// Returns codegen_ in 'codegen'. If 'initialize' is true, codegen_ will be created if /// it has not been initialized by a previous call already. If 'initialize' is false, /// 'codegen' will be set to NULL if codegen_ has not been initialized. @@ -344,6 +351,9 @@ class RuntimeState { Status query_status_; /// Reader contexts that need to be closed when the fragment is closed. 
+ /// Synchronization is needed if there are multiple scan nodes in a plan fragment and + /// Close() may be called on them concurrently (see IMPALA-4180). + SpinLock reader_contexts_lock_; std::vector<DiskIoRequestContext*> reader_contexts_; /// BufferedBlockMgr object used to allocate and manage blocks of input data in memory http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2a31fbdb/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test b/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test index 49cdf9d..fa1ccfc 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test +++ b/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test @@ -149,7 +149,6 @@ group by id) v2; ---- TYPES TINYINT,INT,BIGINT ==== -==== ---- QUERY # Regression test for IMPALA-561: Multiple scan nodes in a plan fragment. select count(*) @@ -161,6 +160,18 @@ left join functional.alltypes a2 on a2.tinyint_col >= 1 BIGINT ==== ---- QUERY +# Regression test for IMPALA-4180: a single node plan with blocking join node +# and multiple top-n + scan nodes to trigger concurrent Close() on scan nodes. +with t as (select int_col x from functional.alltypestiny order by id limit 2) +select * from t t1 left join t t2 on t1.x > 0 +---- RESULTS +0,NULL +1,0 +1,1 +---- TYPES +INT,INT +==== +---- QUERY # Right non-equi-join with empty build. select straight_join at.id from alltypes at @@ -211,4 +222,4 @@ limit 5 7295 ---- TYPES INT -==== +==== \ No newline at end of file
