This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 59fdd7169a4523a2c4916096d550855e49c8a35a
Author: Yida Wu <yida...@cloudera.com>
AuthorDate: Thu Jul 24 21:58:39 2025 -0700

    IMPALA-10866: Add testcases for failure cases involving the admission service
    
    The admission service uses the statestore as the only source of
    truth to determine whether a coordinator is down. If the statestore
    reports a coordinator is down, all running and queued queries
    associated with it should be cancelled or rejected.
    
    In IMPALA-12057, we introduced logic to reject queued queries if
    the corresponding coordinator has been removed, along with tests
    for that behavior.
    
    This patch adds additional test cases to cover other failure
    scenarios, such as the coordinator or the statestore going down
    with running queries, and verifies that the behavior is as expected
    in each case.
    
    Tests:
    Passed exhaustive tests.
    
    Change-Id: If617326cbc6fe2567857d6323c6413d98c92d009
    Reviewed-on: http://gerrit.cloudera.org:8080/23217
    Reviewed-by: Riza Suminto <riza.sumi...@cloudera.com>
    Reviewed-by: Abhishek Rawat <ara...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 tests/custom_cluster/test_admission_controller.py | 76 +++++++++++++++++++++++
 1 file changed, 76 insertions(+)

diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 50fa518ce..77ea46415 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -2221,6 +2221,82 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
     self.assert_log_contains_multiline(self.get_ac_log_name(), 'INFO',
         "The coordinator no longer exists")
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--vmodule admission-controller=3 --default_pool_max_requests=1 "
+    "--queue_wait_timeout_ms=60000 ")
+  def test_kill_statestore_with_queries_running(self):
+    long_query = "select count(*), sleep(10000) from functional.alltypes limit 1"
+    short_query = "select count(*) from functional.alltypes limit 1"
+    timeout_s = 60
+
+    handle1 = self.client.execute_async(long_query)
+    # Make sure the first query has been admitted.
+    self.client.wait_for_impala_state(handle1, RUNNING, timeout_s)
+
+    # Run another query. This query should be queued because only 1 query is allowed in
+    # the default pool.
+    handle2 = self.client.execute_async(short_query)
+    self._wait_for_change_to_profile(handle2, "Admission result: Queued")
+
+    # Restart the statestore while queries are running/queued.
+    statestore = self.cluster.statestored
+    statestore.kill()
+    statestore.start()
+
+    # Verify that both queries eventually complete.
+    self.client.wait_for_impala_state(handle1, FINISHED, timeout_s)
+    self.client.close_query(handle1)
+    self.client.wait_for_impala_state(handle2, FINISHED, timeout_s)
+    self.client.close_query(handle2)
+
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--vmodule admission-controller=3 --default_pool_max_requests=1 "
+    "--queue_wait_timeout_ms=60000 ", disable_log_buffering=True)
+  def test_kill_coord_with_queries_running(self):
+    long_query = "select count(*), sleep(1000000000) from functional.alltypes limit 1"
+    short_query = "select count(*) from functional.alltypes limit 1"
+    timeout_s = 10
+
+    all_coords = self.cluster.get_all_coordinators()
+    assert len(all_coords) >= 2, "Test requires at least two coordinators"
+    coord1 = all_coords[0]
+    coord2 = all_coords[1]
+
+    # Make sure the first query has been admitted.
+    client1 = coord1.service.create_hs2_client()
+    handle1 = client1.execute_async(long_query)
+    client1.wait_for_impala_state(handle1, RUNNING, timeout_s)
+    query_id1 = client1.handle_id(handle1)
+
+    # Run another query. This query should be queued because only 1 query is allowed in
+    # the default pool.
+    client2 = coord2.service.create_hs2_client()
+    handle2 = client2.execute_async(short_query)
+    self._wait_for_change_to_profile(handle2, "Admission result: Queued", client=client2)
+
+    # Kill the coordinator handling the running query.
+    coord1.kill()
+    try:
+      client1.close_query(handle1)
+    except Exception:
+      pass
+
+    # The first query should be canceled after coord1 is killed,
+    # allowing the queued query to run.
+    admissiond_log = self.get_ac_log_name()
+    self.assert_log_contains(admissiond_log, 'INFO',
+      "Released query id={}".format(query_id1), expected_count=1)
+    client2.wait_for_impala_state(handle2, FINISHED, timeout_s)
+    client2.close_query(handle2)
+
+    # Cleanup.
+    client1.close()
+    client2.close()
+
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   @pytest.mark.execute_serially
   def test_retained_removed_coords_size(self):

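Note: the new tests lean on suite helpers such as _wait_for_change_to_profile, which
polls a query's runtime profile until an expected admission-result string appears. As a
rough illustration of that pattern only, here is a minimal, hypothetical sketch of such a
poller; the function name, signature, and timings are assumptions rather than the suite's
actual implementation, and it presumes a client object exposing get_runtime_profile(handle)
in the style of the Impala test clients.

import time


def wait_for_profile_substring(client, handle, expected, timeout_s=30, interval_s=0.5):
  # Hypothetical stand-in for a helper like _wait_for_change_to_profile: repeatedly
  # fetch the query's runtime profile until `expected` shows up, or fail on timeout.
  deadline = time.time() + timeout_s
  while time.time() < deadline:
    profile = client.get_runtime_profile(handle)
    if expected in profile:
      return profile
    time.sleep(interval_s)
  raise AssertionError("'%s' not found in runtime profile after %ss" % (expected, timeout_s))

In the patch above, the equivalent check is the call
self._wait_for_change_to_profile(handle2, "Admission result: Queued"), which ensures the
second query is actually queued before the statestore or coordinator is killed.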