This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 7fd046d00f9cd3106ff2c617c53ab6d183920785 Author: Tim Armstrong <[email protected]> AuthorDate: Sun Apr 5 12:44:53 2020 -0700 IMPALA-9611: fix hang when cancelling join builder The error could occur in the following scenario, where thread A is executing a join build fragment and thread B is cancelling the fragment instance. 1. Thread A is in HandoffToProbesAndWait(), reads is_cancelled_ and sees false. 2. Thread B in RuntimeState::Cancel() sets is_cancelled_ = true, acquires cancellation_cvs_lock_, then calls NotifyAll() on the condition variable. 3. Thread A calls Wait() on the condition variable and blocks forever because cancellation already happened. The fix is for thread B to acquire the lock that thread A is holding. That prevents the race because #1 and #3 above are in the same critical section and thread B won't be able to signal the condition variable until thread A has released the lock. Testing: Added metric check to test_failpoints to make it easier to detect hangs caused by those tests in future. Looped test_failpoints.py overnight, which was previously enough to reproduce the failure within a couple of hours. Ran exhaustive tests. 
Change-Id: I996ad2055d6542eb57e12c663b89de5f84208f77 Reviewed-on: http://gerrit.cloudera.org:8080/15672 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/join-builder.cc | 4 ++-- be/src/runtime/runtime-state.cc | 20 +++++++++++++++----- be/src/runtime/runtime-state.h | 21 ++++++++++++++++----- tests/failure/test_failpoints.py | 7 +++++++ 4 files changed, 40 insertions(+), 12 deletions(-) diff --git a/be/src/exec/join-builder.cc b/be/src/exec/join-builder.cc index 880dbd1..e98b084 100644 --- a/be/src/exec/join-builder.cc +++ b/be/src/exec/join-builder.cc @@ -67,7 +67,7 @@ void JoinBuilder::CloseFromProbe(RuntimeState* join_node_state) { Status JoinBuilder::WaitForInitialBuild(RuntimeState* join_node_state) { DCHECK(is_separate_build_); - join_node_state->AddCancellationCV(&probe_wakeup_cv_); + join_node_state->AddCancellationCV(&separate_build_lock_, &probe_wakeup_cv_); VLOG(2) << "JoinBuilder (id=" << join_node_id_ << ")" << " WaitForInitialBuild() called by finstance " << PrintId(join_node_state->fragment_instance_id()); @@ -97,7 +97,7 @@ Status JoinBuilder::WaitForInitialBuild(RuntimeState* join_node_state) { void JoinBuilder::HandoffToProbesAndWait(RuntimeState* build_side_state) { DCHECK(is_separate_build_) << "Doesn't make sense for embedded builder."; VLOG(2) << "Initial build ready JoinBuilder (id=" << join_node_id_ << ")"; - build_side_state->AddCancellationCV(&build_wakeup_cv_); + build_side_state->AddCancellationCV(&separate_build_lock_, &build_wakeup_cv_); { unique_lock<mutex> l(separate_build_lock_); ready_to_probe_ = true; diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 4cb68e3..d314906 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -229,20 +229,30 @@ void RuntimeState::Cancel() { is_cancelled_.Store(true); { lock_guard<SpinLock> l(cancellation_cvs_lock_); - for (ConditionVariable* cv : cancellation_cvs_) 
cv->NotifyAll(); + for (pair<std::mutex*, ConditionVariable*>& entry : cancellation_cvs_) { + // Acquire the lock to prevent races between readers of 'is_cancelled_' and this + // writing thread (e.g. IMPALA-9611) - the caller should read 'is_cancelled_' while + // holding the lock. Drop it before signalling the CV so that a blocked thread can + // immediately acquire the mutex when it wakes up. + { + lock_guard<mutex> l(*entry.first); + } + entry.second->NotifyAll(); + } for (CyclicBarrier* cb : cancellation_cbs_) { cb->Cancel(Status::CancelledInternal("RuntimeState::Cancel()")); } } + } -void RuntimeState::AddCancellationCV(ConditionVariable* cv) { +void RuntimeState::AddCancellationCV(mutex* mutex, ConditionVariable* cv) { lock_guard<SpinLock> l(cancellation_cvs_lock_); - for (ConditionVariable* cv2 : cancellation_cvs_) { + for (pair<std::mutex*, ConditionVariable*>& entry : cancellation_cvs_) { // Don't add if already present. - if (cv == cv2) return; + if (mutex == entry.first && cv == entry.second) return; } - cancellation_cvs_.push_back(cv); + cancellation_cvs_.push_back(make_pair(mutex, cv)); } void RuntimeState::AddBarrierToCancel(CyclicBarrier* cb) { diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 102db23..91e500e 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -19,10 +19,12 @@ #ifndef IMPALA_RUNTIME_RUNTIME_STATE_H #define IMPALA_RUNTIME_RUNTIME_STATE_H -#include <boost/scoped_ptr.hpp> +#include <mutex> +#include <string> #include <utility> #include <vector> -#include <string> + +#include <boost/scoped_ptr.hpp> // NOTE: try not to add more headers here: runtime-state.h is included in many many files. 
#include "common/global-types.h" // for PlanNodeId @@ -187,12 +189,21 @@ class RuntimeState { Status LogOrReturnError(const ErrorMsg& message); bool is_cancelled() const { return is_cancelled_.Load(); } + + /// Cancel this runtime state, signalling all condition variables and cancelling all + /// barriers added in AddCancellationCV() and AddBarrierToCancel(). This function will + /// acquire mutexes added in AddCancellationCV(), so the caller must not hold any locks + /// that must acquire after those mutexes in the lock order. void Cancel(); + /// Add a condition variable to be signalled when this RuntimeState is cancelled. /// Adding a condition variable multiple times is a no-op. Each distinct 'cv' will be - /// signalled once with NotifyAll() when is_cancelled() becomes true. + /// signalled once with NotifyAll() when is_cancelled() becomes true. 'mutex' will + /// be acquired by the cancelling thread after is_cancelled() becomes true. The caller + /// must hold 'mutex' when checking is_cancelled() to avoid a race like IMPALA-9611 + /// where the notification on 'cv' is lost. /// The condition variable must have query lifetime. - void AddCancellationCV(ConditionVariable* cv); + void AddCancellationCV(std::mutex* mutex, ConditionVariable* cv); /// Add a barrier to be cancelled when this RuntimeState is cancelled. Adding a barrier /// multiple times is a no-op. Each distinct 'cb' will be cancelled with status code @@ -371,7 +382,7 @@ class RuntimeState { /// Condition variables that will be signalled by Cancel(). Protected by /// 'cancellation_cvs_lock_'. - std::vector<ConditionVariable*> cancellation_cvs_; + std::vector<std::pair<std::mutex*, ConditionVariable*>> cancellation_cvs_; /// Cyclic barriers that will be signalled by Cancel(). Protected by /// 'cancellation_cvs_lock_'. 
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py index c3711aa..033b485 100644 --- a/tests/failure/test_failpoints.py +++ b/tests/failure/test_failpoints.py @@ -25,11 +25,13 @@ from collections import defaultdict from time import sleep from tests.beeswax.impala_beeswax import ImpalaBeeswaxException +from tests.common.impala_cluster import ImpalaCluster from tests.common.impala_test_suite import ImpalaTestSuite, LOG from tests.common.skip import SkipIf, SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \ SkipIfLocal from tests.common.test_dimensions import create_exec_option_dimension from tests.common.test_vector import ImpalaTestDimension +from tests.verifiers.metric_verifier import MetricVerifier FAILPOINT_ACTIONS = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED'] # Not included: @@ -125,6 +127,11 @@ class TestFailpoints(ImpalaTestSuite): del vector.get_value('exec_option')['debug_action'] self.execute_query(query, vector.get_value('exec_option')) + # Detect any hung fragments left from this test. + for impalad in ImpalaCluster.get_e2e_test_cluster().impalads: + verifier = MetricVerifier(impalad.service) + verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0) + def __parse_plan_nodes_from_explain(self, query, vector): """Parses the EXPLAIN <query> output and returns a list of node ids. Expects format of <ID>:<NAME>"""
