This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.0.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cb604bbd3ac6bb14c0aa994a6f743069d8ce7029
Author: xqhe <[email protected]>
AuthorDate: Tue Jul 27 17:34:45 2021 +0800

    IMPALA-10825: fix impalad crashes when closing the retrying query
    
    The crash happens when canceling the retrying query in web UI.
    The canceling action will call ImpalaServer#UnregisterQuery.
    The QueryDriver will be null if the query has already been unregistered.
    
    Testing:
     Add test in tests/custom_cluster/test_query_retries.py and manually
    tested 100 times to make sure that there was no Impalad crash
    
    Change-Id: I3b9a2cccbfbdca00b099e0f8d5f2d4bcb4d0a8c3
    Reviewed-on: http://gerrit.cloudera.org:8080/17729
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/query-driver.cc             | 13 ++++++++++---
 tests/custom_cluster/test_query_retries.py | 26 ++++++++++++++++++++++++++
 2 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index db8c3bd..c5d979c 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -174,14 +174,21 @@ void QueryDriver::TryQueryRetry(
         << Substitute("Cannot retry a that has already been retried query_id = 
$0",
             PrintId(query_id));
 
-    // Update the state and then schedule the retry asynchronously.
-    client_request_state_->MarkAsRetrying(*error);
-
     // Another reference to this QueryDriver (via the shared_ptr) needs to be 
created and
     // passed to the thread so that a valid shared_ptr exists while the thread 
is running.
     // Otherwise it is possible that the user cancels the query and this 
QueryDriver gets
     // deleted by the shared_ptr.
+    DebugActionNoFail(FLAGS_debug_actions, "RETRY_DELAY_GET_QUERY_DRIVER");
     shared_ptr<QueryDriver> query_driver = 
parent_server_->GetQueryDriver(query_id);
+    if (query_driver.get() == nullptr) {
+      VLOG_QUERY << Substitute(
+          "Skipping retry of query_id=$0 because it has already been 
unregistered",
+          PrintId(query_id));
+      return;
+    }
+
+    // Update the state and then schedule the retry asynchronously.
+    client_request_state_->MarkAsRetrying(*error);
 
     // Launch the query retry in a separate thread, 'was_retried' is set to 
true
     // if the query retry was successfully launched.
diff --git a/tests/custom_cluster/test_query_retries.py 
b/tests/custom_cluster/test_query_retries.py
index 5a6e8a4..04bcd89 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -747,6 +747,32 @@ class TestQueryRetries(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
+      impalad_args="--debug_actions=RETRY_DELAY_GET_QUERY_DRIVER:SLEEP@2000",
+      statestored_args="--statestore_heartbeat_frequency_ms=60000")
+  def test_retry_query_close_before_getting_query_driver(self):
+    """Trigger a query retry, and then close the retried query before getting
+    the query driver. Validate that it doesn't crash the impalad.
+    Set a really high statestore heartbeat frequency so that killed impalads 
are not
+    removed from the cluster membership."""
+
+    # Kill an impalad, and run a query. The query should be retried.
+    self.cluster.impalads[1].kill()
+    query = "select count(*) from tpch_parquet.lineitem"
+    handle = self.execute_query_async(query,
+        query_options={'retry_failed_queries': 'true'})
+
+    time.sleep(1)
+    # close the query
+    self.client.close_query(handle)
+
+    time.sleep(2)
+    impala_service = self.cluster.get_first_impalad().service
+    self.assert_eventually(60, 0.1,
+        lambda: impala_service.get_num_in_flight_queries() == 0,
+        lambda: "in-flight queries: %d" % 
impala_service.get_num_in_flight_queries())
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
       impalad_args="--debug_actions=QUERY_RETRY_SET_RESULT_CACHE:FAIL",
       statestored_args="--statestore_heartbeat_frequency_ms=60000")
   def test_retry_query_result_cacheing_failed(self):

Reply via email to