Repository: impala Updated Branches: refs/heads/master d29300281 -> 3fa05604a
IMPALA-7422: Fix a race in QueryState::StartFInstances() A recent commit for IMPALA-7163 (cbc8c63) introduced a race between the insertion of a fragment instance into QueryState::fragment_map_ and the creation of its thread. In particular, after the aforementioned commit, the counting barrier 'instances_prepared_barrier_' is used to synchronize callers of Cancel()/PublishFilter() with the PREPARE phase of fragment instances: Cancel()/PublishFilter() cannot proceed until all fragment instances have finished preparing, and each fragment instance updates 'instances_prepared_barrier_' once it is done preparing. The race arises because QueryState::StartFInstances() doesn't insert a fragment instance into 'fragment_map_' until after the fragment instance thread has been spawned. The newly spawned thread can therefore finish preparing and update the counting barrier before the insertion into 'fragment_map_' happens. This allows PublishFilter() to become unblocked before the fragment instance has been inserted into 'fragment_map_', which triggers the DCHECK() reported in IMPALA-7422. This change fixes the race by moving the insertion into 'fragment_map_' before the thread is spawned. 
Testing done: Exhaustive debug + release builds which previously ran into this race Change-Id: I35f2a5b0ea5143703850ffc229cec0e4294e6a3e Reviewed-on: http://gerrit.cloudera.org:8080/11270 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8360277e Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8360277e Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8360277e Branch: refs/heads/master Commit: 8360277eaf6ab9743f253105dbaa25ecdfd6c5e7 Parents: d293002 Author: Michael Ho <[email protected]> Authored: Mon Aug 20 00:43:22 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Aug 21 09:44:20 2018 +0000 ---------------------------------------------------------------------- be/src/runtime/query-state.cc | 12 ++++++++---- be/src/runtime/query-state.h | 10 ++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/8360277e/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 4b8924a..329f757 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -396,9 +396,15 @@ void QueryState::StartFInstances() { refcnt_.Add(1); // decremented in ExecFInstance() AcquireExecResourceRefcount(); // decremented in ExecFInstance() - // Add the fragment instance ID to the 'fis_map_'. + // Add the fragment instance ID to the 'fis_map_'. Has to happen before the thread is + // spawned or we may race with users of 'fis_map_'. fis_map_.emplace(fis->instance_id(), fis); + // Update fragment_map_. Has to happen before the thread is spawned below or + // we may race with users of 'fragment_map_'. 
+ vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx]; + fis_list.push_back(fis); + string thread_name = Substitute("$0 (finst:$1)", FragmentInstanceState::FINST_THREAD_NAME_PREFIX, PrintId(instance_ctx.fragment_instance_id)); @@ -412,6 +418,7 @@ void QueryState::StartFInstances() { debug_action_status; if (!thread_create_status.ok()) { fis_map_.erase(fis->instance_id()); + fis_list.pop_back(); // Undo refcnt increments done immediately prior to Thread::Create(). The // reference counts were both greater than zero before the increments, so // neither of these decrements will free any structures. @@ -419,9 +426,6 @@ void QueryState::StartFInstances() { ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this); break; } - // update fragment_map_ - vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx]; - fis_list.push_back(fis); t->Detach(); --num_unstarted_instances; } http://git-wip-us.apache.org/repos/asf/impala/blob/8360277e/be/src/runtime/query-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h index 49fd8eb..148b06f 100644 --- a/be/src/runtime/query-state.h +++ b/be/src/runtime/query-state.h @@ -29,7 +29,6 @@ #include "gen-cpp/Types_types.h" #include "runtime/tmp-file-mgr.h" #include "util/counting-barrier.h" -#include "util/promise.h" #include "util/uid-util.h" namespace impala { @@ -341,12 +340,15 @@ class QueryState { std::unique_ptr<CountingBarrier> instances_finished_barrier_; /// map from instance id to its state (owned by obj_pool_), populated in - /// StartFInstances(); not valid to read from until instances_prepare_promise_ - /// is set + /// StartFInstances(); Not valid to read from until 'instances_prepared_barrier_' + /// is set (i.e. readers should always call WaitForPrepare()). 
std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_; /// map from fragment index to its instances (owned by obj_pool_), populated in - /// StartFInstances() + /// StartFInstances(). Only written by the query state thread (i.e. the thread + /// which executes StartFInstances()). Not valid to read from until + /// 'instances_prepared_barrier_' is set (i.e. accessor should always call + /// WaitForPrepare()). std::unordered_map<int, std::vector<FragmentInstanceState*>> fragment_map_; ObjectPool obj_pool_;
