This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 1935f9e1a IMPALA-12616: Fix test_restart_services.py::TestRestart tests for S3
1935f9e1a is described below

commit 1935f9e1a199c958c5fb12ad53277fa720d6ae5c
Author: Joe McDonnell <[email protected]>
AuthorDate: Mon Jun 3 16:55:28 2024 -0700

    IMPALA-12616: Fix test_restart_services.py::TestRestart tests for S3
    
    The test_restart_catalogd_while_handling_rpc_response* tests
    from custom_cluster/test_restart_services.py have been failing
    consistently on s3. The alter table statement is expected to
    succeed, but instead it fails with:
    "CatalogException: Detected catalog service ID changes"
    This manifests as a timeout waiting for the statement to reach
    the finished state.
    
    The test relies on specific timing with a sleep injected via a
    debug action. The failure stems from the catalog being slower
    on s3. The alter table wakes up before the catalog service ID
    change has fully completed, and it fails when it sees the
    catalog service ID change.
    
    This increases two sleep times:
    1. This increases the sleep time before restarting the catalogd
       from 0.5 seconds to 5 seconds. This gives the catalogd longer
       to receive the message about the alter table and respond back
       to the impalad.
    2. This increases the WAIT_BEFORE_PROCESSING_CATALOG_UPDATE
       sleep from 10 seconds to 30 seconds so the alter table
       statement doesn't wake up until the catalog service ID change
       is finalized.
    The test is verifying that the right messages are in the impalad
    logs, so we know this is still testing the same condition.
    
    This modifies the tests to use wait_for_finished_timeout()
    rather than wait_for_state(). This bails out immediately if the
    query fails rather than waiting unnecessarily for the full timeout.
    This also clears the query options so that later statements
    don't inherit the debug_action that the alter table statement
    used.
    
    Testing:
     - Ran the tests 100x in a loop on s3
     - Ran the tests 100x in a loop on HDFS
    
    Change-Id: Ieb5699b8fb0b2ad8bad4ac30922a7b4d7fa17d29
    Reviewed-on: http://gerrit.cloudera.org:8080/21485
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Daniel Becker <[email protected]>
---
 tests/custom_cluster/test_restart_services.py | 40 +++++++++++++++++++++------
 1 file changed, 31 insertions(+), 9 deletions(-)

diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index bc9e803d0..01b9971fe 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -166,7 +166,9 @@ class TestRestart(CustomClusterTestSuite):
 
     self.cluster.catalogd.start()
     thread.join()
-    self.wait_for_state(query_handle[0], QueryState.FINISHED, 30000)
+    max_wait_time = 300
+    finished = self.client.wait_for_finished_timeout(query_handle[0], max_wait_time)
+    assert finished, "Statement did not finish after {0} seconds".format(max_wait_time)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -219,28 +221,38 @@ class TestRestart(CustomClusterTestSuite):
     # Make the catalog object version grow large enough
     self.execute_query_expect_success(self.client, "invalidate metadata")
 
-    debug_action_sleep_time_sec = 10
+    # IMPALA-12616: If this sleep is not long enough, the alter table could wake up
+    # before the new catalog service ID is finalized, and the query can fail due to the
+    # difference in the service ID. This was a particular problem on s3, which runs a
+    # bit slower.
+    debug_action_sleep_time_sec = 30
     DEBUG_ACTION = ("WAIT_BEFORE_PROCESSING_CATALOG_UPDATE:SLEEP@{}"
                     .format(debug_action_sleep_time_sec * 1000))
 
     query = "alter table {} add columns (age int)".format(tbl_name)
     handle = self.execute_query_async(query, query_options={"debug_action": DEBUG_ACTION})
 
-    # Wait a bit so the RPC from the catalogd arrives to the coordinator.
-    time.sleep(0.5)
+    # Wait a bit so the RPC from the catalogd arrives to the coordinator. Using a generous
+    # value here gives the catalogd plenty of time to respond.
+    time.sleep(5)
 
     self.cluster.catalogd.restart()
 
     # Wait for the query to finish.
     max_wait_time = (debug_action_sleep_time_sec
         + self.WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC + 10)
-    self.wait_for_state(handle, self.client.QUERY_STATES["FINISHED"], max_wait_time)
+    finished = self.client.wait_for_finished_timeout(handle, max_wait_time)
+    assert finished, "Statement did not finish after {0} seconds".format(max_wait_time)
 
     self.assert_impalad_log_contains("WARNING",
         "Waiting for catalog update with a new catalog service ID timed out.")
     self.assert_impalad_log_contains("WARNING",
         "Ignoring catalog update result of catalog service ID")
 
+    # Clear the query options so the following statements don't use the debug_action
+    # set above.
+    self.client.clear_configuration()
+
     self.execute_query_expect_success(self.client, "select age from {}".format(tbl_name))
 
     self.execute_query_expect_success(self.client,
@@ -270,15 +282,20 @@ class TestRestart(CustomClusterTestSuite):
     # Make the catalog object version grow large enough
     self.execute_query_expect_success(self.client, "invalidate metadata")
 
-    debug_action_sleep_time_sec = 10
+    # IMPALA-12616: If this sleep is not long enough, the alter table could wake up
+    # before the new catalog service ID is finalized, and the query can fail due to the
+    # difference in the service ID. This was a particular problem on s3, which runs a
+    # bit slower.
+    debug_action_sleep_time_sec = 30
     DEBUG_ACTION = ("WAIT_BEFORE_PROCESSING_CATALOG_UPDATE:SLEEP@{}"
                     .format(debug_action_sleep_time_sec * 1000))
 
     query = "alter table {} add columns (age int)".format(tbl_name)
     handle = self.execute_query_async(query, query_options={"debug_action": DEBUG_ACTION})
 
-    # Wait a bit so the RPC from the catalogd arrives to the coordinator.
-    time.sleep(0.5)
+    # Wait a bit so the RPC from the catalogd arrives to the coordinator. Using a generous
+    # value here gives the catalogd plenty of time to respond.
+    time.sleep(5)
 
     self.cluster.catalogd.restart()
 
@@ -286,6 +303,10 @@ class TestRestart(CustomClusterTestSuite):
     # waiting for catalog updates.
     time.sleep(debug_action_sleep_time_sec + 0.5)
 
+    # Clear the query options so the following statements don't use the debug_action
+    # set above.
+    self.client.clear_configuration()
+
     # Issue DML queries so that the coordinator receives catalog updates.
     for i in range(self.WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS):
       try:
@@ -297,7 +318,8 @@ class TestRestart(CustomClusterTestSuite):
 
     # Wait for the query to finish.
     max_wait_time = 10
-    self.wait_for_state(handle, self.client.QUERY_STATES["FINISHED"], max_wait_time)
+    finished = self.client.wait_for_finished_timeout(handle, max_wait_time)
+    assert finished, "Statement did not finish after {0} seconds".format(max_wait_time)
 
     expected_log_msg = "Received {} non-empty catalog updates from the statestore " \
         "while waiting for an update with a new catalog service ID but the catalog " \

Reply via email to