This is an automated email from the ASF dual-hosted git repository. tarmstrong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 6cdc8b5ce7fd50bbb54d846594262ba766b22bed Author: wzhou-code <wz...@cloudera.com> AuthorDate: Mon Jul 13 12:24:19 2020 -0700 IMPALA-6788: Abort ExecFInstance() RPC loop early after query failure Stops issuing ExecQueryFInstance rpcs and cancels any in-flight ones when a backend reports a failure. Adds a new debug action CONSTRUCT_QUERY_STATE_REPORT that runs when constructing a query state report. Adds a new test case for handling errors reported from query state. Testing: - Ran the following command for the new test case and verified that the code works as expected: ./bin/impala-py.test tests/custom_cluster/test_rpc_exception.py\ ::TestRPCException::test_state_report_error \ --workload_exploration_strategy=functional-query:exhaustive - Passed exhaustive tests. Change-Id: I034788f7720fc97c25c54f006ff72dce6cb199c3 Reviewed-on: http://gerrit.cloudera.org:8080/16192 Reviewed-by: Thomas Tauber-Marshall <tmarsh...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/runtime/coordinator.cc | 9 ++++++--- be/src/runtime/query-state.cc | 5 +++++ tests/custom_cluster/test_rpc_exception.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 3551849..b57d66f 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -946,9 +946,12 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req if (!status.ok()) { // We may start receiving status reports before all exec rpcs are complete. // Can't apply state transition until no more exec rpcs will be sent. - // TODO(IMPALA-6788): we should stop issuing ExecQueryFInstance rpcs and cancel any - // inflight when this happens. - WaitOnExecRpcs(); + // We should stop issuing ExecQueryFInstance rpcs and cancel any inflight + // when this happens. 
+ if (!exec_rpcs_complete_.Load()) { + if (!status.IsCancelled()) exec_rpcs_status_barrier_.NotifyRemaining(status); + WaitOnExecRpcs(); + } // Transition the status if we're not already in a terminal state. This won't block // because either this transitions to an ERROR state or the query is already in diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 35d8c1f..742d33f 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -427,6 +427,11 @@ void QueryState::ConstructReport(bool instances_started, report->set_coord_state_idx(exec_rpc_params_.coord_state_idx()); { unique_lock<SpinLock> l(status_lock_); + + Status debug_action_status = + DebugAction(query_options(), "CONSTRUCT_QUERY_STATE_REPORT"); + if (UNLIKELY(!debug_action_status.ok())) overall_status_ = debug_action_status; + overall_status_.ToProto(report->mutable_overall_status()); if (IsValidFInstanceId(failed_finstance_id_)) { TUniqueIdToUniqueIdPB(failed_finstance_id_, report->mutable_fragment_instance_id()); diff --git a/tests/custom_cluster/test_rpc_exception.py b/tests/custom_cluster/test_rpc_exception.py index d4701c5..4921ca9 100644 --- a/tests/custom_cluster/test_rpc_exception.py +++ b/tests/custom_cluster/test_rpc_exception.py @@ -157,3 +157,32 @@ class TestRPCException(CustomClusterTestSuite): elapsed_s = time.time() - start_s assert elapsed_s < 100, "Query took longer than expected to fail: %ss" % elapsed_s self.client.set_configuration_option("DEBUG_ACTION", "") + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args("--debug_actions=" + + _get_rpc_debug_action(rpc=EXEC_RPC, action="SLEEP@1000000")) + def test_state_report_error(self): + """Test that verifies that when one backend reports failure, the other Exec() rpcs + are immediately cancelled and the query returns an error quickly.""" + # Debug action to cause executer to construct a state report with failure. 
When + # the state report is processed by the coordinator, it sends a signal to stop + # issuing ExecQueryFInstance rpcs and cancel any inflight. + # The Exec() rpc to one of the impalads will sleep for a long time before hitting + # this (due to the impalad debug_actions startup flag specified above), so one + # Exec() will fail quickly while other one will fail only after a long wait. + self.client.set_configuration_option("DEBUG_ACTION", + "CONSTRUCT_QUERY_STATE_REPORT:FAIL") + + start_s = time.time() + try: + self.client.execute(self.TEST_QUERY) + assert False, "query was expected to fail" + except ImpalaBeeswaxException as e: + assert "Debug Action: CONSTRUCT_QUERY_STATE_REPORT:FAIL" in str(e) + + # If we successfully cancelled all Exec() rpcs and returned to the client as soon as + # the fast Exec() report failure, the time to run the query should be much less than + # the sleep time for the slow Exec() of 1000s. + elapsed_s = time.time() - start_s + assert elapsed_s < 100, "Query took longer than expected to fail: %ss" % elapsed_s + self.client.set_configuration_option("DEBUG_ACTION", "")