IMPALA-7516: Fix query location accounting

As a side effect of IMPALA-5216, queries that were scheduled for
execution but were rejected or canceled before execution started
(i.e., before the coordinator object was initialized) were added to the
'query_locations' map but never removed from it. As a result, the
queries debug page would sometimes list queries under the
"Query Locations" section even when none were running. This patch fixes
that by processing the query's per-backend schedule during
unregistration whenever a schedule exists, even if no coordinator was
ever created.

Testing:
Added a custom cluster test.

Change-Id: I66c8e59299747df57c9f39db1cb1f46ff6bbab7e
Reviewed-on: http://gerrit.cloudera.org:8080/11372
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ab6bd74f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ab6bd74f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ab6bd74f

Branch: refs/heads/master
Commit: ab6bd74ff73f52a3bd0247027bd82d977189ded6
Parents: a11e505
Author: Bikramjeet Vig <bikramjeet....@cloudera.com>
Authored: Thu Aug 30 17:17:18 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Mon Sep 10 22:05:49 2018 +0000

----------------------------------------------------------------------
 be/src/service/impala-server.cc                 |  2 +
 tests/common/impala_service.py                  |  7 ++++
 .../custom_cluster/test_admission_controller.py | 42 ++++++++++++++++++++
 3 files changed, 51 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ab6bd74f/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index c8f2fcc..0999b5e 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1158,7 +1158,9 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
     request_state->summary_profile()->AddInfoStringRedacted("ExecSummary", exec_summary);
     request_state->summary_profile()->AddInfoStringRedacted("Errors",
         request_state->GetCoordinator()->GetErrorLog());
+  }
 
+  if (request_state->schedule() != nullptr) {
     const PerBackendExecParams& per_backend_params =
         request_state->schedule()->per_backend_exec_params();
     if (!per_backend_params.empty()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ab6bd74f/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index b738b8e..3157fbd 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -179,6 +179,13 @@ class ImpaladService(BaseImpalaService):
     num = len(result['backends'])
     return None if num is None else int(num)
 
+  def get_query_locations(self, timeout=30, interval=1):
+    # Returns a dict of <host_address, num_of_queries_running_there>, or None if
+    # the debug page does not report 'query_locations' (missing key or JSON null).
+    result = json.loads(self.read_debug_webpage('queries?json', timeout, interval))
+    locs = result.get('query_locations')
+    return None if locs is None else {loc["location"]: loc["count"] for loc in locs}
+
   def get_in_flight_queries(self, timeout=30, interval=1):
     result = json.loads(self.read_debug_webpage('queries?json', timeout, interval))
     return result['in_flight_queries']

http://git-wip-us.apache.org/repos/asf/impala/blob/ab6bd74f/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 4b8974a..7f20dbe 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -664,6 +664,48 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     matches = [re.search(INITIAL_QUEUE_REASON_REGEX, profile) for profile in profiles]
     return [match.group(0) for match in matches if match is not None]
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
+        pool_max_mem=1024 * 1024 * 1024),
+    statestored_args=_STATESTORED_ARGS)
+  def test_query_locations_correctness(self, vector):
+    """Regression test for IMPALA-7516: Test to make sure query locations and in-flight
+    queries are correct for different admission results that can affect it."""
+    # Choose a query that runs on all 3 backends.
+    query = "select * from functional.alltypesagg A, (select sleep(10000)) B limit 1"
+    # Case 1: When a query runs successfully.
+    handle = self.client.execute_async(query)
+    self.__assert_num_queries_accounted(1)
+    self.close_query(handle)
+    self.__assert_num_queries_accounted(0)
+    # Case 2: When a query is queued then cancelled
+    handle_running = self.client.execute_async(query)
+    self.client.wait_for_admission_control(handle_running)
+    handle_queued = self.client.execute_async(query)
+    self.impalad_test_service.wait_for_metric_value(
+      "admission-controller.total-queued.default-pool", 1)
+    self.__assert_num_queries_accounted(2)
+    # Close the queued query first so that it is cancelled while still queued.
+    self.close_query(handle_queued)
+    self.close_query(handle_running)
+    self.__assert_num_queries_accounted(0)
+    # Case 3: When a query gets rejected
+    exec_options = copy(vector.get_value('exec_option'))
+    exec_options['mem_limit'] = "1b"
+    self.execute_query_expect_failure(self.client, query, exec_options)
+    self.__assert_num_queries_accounted(0)
+
+  def __assert_num_queries_accounted(self, expected_num):
+    """Checks if the num of queries accounted by query_locations and in-flight are as
+    expected"""
+    # Wait for queries to start/un-register.
+    self.impalad_test_service.wait_for_num_in_flight_queries(expected_num)
+    query_locations = self.impalad_test_service.get_query_locations() or {}
+    for num_q in query_locations.values():
+      assert num_q == expected_num, "There should be {0} running queries on either " \
+                                    "impalads: {1}".format(expected_num, query_locations)
+
 
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions

Reply via email to