This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-4.0.1 in repository https://gitbox.apache.org/repos/asf/impala.git
commit cb604bbd3ac6bb14c0aa994a6f743069d8ce7029 Author: xqhe <[email protected]> AuthorDate: Tue Jul 27 17:34:45 2021 +0800 IMPALA-10825: fix impalad crashes when closing the retrying query. The crash happens when canceling the retrying query in the web UI. The canceling action will call ImpalaServer#UnregisterQuery. The QueryDriver will be null if the query has already been unregistered. Testing: Added a test in tests/custom_cluster/test_query_retries.py and manually ran it 100 times to make sure that there was no impalad crash. Change-Id: I3b9a2cccbfbdca00b099e0f8d5f2d4bcb4d0a8c3 Reviewed-on: http://gerrit.cloudera.org:8080/17729 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/runtime/query-driver.cc | 13 ++++++++++--- tests/custom_cluster/test_query_retries.py | 26 ++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc index db8c3bd..c5d979c 100644 --- a/be/src/runtime/query-driver.cc +++ b/be/src/runtime/query-driver.cc @@ -174,14 +174,21 @@ void QueryDriver::TryQueryRetry( << Substitute("Cannot retry a that has already been retried query_id = $0", PrintId(query_id)); - // Update the state and then schedule the retry asynchronously. - client_request_state_->MarkAsRetrying(*error); - // Another reference to this QueryDriver (via the shared_ptr) needs to be created and // passed to the thread so that a valid shared_ptr exists while the thread is running. // Otherwise it is possible that the user cancels the query and this QueryDriver gets // deleted by the shared_ptr. 
+ DebugActionNoFail(FLAGS_debug_actions, "RETRY_DELAY_GET_QUERY_DRIVER"); shared_ptr<QueryDriver> query_driver = parent_server_->GetQueryDriver(query_id); + if (query_driver.get() == nullptr) { + VLOG_QUERY << Substitute( + "Skipping retry of query_id=$0 because it has already been unregistered", + PrintId(query_id)); + return; + } + + // Update the state and then schedule the retry asynchronously. + client_request_state_->MarkAsRetrying(*error); // Launch the query retry in a separate thread, 'was_retried' is set to true // if the query retry was successfully launched. diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py index 5a6e8a4..04bcd89 100644 --- a/tests/custom_cluster/test_query_retries.py +++ b/tests/custom_cluster/test_query_retries.py @@ -747,6 +747,32 @@ class TestQueryRetries(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( + impalad_args="--debug_actions=RETRY_DELAY_GET_QUERY_DRIVER:SLEEP@2000", + statestored_args="--statestore_heartbeat_frequency_ms=60000") + def test_retry_query_close_before_getting_query_driver(self): + """Trigger a query retry, and then close the retried query before getting + the query driver. Validate that it doesn't crash the impalad. + Set a really high statestore heartbeat frequency so that killed impalads are not + removed from the cluster membership.""" + + # Kill an impalad, and run a query. The query should be retried. 
+ self.cluster.impalads[1].kill() + query = "select count(*) from tpch_parquet.lineitem" + handle = self.execute_query_async(query, + query_options={'retry_failed_queries': 'true'}) + + time.sleep(1) + # close the query + self.client.close_query(handle) + + time.sleep(2) + impala_service = self.cluster.get_first_impalad().service + self.assert_eventually(60, 0.1, + lambda: impala_service.get_num_in_flight_queries() == 0, + lambda: "in-flight queries: %d" % impala_service.get_num_in_flight_queries()) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( impalad_args="--debug_actions=QUERY_RETRY_SET_RESULT_CACHE:FAIL", statestored_args="--statestore_heartbeat_frequency_ms=60000") def test_retry_query_result_cacheing_failed(self):
