IMPALA-4548: BlockingJoinNode should wait for async build thread

This is a minor cleanup of the handling of the async build thread
in BlockingJoinNode. Previously, the main thread blocked on the
promise passed to the async build thread and did not proceed until
the status was set. The async build thread relied on the assumption
that no other state (e.g. the runtime state object) would be accessed
once the promise was set. This proved to be fragile, as shown by the
use-after-free bug in IMPALA-4532.

This change removes the reliance on that fragile assumption by making
the main thread join the async build thread before proceeding.

Change-Id: I33b07d60426cde61922b05c969ef09453ac0f342
Reviewed-on: http://gerrit.cloudera.org:8080/6664
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/692c6a55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/692c6a55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/692c6a55

Branch: refs/heads/master
Commit: 692c6a555811a09827dd963d4bf4f1ffd0a3aad4
Parents: 8660c40
Author: Michael Ho <[email protected]>
Authored: Mon Apr 17 14:54:53 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Apr 20 21:55:57 2017 +0000

----------------------------------------------------------------------
 be/src/exec/blocking-join-node.cc | 46 +++++++++++++++-------------------
 be/src/exec/blocking-join-node.h  |  3 +--
 2 files changed, 21 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/692c6a55/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index fca2f72..8fb0756 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -144,31 +144,24 @@ void BlockingJoinNode::Close(RuntimeState* state) {
 }
 
 void BlockingJoinNode::ProcessBuildInputAsync(RuntimeState* state, DataSink* 
build_sink,
-    Promise<Status>* status) {
-  Status s;
-  {
-    SCOPED_THREAD_COUNTER_MEASUREMENT(state->total_thread_statistics());
-    if  (build_sink == NULL){
-      s = ProcessBuildInput(state);
-    } else {
-      s = SendBuildInputToSink<true>(state, build_sink);
-    }
-    // IMPALA-1863: If the build-side thread failed, then we need to close the 
right
-    // (build-side) child to avoid a potential deadlock between fragment 
instances.  This
-    // is safe to do because while the build may have partially completed, it 
will not be
-    // probed.  BlockJoinNode::Open() will return failure as soon as 
child(0)->Open()
-    // completes.
-    if (!s.ok()) child(1)->Close(state);
-    // Release the thread token as soon as possible (before the main thread 
joins
-    // on it).  This way, if we had a chain of 10 joins using 1 additional 
thread,
-    // we'd keep the additional thread busy the whole time.
-    state->resource_pool()->ReleaseThreadToken(false);
+    Status* status) {
+  DCHECK(status != nullptr);
+  SCOPED_THREAD_COUNTER_MEASUREMENT(state->total_thread_statistics());
+  if  (build_sink == nullptr){
+    *status = ProcessBuildInput(state);
+  } else {
+    *status = SendBuildInputToSink<true>(state, build_sink);
   }
-  // Please keep this as the last line in this function to avoid 
use-after-free problem.
-  // Once 'status' is set, ProcessBuildInputAndProbe() will start running and 
'states'
-  // may have been freed after this line once the query completes. IMPALA-4532.
-  // TODO: Make this less fragile.
-  status->Set(s);
+  // IMPALA-1863: If the build-side thread failed, then we need to close the 
right
+  // (build-side) child to avoid a potential deadlock between fragment 
instances.  This
+  // is safe to do because while the build may have partially completed, it 
will not be
+  // probed.  BlockJoinNode::Open() will return failure as soon as 
child(0)->Open()
+  // completes.
+  if (!status->ok()) child(1)->Close(state);
+  // Release the thread token as soon as possible (before the main thread joins
+  // on it).  This way, if we had a chain of 10 joins using 1 additional 
thread,
+  // we'd keep the additional thread busy the whole time.
+  state->resource_pool()->ReleaseThreadToken(false);
 }
 
 Status BlockingJoinNode::Open(RuntimeState* state) {
@@ -194,7 +187,7 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
   // build side in a different thread, the overlap stops when the left child 
Open()
   // returns.
   if (!IsInSubplan() && state->resource_pool()->TryAcquireThreadToken()) {
-    Promise<Status> build_side_status;
+    Status build_side_status;
     runtime_profile()->AppendExecOption("Join Build-Side Prepared 
Asynchronously");
     Thread build_thread(
         node_name_, "build thread", 
bind(&BlockingJoinNode::ProcessBuildInputAsync, this,
@@ -209,7 +202,8 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
 
     // Blocks until ProcessBuildInput has returned, after which the build side 
structures
     // are fully constructed.
-    RETURN_IF_ERROR(build_side_status.Get());
+    build_thread.Join();
+    RETURN_IF_ERROR(build_side_status);
     RETURN_IF_ERROR(open_status);
   } else if (IsInSubplan()) {
     // When inside a subplan, open the first child before doing the build such 
that

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/692c6a55/be/src/exec/blocking-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 4000b22..c184857 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -204,8 +204,7 @@ class BlockingJoinNode : public ExecNode {
   /// The main function for the thread that processes the build input 
asynchronously.
   /// Its status is returned in the 'status' promise. If 'build_sink' is 
non-NULL, it
   /// is used for the build. Otherwise, ProcessBuildInput() is called on the 
subclass.
-  void ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink,
-      Promise<Status>* status);
+  void ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink, 
Status* status);
 };
 
 }

Reply via email to