This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4a39144295bd65838d55e212b337a782f7167b0d
Author: Riza Suminto <[email protected]>
AuthorDate: Mon Sep 16 16:33:32 2024 -0700

    IMPALA-12937: Deflake test_admission_controller.py
    
    test_mem_limit in test_admission_controller.py has been flaky,
    especially in ARM environments. From the test logs, some queries are
    canceled early, presumably due to the client being idle.
    
    This patch attempts to fix the flakiness by fetching one row every
    FETCH_INTERVAL, regardless of the ending behavior. It also adds and
    moves log statements to help triage if the issue surfaces again, and
    addresses some flake8 errors.
    
    Testing:
    - Looped the whole test_admission_controller.py 10 times in exhaustive
      mode. No flakiness was observed after the patch.
    
    Change-Id: I9cf2bddcbfcd63d3a6bbc0a2014774a910f6e730
    Reviewed-on: http://gerrit.cloudera.org:8080/21813
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/custom_cluster/test_admission_controller.py | 74 ++++++++++++++---------
 1 file changed, 47 insertions(+), 27 deletions(-)

diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index 4743bad65..cd0d1fcd2 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -115,6 +115,8 @@ QUERY_END_BEHAVIORS = ['EOS', 'CLIENT_CANCEL', 
'QUERY_TIMEOUT', 'CLIENT_CLOSE']
 
 # The timeout used for the QUERY_TIMEOUT end behaviour
 QUERY_END_TIMEOUT_S = 3
+FETCH_INTERVAL = 0.5
+assert FETCH_INTERVAL < QUERY_END_TIMEOUT_S
 
 # Value used for --admission_control_stale_topic_threshold_ms in tests.
 STALE_TOPIC_THRESHOLD_MS = 500
@@ -743,7 +745,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
   @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, 
cluster_size=2,
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
           pool_max_mem=2 * 1024 * 1024 * 1024, proc_mem_limit=3 * 1024 * 1024 
* 1024))
-  def test_mem_limits(self, vector, unique_database):
+  def test_mem_limits(self, vector):
     """Verify that the query option mem_limit_coordinators and 
mem_limit_executors are
     ignored when mem_limit is set."""
     ImpalaTestSuite.change_database(self.client, 
vector.get_value('table_format'))
@@ -810,8 +812,8 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
       # returns, we wait for this to timeout instead.
       self.execute_query(query, exec_options)
     except ImpalaBeeswaxException as e:
-      assert re.search("Queued reason: Not enough memory available on host 
\S+.Needed "
-          "2.00 GB but only 1.00 GB out of 2.00 GB was available.", str(e)), 
str(e)
+      assert re.search(r"Queued reason: Not enough memory available on host 
\S+.Needed "
+          r"2.00 GB but only 1.00 GB out of 2.00 GB was available.", str(e)), 
str(e)
     finally:
       if impalad_with_2g_mem is not None:
         impalad_with_2g_mem.close()
@@ -1471,10 +1473,10 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=3,
-          pool_max_mem=1024 * 1024 * 1024) +
-      " --admission_control_stale_topic_threshold_ms={0}".format(
-          STALE_TOPIC_THRESHOLD_MS),
+      impalad_args=(impalad_admission_ctrl_flags(
+        max_requests=1, max_queued=3, pool_max_mem=1024 * 1024 * 1024)
+        + " --admission_control_stale_topic_threshold_ms={0}".format(
+          STALE_TOPIC_THRESHOLD_MS)),
       statestored_args=_STATESTORED_ARGS)
   def test_statestore_outage(self):
     self.client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
@@ -1769,6 +1771,7 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     self.wait_for_state(
         handle2, self.client.QUERY_STATES['RUNNING'], timeout_s, 
client=client2)
 
+
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between 
submissions
   (parameterized) and the ability to submit to one impalad or many in a 
round-robin
@@ -1988,7 +1991,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
     for i in range(num_queries):
       # pop() is thread-safe, it's OK if another thread is appending 
concurrently.
       thread = self.executing_threads.pop(0)
-      LOG.info("Cancelling query %s", thread.query_num)
+      LOG.info("Cancelling query {}".format(thread.query_num_and_id()))
       assert thread.query_state == 'ADMITTED'
       current_executing_queries.append(thread)
       thread.query_state = 'REQUEST_QUERY_END'
@@ -2040,6 +2043,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
           # check if the test is already shut down.
           self.lock.acquire()
           if self.shutdown:
+            self.print_termination_log()
             return
 
           exec_options = self.vector.get_value('exec_option')
@@ -2057,28 +2061,32 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
           if self.query_end_behavior == 'QUERY_TIMEOUT':
             client.execute("SET 
QUERY_TIMEOUT_S={0}".format(QUERY_END_TIMEOUT_S))
 
-          LOG.info("Submitting query %s", self.query_num)
+          LOG.info("Submitting query %s with ending behavior %s",
+                   self.query_num, self.query_end_behavior)
           self.query_handle = client.execute_async(query)
           client.wait_for_admission_control(self.query_handle)
           admission_result = client.get_admission_result(self.query_handle)
           assert len(admission_result) > 0
           if "Rejected" in admission_result:
-            LOG.info("Rejected query %s", self.query_num)
+            LOG.info("Rejected query %s", self.query_num_and_id())
             self.query_state = 'REJECTED'
+            self.print_termination_log()
             self.query_handle = None
             return
           elif "Timed out" in admission_result:
-            LOG.info("Query %s timed out", self.query_num)
+            LOG.info("Query %s timed out", self.query_num_and_id())
             self.query_state = 'TIMED OUT'
+            self.print_termination_log()
             self.query_handle = None
             return
-          LOG.info("Admission result for query %s : %s", self.query_num, 
admission_result)
+          LOG.info("Admission result for query %s : %s",
+                   self.query_num_and_id(), admission_result)
         except ImpalaBeeswaxException as e:
           LOG.exception(e)
           raise e
         finally:
           self.lock.release()
-        LOG.info("Admitted query %s", self.query_num)
+        LOG.info("Admitted query %s", self.query_num_and_id())
         self.query_state = 'ADMITTED'
         # The thread becomes visible to the main thread when it is added to the
         # shared list of executing_threads. append() is atomic and thread-safe.
@@ -2086,27 +2094,26 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
 
         # Synchronize with the main thread. At this point, the thread is 
executing a
         # query. It needs to wait until the main thread requests it to end its 
query.
-        while not self.shutdown:
-          # The QUERY_TIMEOUT needs to stay active until the main thread 
requests it
-          # to end. Otherwise, the query may get cancelled early. Fetch rows 2 
times
-          # per QUERY_TIMEOUT interval to keep the query active.
-          if self.query_end_behavior == 'QUERY_TIMEOUT' and \
-             self.query_state != 'COMPLETED':
-            fetch_result = client.fetch(query, self.query_handle, 1)
-            assert len(fetch_result.data) == 1, str(fetch_result)
+        while not self.shutdown and self.query_state != 'COMPLETED':
+          # The query needs to stay active until the main thread requests it 
to end.
+          # Otherwise, the query may get cancelled early. Fetch 1 row every
+          # FETCH_INTERVAL to keep the query active.
+          fetch_result = client.fetch(query, self.query_handle, 1)
+          assert len(fetch_result.data) == 1, str(fetch_result)
           if self.query_state == 'REQUEST_QUERY_END':
             self._end_query(client, query)
             # The query has released admission control resources
             self.query_state = 'COMPLETED'
+            self.print_termination_log()
             self.query_handle = None
-          sleep(QUERY_END_TIMEOUT_S / 6)
+          sleep(FETCH_INTERVAL)
       except Exception as e:
         LOG.exception(e)
         # Unknown errors will be raised later
         self.error = e
         self.query_state = 'ERROR'
+        self.print_termination_log()
       finally:
-        LOG.info("Thread terminating in state=%s", self.query_state)
         if client is not None:
           try:
             self.lock.acquire()
@@ -2116,20 +2123,33 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
           finally:
             self.lock.release()
 
+    def print_termination_log(self):
+      LOG.info("Thread for query {} terminating in state {}".format(
+        self.query_num, self.query_state))
+
+    def query_num_and_id(self):
+      return "{} (id={})".format(self.query_num, 
self.query_handle.get_handle().id)
+
     def _end_query(self, client, query):
       """Bring the query to the appropriate end state defined by 
self.query_end_behaviour.
       Returns once the query has reached that state."""
-      LOG.info("Ending query %s by %s",
-          str(self.query_handle.get_handle()), self.query_end_behavior)
+      LOG.info("Ending query {} by {}".format(
+        self.query_num_and_id(), self.query_end_behavior))
       if self.query_end_behavior == 'QUERY_TIMEOUT':
         # Sleep and wait for the query to be cancelled. The cancellation will
         # set the state to EXCEPTION.
         start_time = time()
-        while (client.get_state(self.query_handle) !=
-               client.QUERY_STATES['EXCEPTION']):
+        while (client.get_state(self.query_handle)
+               != client.QUERY_STATES['EXCEPTION']):
           assert (time() - start_time < STRESS_TIMEOUT),\
             "Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,)
           sleep(1)
+        # try fetch and confirm from exception message that query was timed 
out.
+        try:
+          client.fetch(query, self.query_handle)
+          assert False
+        except Exception as e:
+          assert 'expired due to client inactivity' in str(e)
       elif self.query_end_behavior == 'EOS':
         # Fetch all rows so we hit eos.
         client.fetch(query, self.query_handle)

Reply via email to