This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit e3a6cb240b95785a6b92bb1e505bcddceb29b3c8 Author: Riza Suminto <[email protected]> AuthorDate: Wed Mar 26 17:28:17 2025 -0700 IMPALA-13906: Change TestQueryConcurrency to regular query_test TestQueryConcurrency runs much slower when the test client is changed from beeswax to hs2. The reason is that this test injects a slowdown through the backend flag --stress_metadata_loading_pause_injection_ms=100000, which impacts all queries. Changing the test client to ImpylaHS2Connection causes this test to run much slower because ImpylaHS2Connection populates its default query options by running a SET ALL query. This patch speeds up TestQueryConcurrency by changing it from custom_cluster to a regular query_test, using a debug action to inject the slowdown, and lowering the slowdown from 100s to 60s. Added debug action EXECUTE_INTERNAL_REGISTERED to replace the stress_metadata_loading_pause_injection_ms flag. Testing: - Pass TestQueryConcurrency in exhaustive mode. Change-Id: Ia33f7c0a36df0f1922055902974ce3e0c2fb6fdb Reviewed-on: http://gerrit.cloudera.org:8080/22699 Reviewed-by: Riza Suminto <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/impala-server.cc | 14 +------ .../test_query_concurrency.py | 47 ++++++++++++---------- 2 files changed, 26 insertions(+), 35 deletions(-) diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 55b5a1ebc..ad0f7463f 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -335,12 +335,6 @@ DEFINE_int64(shutdown_query_cancel_period_s, 60, "period exceeds 20% of the total shutdown deadline, it will be capped at 20% of the " "total shutdown duration."); -#ifndef NDEBUG - DEFINE_int64(stress_metadata_loading_pause_injection_ms, 0, "Simulates metadata loading" - "for a given query by injecting a sleep equivalent to this configuration in " - "milliseconds. 
Only used for testing."); -#endif - DEFINE_int64(accepted_client_cnxn_timeout, 300000, "(Advanced) The amount of time in milliseconds an accepted connection will wait in " "the post-accept, pre-setup connection queue before it is timed out and the " @@ -1351,13 +1345,7 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx, RETURN_IF_ERROR(RegisterQuery(query_ctx.query_id, session_state, query_handle)); *registered_query = true; -#ifndef NDEBUG - // Inject a sleep to simulate metadata loading pauses for tables. This - // is only used for testing. - if (FLAGS_stress_metadata_loading_pause_injection_ms > 0) { - SleepForMs(FLAGS_stress_metadata_loading_pause_injection_ms); - } -#endif + DebugActionNoFail((*query_handle)->query_options(), "EXECUTE_INTERNAL_REGISTERED"); size_t statement_length = query_ctx.client_request.stmt.length(); int32_t max_statement_length = diff --git a/tests/custom_cluster/test_query_concurrency.py b/tests/query_test/test_query_concurrency.py similarity index 70% rename from tests/custom_cluster/test_query_concurrency.py rename to tests/query_test/test_query_concurrency.py index 49a9ca246..97e2d0c97 100644 --- a/tests/custom_cluster/test_query_concurrency.py +++ b/tests/query_test/test_query_concurrency.py @@ -18,18 +18,21 @@ from __future__ import absolute_import, division, print_function import pytest import time + from threading import Thread -from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_cluster import ImpalaCluster +from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.skip import SkipIfBuildType + @SkipIfBuildType.not_dev_build -class TestQueryConcurrency(CustomClusterTestSuite): +class TestQueryConcurrency(ImpalaTestSuite): """Tests if multiple queries are registered on the coordinator when submitted in parallel along with clients trying to access the web UI. 
The intention here is to check that the web server call paths don't hold global locks that can conflict with other requests and prevent the impalad from servicing them. It is done by simulating a metadata loading pause - using the configuration key --metadata_loading_pause_injection_ms that + using the debug action 'EXECUTE_INTERNAL_REGISTERED' that makes the frontend hold the ClientRequestState::lock_ for longer duration.""" TEST_QUERY = "select count(*) from tpch.supplier" @@ -39,12 +42,6 @@ class TestQueryConcurrency(CustomClusterTestSuite): def get_workload(self): return 'functional-query' - @classmethod - def setup_class(cls): - if cls.exploration_strategy() != 'exhaustive': - pytest.skip('Runs only in exhaustive mode.') - super(TestQueryConcurrency, cls).setup_class() - def poll_query_page(self, impalad, query_id): """Polls the debug plan page of a given query id in a loop till the timeout of POLLING_TIMEOUT_S is hit.""" @@ -68,24 +65,30 @@ class TestQueryConcurrency(CustomClusterTestSuite): assert False, "Registered query count doesn't match: " + str(count) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args("--stress_metadata_loading_pause_injection_ms=100000") - def test_query_concurrency(self, vector): - impalad = self.cluster.get_any_impalad() - client1 = impalad.service.create_beeswax_client() - client2 = impalad.service.create_beeswax_client() - q1 = Thread(target = client1.execute_async, args = (self.TEST_QUERY,)) - q2 = Thread(target = client2.execute_async, args = (self.TEST_QUERY,)) + def test_query_concurrency(self): + if self.exploration_strategy() != 'exhaustive': + pytest.skip('Runs only in exhaustive mode.') + impalad = ImpalaCluster.get_e2e_test_cluster().get_any_impalad() + # Inject 1 minute sleep right after "Query submitted" timeline shows up + # and ClientRequestState::lock_ is being held. 
+ opts = {'debug_action': 'EXECUTE_INTERNAL_REGISTERED:SLEEP@60000'} + client1 = impalad.service.create_hs2_client() + client2 = impalad.service.create_hs2_client() + q1 = Thread(target=self.execute_query_expect_success, + args=(client1, self.TEST_QUERY, opts,)) + q2 = Thread(target=self.execute_query_expect_success, + args=(client2, self.TEST_QUERY, opts,)) q1.start() inflight_query_ids = self.check_registered_queries(impalad, 1) - Thread(target = self.poll_query_page,\ - args = (impalad, inflight_query_ids[0]['query_id'],)).start() + poll_thread = Thread(target=self.poll_query_page, + args=(impalad, inflight_query_ids[0]['query_id'],)) + poll_thread.start() time.sleep(2) q2.start() inflight_query_ids = self.check_registered_queries(impalad, 2) - result = impalad.service.read_debug_webpage("query_profile_encoded?query_id="\ - + inflight_query_ids[1]['query_id']) + result = impalad.service.read_debug_webpage( + "query_profile_encoded?query_id={}".format(inflight_query_ids[1]['query_id'])) assert result.startswith("Could not obtain runtime profile") - client1.close() - client2.close() + poll_thread.join() q1.join() q2.join()
