This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 59fdd7169a4523a2c4916096d550855e49c8a35a Author: Yida Wu <yida...@cloudera.com> AuthorDate: Thu Jul 24 21:58:39 2025 -0700 IMPALA-10866: Add testcases for failure cases involving the admission service The admission service uses the statestore as the only source of truth to determine whether a coordinator is down. If the statestore reports a coordinator is down, all running and queued queries associated with it should be cancelled or rejected. In IMPALA-12057, we introduced logic to reject queued queries if the corresponding coordinator has been removed, along with tests for that behavior. This patch adds additional test cases to cover other failure scenarios, such as the coordinator or the statestore going down with running queries, and verifies that the behavior is as expected in each case. Tests: Passed exhaustive tests. Change-Id: If617326cbc6fe2567857d6323c6413d98c92d009 Reviewed-on: http://gerrit.cloudera.org:8080/23217 Reviewed-by: Riza Suminto <riza.sumi...@cloudera.com> Reviewed-by: Abhishek Rawat <ara...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- tests/custom_cluster/test_admission_controller.py | 76 +++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 50fa518ce..77ea46415 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -2221,6 +2221,82 @@ class TestAdmissionControllerWithACService(TestAdmissionController): self.assert_log_contains_multiline(self.get_ac_log_name(), 'INFO', "The coordinator no longer exists") + @SkipIfNotHdfsMinicluster.tuned_for_minicluster + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--vmodule admission-controller=3 --default_pool_max_requests=1 " + "--queue_wait_timeout_ms=60000 ") + def test_kill_statestore_with_queries_running(self): + long_query = "select count(*), sleep(10000) from functional.alltypes limit 1" + short_query = "select count(*) from functional.alltypes limit 1" + timeout_s = 60 + + handle1 = self.client.execute_async(long_query) + # Make sure the first query has been admitted. + self.client.wait_for_impala_state(handle1, RUNNING, timeout_s) + + # Run another query. This query should be queued because only 1 query is allowed in + # the default pool. + handle2 = self.client.execute_async(short_query) + self._wait_for_change_to_profile(handle2, "Admission result: Queued") + + # Restart the statestore while queries are running/queued. + statestore = self.cluster.statestored + statestore.kill() + statestore.start() + + # Verify that both queries eventually complete. + self.client.wait_for_impala_state(handle1, FINISHED, timeout_s) + self.client.close_query(handle1) + self.client.wait_for_impala_state(handle2, FINISHED, timeout_s) + self.client.close_query(handle2) + + @SkipIfNotHdfsMinicluster.tuned_for_minicluster + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--vmodule admission-controller=3 --default_pool_max_requests=1 " + "--queue_wait_timeout_ms=60000 ", disable_log_buffering=True) + def test_kill_coord_with_queries_running(self): + long_query = "select count(*), sleep(1000000000) from functional.alltypes limit 1" + short_query = "select count(*) from functional.alltypes limit 1" + timeout_s = 10 + + all_coords = self.cluster.get_all_coordinators() + assert len(all_coords) >= 2, "Test requires at least two coordinators" + coord1 = all_coords[0] + coord2 = all_coords[1] + + # Make sure the first query has been admitted. + client1 = coord1.service.create_hs2_client() + handle1 = client1.execute_async(long_query) + client1.wait_for_impala_state(handle1, RUNNING, timeout_s) + query_id1 = client1.handle_id(handle1) + + # Run another query. This query should be queued because only 1 query is allowed in + # the default pool. + client2 = coord2.service.create_hs2_client() + handle2 = client2.execute_async(short_query) + self._wait_for_change_to_profile(handle2, "Admission result: Queued", client=client2) + + # Kill the coordinator handling the running query. + coord1.kill() + try: + client1.close_query(handle1) + except Exception: + pass + + # The first query should be canceled after coord1 is killed, + # allowing the queued query to run. + admissiond_log = self.get_ac_log_name() + self.assert_log_contains(admissiond_log, 'INFO', + "Released query id={}".format(query_id1), expected_count=1) + client2.wait_for_impala_state(handle2, FINISHED, timeout_s) + client2.close_query(handle2) + + # Cleanup. + client1.close() + client2.close() + @SkipIfNotHdfsMinicluster.tuned_for_minicluster @pytest.mark.execute_serially def test_retained_removed_coords_size(self):