Repository: impala Updated Branches: refs/heads/master e988c36bf -> 4ce2af9ff
IMPALA-7205: Respond to ReportExecStatus() RPC with CANCELLED if query execution has terminated Otherwise, if the coordinator to backend CancelFInstances() RPC had failed, the query can hang (and/or finstances can continue running until the query is closed). Testing: - the modified test reproduces the hang without the impalad fix Change-Id: I7bb2c26edace89853f14a329f891d1f9a065a991 Reviewed-on: http://gerrit.cloudera.org:8080/10815 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8000c31d Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8000c31d Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8000c31d Branch: refs/heads/master Commit: 8000c31d14a439a37a5f8dd626763a433f014504 Parents: e988c36 Author: Dan Hecht <[email protected]> Authored: Mon Jun 25 12:05:16 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Jun 28 07:38:42 2018 +0000 ---------------------------------------------------------------------- be/src/runtime/coordinator-backend-state.cc | 50 +++++++++++++----------- be/src/runtime/coordinator-backend-state.h | 10 +++-- be/src/runtime/coordinator.cc | 9 ++--- tests/query_test/test_cancellation.py | 20 ++++++---- 4 files changed, 51 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/8000c31d/be/src/runtime/coordinator-backend-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index c871a48..a99acdb 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -40,8 +40,8 @@ using namespace rapidjson; namespace accumulators = boost::accumulators; 
Coordinator::BackendState::BackendState( - const TUniqueId& query_id, int state_idx, TRuntimeFilterMode::type filter_mode) - : query_id_(query_id), + const Coordinator& coord, int state_idx, TRuntimeFilterMode::type filter_mode) + : coord_(coord), state_idx_(state_idx), filter_mode_(filter_mode) { } @@ -150,16 +150,16 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options, } void Coordinator::BackendState::Exec( - const TQueryCtx& query_ctx, const DebugOptions& debug_options, + const DebugOptions& debug_options, const FilterRoutingTable& filter_routing_table, CountingBarrier* exec_complete_barrier) { NotifyBarrierOnExit notifier(exec_complete_barrier); TExecQueryFInstancesParams rpc_params; - rpc_params.__set_query_ctx(query_ctx); + rpc_params.__set_query_ctx(query_ctx()); SetRpcParams(debug_options, filter_routing_table, &rpc_params); VLOG_FILE << "making rpc: ExecQueryFInstances" << " host=" << TNetworkAddressToString(impalad_address()) << " query_id=" - << PrintId(query_id_); + << PrintId(query_id()); // guard against concurrent UpdateBackendExecStatus() that may arrive after RPC returns lock_guard<mutex> l(lock_); @@ -180,7 +180,7 @@ void Coordinator::BackendState::Exec( if (!rpc_status.ok()) { const string& err_msg = - Substitute(ERR_TEMPLATE, PrintId(query_id_), rpc_status.msg().msg()); + Substitute(ERR_TEMPLATE, PrintId(query_id()), rpc_status.msg().msg()); VLOG_QUERY << err_msg; status_ = Status::Expected(err_msg); return; @@ -188,7 +188,7 @@ void Coordinator::BackendState::Exec( Status exec_status = Status(thrift_result.status); if (!exec_status.ok()) { - const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id_), + const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()), exec_status.msg().GetFullMessageDetails()); VLOG_QUERY << err_msg; status_ = Status::Expected(err_msg); @@ -196,7 +196,7 @@ void Coordinator::BackendState::Exec( } for (const auto& entry: instance_stats_map_) 
entry.second->stopwatch_.Start(); - VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id_); + VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id()); } Status Coordinator::BackendState::GetStatus(bool* is_fragment_failure, @@ -225,7 +225,7 @@ void Coordinator::BackendState::LogFirstInProgress( for (Coordinator::BackendState* backend_state : backend_states) { lock_guard<mutex> l(backend_state->lock_); if (!backend_state->IsDone()) { - VLOG_QUERY << "query_id=" << PrintId(backend_state->query_id_) + VLOG_QUERY << "query_id=" << PrintId(backend_state->query_id()) << ": first in-progress backend: " << TNetworkAddressToString(backend_state->impalad_address()); break; @@ -351,9 +351,9 @@ bool Coordinator::BackendState::Cancel() { TCancelQueryFInstancesParams params; params.protocol_version = ImpalaInternalServiceVersion::V1; - params.__set_query_id(query_id_); + params.__set_query_id(query_id()); TCancelQueryFInstancesResult dummy; - VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id=" << PrintId(query_id_) << + VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id=" << PrintId(query_id()) << " backend=" << TNetworkAddressToString(impalad_address()); Status rpc_status; @@ -362,26 +362,30 @@ bool Coordinator::BackendState::Cancel() { for (int i = 0; i < 3; ++i) { ImpalaBackendConnection backend_client(ExecEnv::GetInstance()->impalad_client_cache(), impalad_address(), &client_status); - if (client_status.ok()) { - // The return value 'dummy' is ignored as it's only set if the fragment instance - // cannot be found in the backend. The fragment instances of a query can all be - // cancelled locally in a backend due to RPC failure to coordinator. In which case, - // the query state can be gone already. 
- rpc_status = backend_client.DoRpc( - &ImpalaBackendClient::CancelQueryFInstances, params, &dummy); - if (rpc_status.ok()) break; - } + if (!client_status.ok()) continue; + + rpc_status = DebugAction(query_ctx().client_request.query_options, + "COORD_CANCEL_QUERY_FINSTANCES_RPC"); + if (!rpc_status.ok()) continue; + + // The return value 'dummy' is ignored as it's only set if the fragment + // instance cannot be found in the backend. The fragment instances of a query + // can all be cancelled locally in a backend due to RPC failure to + // coordinator. In which case, the query state can be gone already. + rpc_status = backend_client.DoRpc( + &ImpalaBackendClient::CancelQueryFInstances, params, &dummy); + if (rpc_status.ok()) break; } if (!client_status.ok()) { status_.MergeStatus(client_status); - VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id_) + VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id()) << " failed to connect to " << TNetworkAddressToString(impalad_address()) << " :" << client_status.msg().msg(); return true; } if (!rpc_status.ok()) { status_.MergeStatus(rpc_status); - VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id_) + VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id()) << " rpc to " << TNetworkAddressToString(impalad_address()) << " failed: " << rpc_status.msg().msg(); return true; @@ -390,7 +394,7 @@ bool Coordinator::BackendState::Cancel() { } void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) { - DCHECK(rpc_params.dst_query_id == query_id_); + DCHECK(rpc_params.dst_query_id == query_id()); { // If the backend is already done, it's not waiting for this filter, so we skip // sending it in this case. 
http://git-wip-us.apache.org/repos/asf/impala/blob/8000c31d/be/src/runtime/coordinator-backend-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h index e7af2e2..c51c16c 100644 --- a/be/src/runtime/coordinator-backend-state.h +++ b/be/src/runtime/coordinator-backend-state.h @@ -54,7 +54,7 @@ struct FInstanceExecParams; /// Thread-safe unless pointed out otherwise. class Coordinator::BackendState { public: - BackendState(const TUniqueId& query_id, int state_idx, + BackendState(const Coordinator& coord, int state_idx, TRuntimeFilterMode::type filter_mode); /// Creates InstanceStats for all instance in backend_exec_params in obj_pool @@ -70,7 +70,7 @@ class Coordinator::BackendState { /// that weren't selected during its construction. /// The debug_options are applied to the appropriate TPlanFragmentInstanceCtxs, based /// on their node_id/instance_idx. - void Exec(const TQueryCtx& query_ctx, const DebugOptions& debug_options, + void Exec(const DebugOptions& debug_options, const FilterRoutingTable& filter_routing_table, CountingBarrier* rpc_complete_barrier); @@ -202,7 +202,8 @@ class Coordinator::BackendState { void InitCounters(); }; - const TUniqueId query_id_; + const Coordinator& coord_; /// Coordinator object that owns this BackendState + const int state_idx_; /// index of 'this' in Coordinator::backend_states_ const TRuntimeFilterMode::type filter_mode_; @@ -256,6 +257,9 @@ class Coordinator::BackendState { /// Set in ApplyExecStatusReport(). Uses MonotonicMillis(). int64_t last_report_time_ms_ = 0; + const TQueryCtx& query_ctx() const { return coord_.query_ctx(); } + const TUniqueId& query_id() const { return coord_.query_id(); } + /// Fill in rpc_params based on state. Uses filter_routing_table to remove filters /// that weren't selected during its construction. 
void SetRpcParams(const DebugOptions& debug_options, http://git-wip-us.apache.org/repos/asf/impala/blob/8000c31d/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 5470be6..b790f22 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -205,7 +205,7 @@ void Coordinator::InitBackendStates() { int backend_idx = 0; for (const auto& entry: schedule_.per_backend_exec_params()) { BackendState* backend_state = obj_pool()->Add( - new BackendState(query_id(), backend_idx, filter_mode_)); + new BackendState(*this, backend_idx, filter_mode_)); backend_state->Init(entry.second, fragment_stats_, obj_pool()); backend_states_[backend_idx++] = backend_state; } @@ -335,7 +335,7 @@ void Coordinator::StartBackendExec() { ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer( [backend_state, this, &debug_options]() { DebugActionNoFail(schedule_.query_options(), "COORD_BEFORE_EXEC_RPC"); - backend_state->Exec(query_ctx(), debug_options, filter_routing_table_, + backend_state->Exec(debug_options, filter_routing_table_, exec_rpcs_complete_barrier_.get()); }); } @@ -708,10 +708,9 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param // We've applied all changes from the final status report - notify waiting threads. backend_exec_complete_barrier_->Notify(); } - // If all results have been returned, return a cancelled status to force the fragment + // If query execution has terminated, return a cancelled status to force the fragment // instance to stop executing. - // TODO: Make returning CANCELLED unnecessary with IMPALA-6984. - return ReturnedAllResults() ? Status::CANCELLED : Status::OK(); + return exec_state_.Load() == ExecState::EXECUTING ? 
Status::OK() : Status::CANCELLED; } // TODO: add histogram/percentile http://git-wip-us.apache.org/repos/asf/impala/blob/8000c31d/tests/query_test/test_cancellation.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py index 6fd3778..a63a628 100644 --- a/tests/query_test/test_cancellation.py +++ b/tests/query_test/test_cancellation.py @@ -47,8 +47,11 @@ CANCEL_DELAY_IN_SECONDS = [0, 0.01, 0.1, 1, 4] # Number of times to execute/cancel each query under test NUM_CANCELATION_ITERATIONS = 1 -# Test cancellation on both running and hung queries -DEBUG_ACTIONS = [None, 'WAIT'] +# Test cancellation on both running and hung queries. Node ID 0 is the scan node +WAIT_ACTIONS = [None, '0:GETNEXT:WAIT'] + +# Verify that failed CancelFInstances() RPCs don't lead to hung queries +FAIL_RPC_ACTIONS = [None, 'COORD_CANCEL_QUERY_FINSTANCES_RPC:FAIL'] # Verify close rpc running concurrently with fetch rpc. The two cases verify: # False: close and fetch rpc run concurrently. 
@@ -75,7 +78,9 @@ class TestCancellation(ImpalaTestSuite): cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension('cancel_delay', *CANCEL_DELAY_IN_SECONDS)) cls.ImpalaTestMatrix.add_dimension( - ImpalaTestDimension('action', *DEBUG_ACTIONS)) + ImpalaTestDimension('wait_action', *WAIT_ACTIONS)) + cls.ImpalaTestMatrix.add_dimension( + ImpalaTestDimension('fail_rpc_action', *FAIL_RPC_ACTIONS)) cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension('join_before_close', *JOIN_BEFORE_CLOSE)) cls.ImpalaTestMatrix.add_dimension( @@ -125,9 +130,10 @@ class TestCancellation(ImpalaTestSuite): (file_format, query) join_before_close = vector.get_value('join_before_close') - action = vector.get_value('action') - # node ID 0 is the scan node - debug_action = '0:GETNEXT:' + action if action != None else '' + wait_action = vector.get_value('wait_action') + fail_rpc_action = vector.get_value('fail_rpc_action') + + debug_action = "|".join(filter(None, [wait_action, fail_rpc_action])) vector.get_value('exec_option')['debug_action'] = debug_action vector.get_value('exec_option')['buffer_pool_limit'] =\ @@ -193,7 +199,7 @@ class TestCancellation(ImpalaTestSuite): # Executing the same query without canceling should work fine. Only do this if the # query has a limit or aggregation - if action is None and ('count' in query or 'limit' in query): + if not debug_action and ('count' in query or 'limit' in query): self.execute_query(query, vector.get_value('exec_option')) def teardown_method(self, method):
