IMPALA-7516: Fix query location accounting

As a side effect of IMPALA-5216, queries that were scheduled for execution but were rejected or canceled before execution started (i.e. before the coordinator object was initialized) were added to the 'query_locations' map but never removed from it. As a result, the queries debug page would sometimes list queries under the "Query Locations" section even when no queries were actually running there. This patch fixes that behavior.
Testing: Added a custom cluster test. Change-Id: I66c8e59299747df57c9f39db1cb1f46ff6bbab7e Reviewed-on: http://gerrit.cloudera.org:8080/11372 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ab6bd74f Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ab6bd74f Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ab6bd74f Branch: refs/heads/master Commit: ab6bd74ff73f52a3bd0247027bd82d977189ded6 Parents: a11e505 Author: Bikramjeet Vig <bikramjeet....@cloudera.com> Authored: Thu Aug 30 17:17:18 2018 -0700 Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Committed: Mon Sep 10 22:05:49 2018 +0000 ---------------------------------------------------------------------- be/src/service/impala-server.cc | 2 + tests/common/impala_service.py | 7 ++++ .../custom_cluster/test_admission_controller.py | 42 ++++++++++++++++++++ 3 files changed, 51 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/ab6bd74f/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index c8f2fcc..0999b5e 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1158,7 +1158,9 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli request_state->summary_profile()->AddInfoStringRedacted("ExecSummary", exec_summary); request_state->summary_profile()->AddInfoStringRedacted("Errors", request_state->GetCoordinator()->GetErrorLog()); + } + if (request_state->schedule() != nullptr) { const PerBackendExecParams& per_backend_params = request_state->schedule()->per_backend_exec_params(); if 
(!per_backend_params.empty()) { http://git-wip-us.apache.org/repos/asf/impala/blob/ab6bd74f/tests/common/impala_service.py ---------------------------------------------------------------------- diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py index b738b8e..3157fbd 100644 --- a/tests/common/impala_service.py +++ b/tests/common/impala_service.py @@ -179,6 +179,13 @@ class ImpaladService(BaseImpalaService): num = len(result['backends']) return None if num is None else int(num) + def get_query_locations(self): + # Returns a dictionary of the format <host_address, num_of_queries_running_there> + result = json.loads(self.read_debug_webpage('queries?json', timeout=30, interval=1)) + if result['query_locations'] is not None: + return {loc["location"]: loc["count"] for loc in result['query_locations']} + return None + def get_in_flight_queries(self, timeout=30, interval=1): result = json.loads(self.read_debug_webpage('queries?json', timeout, interval)) return result['in_flight_queries'] http://git-wip-us.apache.org/repos/asf/impala/blob/ab6bd74f/tests/custom_cluster/test_admission_controller.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 4b8974a..7f20dbe 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -664,6 +664,48 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): matches = [re.search(INITIAL_QUEUE_REASON_REGEX, profile) for profile in profiles] return [match.group(0) for match in matches if match is not None] + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10, + pool_max_mem=1024 * 1024 * 1024), + statestored_args=_STATESTORED_ARGS) + def test_query_locations_correctness(self, vector): + """Regression test 
for IMPALA-7516: Test to make sure query locations and in-flight + queries are correct for different admission results that can affect it.""" + # Choose a query that runs on all 3 backends. + query = "select * from functional.alltypesagg A, (select sleep(10000)) B limit 1" + # Case 1: When a query runs succesfully. + handle = self.client.execute_async(query) + self.__assert_num_queries_accounted(1) + self.close_query(handle) + self.__assert_num_queries_accounted(0) + # Case 2: When a query is queued then cancelled + handle_running = self.client.execute_async(query) + self.client.wait_for_admission_control(handle_running) + handle_queued = self.client.execute_async(query) + self.impalad_test_service.wait_for_metric_value( + "admission-controller.total-queued.default-pool", 1) + self.__assert_num_queries_accounted(2) + # First close the queued query + self.close_query(handle_queued) + self.close_query(handle_running) + self.__assert_num_queries_accounted(0) + # Case 3: When a query gets rejected + exec_options = copy(vector.get_value('exec_option')) + exec_options['mem_limit'] = "1b" + self.execute_query_expect_failure(self.client, query, exec_options) + self.__assert_num_queries_accounted(0) + + def __assert_num_queries_accounted(self, expected_num): + """Checks if the num of queries accounted by query_locations and in-flight are as + expected""" + # Wait for queries to start/un-register. + self.impalad_test_service.wait_for_num_in_flight_queries(expected_num) + query_locations = self.impalad_test_service.get_query_locations() + for host, num_q in query_locations.items(): + assert num_q == expected_num, "There should be {0} running queries on either " \ + "impalads: {0}".format(query_locations) + class TestAdmissionControllerStress(TestAdmissionControllerBase): """Submits a number of queries (parameterized) with some delay between submissions