Repository: impala
Updated Branches:
  refs/heads/master d29300281 -> 3fa05604a


IMPALA-7422: Fix a race in QueryState::StartFInstances()

A recent commit for IMPALA-7163 (cbc8c63) introduced a race between
insertion into QueryState::fragment_map_ and creation of the fragment
instance thread.
In particular, after the aforementioned commit, the counting barrier
'instances_prepared_barrier_' is used for synchronizing callers of
Cancel()/PublishFilter() and the PREPARE phase of fragment instances.
Cancel()/PublishFilter() cannot proceed until all fragment instances
have finished preparing; 'instances_prepared_barrier_' is updated by
fragment instances once each of them is done preparing.

The race occurs because QueryState::StartFInstances() doesn't insert the
fragment instance into 'fragment_map_' until after the fragment instance
thread has been spawned. The newly spawned thread can therefore finish
preparing and update the counting barrier before the insertion into
'fragment_map_' happens. As a result, PublishFilter() may be unblocked before
the fragment instance has been inserted into 'fragment_map_', triggering the
DCHECK() reported in IMPALA-7422.

This change fixes the race by moving the insertion into 'fragment_map_'
before the thread is spawned. If thread creation fails, the newly added
entry is removed again.

Testing done: exhaustive debug and release builds, which previously ran
into this race.

Change-Id: I35f2a5b0ea5143703850ffc229cec0e4294e6a3e
Reviewed-on: http://gerrit.cloudera.org:8080/11270
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8360277e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8360277e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8360277e

Branch: refs/heads/master
Commit: 8360277eaf6ab9743f253105dbaa25ecdfd6c5e7
Parents: d293002
Author: Michael Ho <[email protected]>
Authored: Mon Aug 20 00:43:22 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Aug 21 09:44:20 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/query-state.cc | 12 ++++++++----
 be/src/runtime/query-state.h  | 10 ++++++----
 2 files changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8360277e/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 4b8924a..329f757 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -396,9 +396,15 @@ void QueryState::StartFInstances() {
     refcnt_.Add(1); // decremented in ExecFInstance()
     AcquireExecResourceRefcount(); // decremented in ExecFInstance()
 
-    // Add the fragment instance ID to the 'fis_map_'.
+    // Add the fragment instance ID to the 'fis_map_'. Has to happen before 
the thread is
+    // spawned or we may race with users of 'fis_map_'.
     fis_map_.emplace(fis->instance_id(), fis);
 
+    // Update fragment_map_. Has to happen before the thread is spawned below 
or
+    // we may race with users of 'fragment_map_'.
+    vector<FragmentInstanceState*>& fis_list = 
fragment_map_[instance_ctx.fragment_idx];
+    fis_list.push_back(fis);
+
     string thread_name = Substitute("$0 (finst:$1)",
         FragmentInstanceState::FINST_THREAD_NAME_PREFIX,
         PrintId(instance_ctx.fragment_instance_id));
@@ -412,6 +418,7 @@ void QueryState::StartFInstances() {
         debug_action_status;
     if (!thread_create_status.ok()) {
       fis_map_.erase(fis->instance_id());
+      fis_list.pop_back();
       // Undo refcnt increments done immediately prior to Thread::Create(). The
       // reference counts were both greater than zero before the increments, so
       // neither of these decrements will free any structures.
@@ -419,9 +426,6 @@ void QueryState::StartFInstances() {
       ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
       break;
     }
-    // update fragment_map_
-    vector<FragmentInstanceState*>& fis_list = 
fragment_map_[instance_ctx.fragment_idx];
-    fis_list.push_back(fis);
     t->Detach();
     --num_unstarted_instances;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/8360277e/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 49fd8eb..148b06f 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -29,7 +29,6 @@
 #include "gen-cpp/Types_types.h"
 #include "runtime/tmp-file-mgr.h"
 #include "util/counting-barrier.h"
-#include "util/promise.h"
 #include "util/uid-util.h"
 
 namespace impala {
@@ -341,12 +340,15 @@ class QueryState {
   std::unique_ptr<CountingBarrier> instances_finished_barrier_;
 
   /// map from instance id to its state (owned by obj_pool_), populated in
-  /// StartFInstances(); not valid to read from until 
instances_prepare_promise_
-  /// is set
+  /// StartFInstances(); Not valid to read from until 
'instances_prepared_barrier_'
+  /// is set (i.e. readers should always call WaitForPrepare()).
   std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_;
 
   /// map from fragment index to its instances (owned by obj_pool_), populated 
in
-  /// StartFInstances()
+  /// StartFInstances(). Only written by the query state thread (i.e. the 
thread
+  /// which executes StartFInstances()). Not valid to read from until
+  /// 'instances_prepared_barrier_' is set (i.e. accessor should always call
+  /// WaitForPrepare()).
   std::unordered_map<int, std::vector<FragmentInstanceState*>> fragment_map_;
 
   ObjectPool obj_pool_;

Reply via email to