This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 4a39144295bd65838d55e212b337a782f7167b0d Author: Riza Suminto <[email protected]> AuthorDate: Mon Sep 16 16:33:32 2024 -0700 IMPALA-12937: Deflake test_admission_controller.py test_mem_limit in test_admission_controller.py has been flaky, especially in ARM environment. From the test logs, some queries are canceled early, presumably due to client being idle. This patch attempts to fix the flakiness by fetching one row every FETCH_INTERVAL, regardless of the ending behavior. Added and moved logs around to help triaging if the issue surface again. Also addressed some flake2 errors. Testing: - Loop the whole test_admission_controller.py 10 times in exhaustive mode. No flakiness observed after patch. Change-Id: I9cf2bddcbfcd63d3a6bbc0a2014774a910f6e730 Reviewed-on: http://gerrit.cloudera.org:8080/21813 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- tests/custom_cluster/test_admission_controller.py | 74 ++++++++++++++--------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 4743bad65..cd0d1fcd2 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -115,6 +115,8 @@ QUERY_END_BEHAVIORS = ['EOS', 'CLIENT_CANCEL', 'QUERY_TIMEOUT', 'CLIENT_CLOSE'] # The timeout used for the QUERY_TIMEOUT end behaviour QUERY_END_TIMEOUT_S = 3 +FETCH_INTERVAL = 0.5 +assert FETCH_INTERVAL < QUERY_END_TIMEOUT_S # Value used for --admission_control_stale_topic_threshold_ms in tests. STALE_TOPIC_THRESHOLD_MS = 500 @@ -743,7 +745,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=2, impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1, pool_max_mem=2 * 1024 * 1024 * 1024, proc_mem_limit=3 * 1024 * 1024 * 1024)) - def test_mem_limits(self, vector, unique_database): + def test_mem_limits(self, vector): """Verify that the query option mem_limit_coordinators and mem_limit_executors are ignored when mem_limit is set.""" ImpalaTestSuite.change_database(self.client, vector.get_value('table_format')) @@ -810,8 +812,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # returns, we wait for this to timeout instead. self.execute_query(query, exec_options) except ImpalaBeeswaxException as e: - assert re.search("Queued reason: Not enough memory available on host \S+.Needed " - "2.00 GB but only 1.00 GB out of 2.00 GB was available.", str(e)), str(e) + assert re.search(r"Queued reason: Not enough memory available on host \S+.Needed " + r"2.00 GB but only 1.00 GB out of 2.00 GB was available.", str(e)), str(e) finally: if impalad_with_2g_mem is not None: impalad_with_2g_mem.close() @@ -1471,10 +1473,10 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( - impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=3, - pool_max_mem=1024 * 1024 * 1024) + - " --admission_control_stale_topic_threshold_ms={0}".format( - STALE_TOPIC_THRESHOLD_MS), + impalad_args=(impalad_admission_ctrl_flags( + max_requests=1, max_queued=3, pool_max_mem=1024 * 1024 * 1024) + + " --admission_control_stale_topic_threshold_ms={0}".format( + STALE_TOPIC_THRESHOLD_MS)), statestored_args=_STATESTORED_ARGS) def test_statestore_outage(self): self.client.set_configuration_option('enable_trivial_query_for_admission', 'false') @@ -1769,6 +1771,7 @@ class TestAdmissionControllerWithACService(TestAdmissionController): self.wait_for_state( handle2, self.client.QUERY_STATES['RUNNING'], timeout_s, client=client2) + class TestAdmissionControllerStress(TestAdmissionControllerBase): """Submits a number of queries (parameterized) with some delay between submissions (parameterized) and the ability to submit to one impalad or many in a round-robin @@ -1988,7 +1991,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): for i in range(num_queries): # pop() is thread-safe, it's OK if another thread is appending concurrently. thread = self.executing_threads.pop(0) - LOG.info("Cancelling query %s", thread.query_num) + LOG.info("Cancelling query {}".format(thread.query_num_and_id())) assert thread.query_state == 'ADMITTED' current_executing_queries.append(thread) thread.query_state = 'REQUEST_QUERY_END' @@ -2040,6 +2043,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): # check if the test is already shut down. self.lock.acquire() if self.shutdown: + self.print_termination_log() return exec_options = self.vector.get_value('exec_option') @@ -2057,28 +2061,32 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): if self.query_end_behavior == 'QUERY_TIMEOUT': client.execute("SET QUERY_TIMEOUT_S={0}".format(QUERY_END_TIMEOUT_S)) - LOG.info("Submitting query %s", self.query_num) + LOG.info("Submitting query %s with ending behavior %s", + self.query_num, self.query_end_behavior) self.query_handle = client.execute_async(query) client.wait_for_admission_control(self.query_handle) admission_result = client.get_admission_result(self.query_handle) assert len(admission_result) > 0 if "Rejected" in admission_result: - LOG.info("Rejected query %s", self.query_num) + LOG.info("Rejected query %s", self.query_num_and_id()) self.query_state = 'REJECTED' + self.print_termination_log() self.query_handle = None return elif "Timed out" in admission_result: - LOG.info("Query %s timed out", self.query_num) + LOG.info("Query %s timed out", self.query_num_and_id()) self.query_state = 'TIMED OUT' + self.print_termination_log() self.query_handle = None return - LOG.info("Admission result for query %s : %s", self.query_num, admission_result) + LOG.info("Admission result for query %s : %s", + self.query_num_and_id(), admission_result) except ImpalaBeeswaxException as e: LOG.exception(e) raise e finally: self.lock.release() - LOG.info("Admitted query %s", self.query_num) + LOG.info("Admitted query %s", self.query_num_and_id()) self.query_state = 'ADMITTED' # The thread becomes visible to the main thread when it is added to the # shared list of executing_threads. append() is atomic and thread-safe. @@ -2086,27 +2094,26 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): # Synchronize with the main thread. At this point, the thread is executing a # query. It needs to wait until the main thread requests it to end its query. - while not self.shutdown: - # The QUERY_TIMEOUT needs to stay active until the main thread requests it - # to end. Otherwise, the query may get cancelled early. Fetch rows 2 times - # per QUERY_TIMEOUT interval to keep the query active. - if self.query_end_behavior == 'QUERY_TIMEOUT' and \ - self.query_state != 'COMPLETED': - fetch_result = client.fetch(query, self.query_handle, 1) - assert len(fetch_result.data) == 1, str(fetch_result) + while not self.shutdown and self.query_state != 'COMPLETED': + # The query needs to stay active until the main thread requests it to end. + # Otherwise, the query may get cancelled early. Fetch 1 row every + # FETCH_INTERVAL to keep the query active. + fetch_result = client.fetch(query, self.query_handle, 1) + assert len(fetch_result.data) == 1, str(fetch_result) if self.query_state == 'REQUEST_QUERY_END': self._end_query(client, query) # The query has released admission control resources self.query_state = 'COMPLETED' + self.print_termination_log() self.query_handle = None - sleep(QUERY_END_TIMEOUT_S / 6) + sleep(FETCH_INTERVAL) except Exception as e: LOG.exception(e) # Unknown errors will be raised later self.error = e self.query_state = 'ERROR' + self.print_termination_log() finally: - LOG.info("Thread terminating in state=%s", self.query_state) if client is not None: try: self.lock.acquire() @@ -2116,20 +2123,33 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): finally: self.lock.release() + def print_termination_log(self): + LOG.info("Thread for query {} terminating in state {}".format( + self.query_num, self.query_state)) + + def query_num_and_id(self): + return "{} (id={})".format(self.query_num, self.query_handle.get_handle().id) + def _end_query(self, client, query): """Bring the query to the appropriate end state defined by self.query_end_behaviour. Returns once the query has reached that state.""" - LOG.info("Ending query %s by %s", - str(self.query_handle.get_handle()), self.query_end_behavior) + LOG.info("Ending query {} by {}".format( + self.query_num_and_id(), self.query_end_behavior)) if self.query_end_behavior == 'QUERY_TIMEOUT': # Sleep and wait for the query to be cancelled. The cancellation will # set the state to EXCEPTION. start_time = time() - while (client.get_state(self.query_handle) != - client.QUERY_STATES['EXCEPTION']): + while (client.get_state(self.query_handle) + != client.QUERY_STATES['EXCEPTION']): assert (time() - start_time < STRESS_TIMEOUT),\ "Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,) sleep(1) + # try fetch and confirm from exception message that query was timed out. + try: + client.fetch(query, self.query_handle) + assert False + except Exception as e: + assert 'expired due to client inactivity' in str(e) elif self.query_end_behavior == 'EOS': # Fetch all rows so we hit eos. client.fetch(query, self.query_handle)
