IMPALA-4548: BlockingJoinNode should wait for async build thread This is a minor cleanup of how BlockingJoinNode handles its async build thread. Previously, the main thread blocked on the promise passed to the async build thread and would not proceed until the status was set. The async build thread therefore relied on the fragile assumption that no other state (e.g. the RuntimeState object) would be accessed after the promise was set. The use-after-free bug in IMPALA-4532 showed how easily this assumption can be violated.
This change removes the reliance on fragile assumption by making the main thread join the async build thread before proceeding. Change-Id: I33b07d60426cde61922b05c969ef09453ac0f342 Reviewed-on: http://gerrit.cloudera.org:8080/6664 Reviewed-by: Michael Ho <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/692c6a55 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/692c6a55 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/692c6a55 Branch: refs/heads/master Commit: 692c6a555811a09827dd963d4bf4f1ffd0a3aad4 Parents: 8660c40 Author: Michael Ho <[email protected]> Authored: Mon Apr 17 14:54:53 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Apr 20 21:55:57 2017 +0000 ---------------------------------------------------------------------- be/src/exec/blocking-join-node.cc | 46 +++++++++++++++------------------- be/src/exec/blocking-join-node.h | 3 +-- 2 files changed, 21 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/692c6a55/be/src/exec/blocking-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc index fca2f72..8fb0756 100644 --- a/be/src/exec/blocking-join-node.cc +++ b/be/src/exec/blocking-join-node.cc @@ -144,31 +144,24 @@ void BlockingJoinNode::Close(RuntimeState* state) { } void BlockingJoinNode::ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink, - Promise<Status>* status) { - Status s; - { - SCOPED_THREAD_COUNTER_MEASUREMENT(state->total_thread_statistics()); - if (build_sink == NULL){ - s = ProcessBuildInput(state); - } else { - s = SendBuildInputToSink<true>(state, build_sink); - } - // IMPALA-1863: If the build-side 
thread failed, then we need to close the right - // (build-side) child to avoid a potential deadlock between fragment instances. This - // is safe to do because while the build may have partially completed, it will not be - // probed. BlockJoinNode::Open() will return failure as soon as child(0)->Open() - // completes. - if (!s.ok()) child(1)->Close(state); - // Release the thread token as soon as possible (before the main thread joins - // on it). This way, if we had a chain of 10 joins using 1 additional thread, - // we'd keep the additional thread busy the whole time. - state->resource_pool()->ReleaseThreadToken(false); + Status* status) { + DCHECK(status != nullptr); + SCOPED_THREAD_COUNTER_MEASUREMENT(state->total_thread_statistics()); + if (build_sink == nullptr){ + *status = ProcessBuildInput(state); + } else { + *status = SendBuildInputToSink<true>(state, build_sink); } - // Please keep this as the last line in this function to avoid use-after-free problem. - // Once 'status' is set, ProcessBuildInputAndProbe() will start running and 'states' - // may have been freed after this line once the query completes. IMPALA-4532. - // TODO: Make this less fragile. - status->Set(s); + // IMPALA-1863: If the build-side thread failed, then we need to close the right + // (build-side) child to avoid a potential deadlock between fragment instances. This + // is safe to do because while the build may have partially completed, it will not be + // probed. BlockJoinNode::Open() will return failure as soon as child(0)->Open() + // completes. + if (!status->ok()) child(1)->Close(state); + // Release the thread token as soon as possible (before the main thread joins + // on it). This way, if we had a chain of 10 joins using 1 additional thread, + // we'd keep the additional thread busy the whole time. 
+ state->resource_pool()->ReleaseThreadToken(false); } Status BlockingJoinNode::Open(RuntimeState* state) { @@ -194,7 +187,7 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe( // build side in a different thread, the overlap stops when the left child Open() // returns. if (!IsInSubplan() && state->resource_pool()->TryAcquireThreadToken()) { - Promise<Status> build_side_status; + Status build_side_status; runtime_profile()->AppendExecOption("Join Build-Side Prepared Asynchronously"); Thread build_thread( node_name_, "build thread", bind(&BlockingJoinNode::ProcessBuildInputAsync, this, @@ -209,7 +202,8 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe( // Blocks until ProcessBuildInput has returned, after which the build side structures // are fully constructed. - RETURN_IF_ERROR(build_side_status.Get()); + build_thread.Join(); + RETURN_IF_ERROR(build_side_status); RETURN_IF_ERROR(open_status); } else if (IsInSubplan()) { // When inside a subplan, open the first child before doing the build such that http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/692c6a55/be/src/exec/blocking-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h index 4000b22..c184857 100644 --- a/be/src/exec/blocking-join-node.h +++ b/be/src/exec/blocking-join-node.h @@ -204,8 +204,7 @@ class BlockingJoinNode : public ExecNode { /// The main function for the thread that processes the build input asynchronously. /// Its status is returned in the 'status' promise. If 'build_sink' is non-NULL, it /// is used for the build. Otherwise, ProcessBuildInput() is called on the subclass. - void ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink, - Promise<Status>* status); + void ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink, Status* status); }; }
