http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index bb531ce..6fb0ae5 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -27,6 +27,7 @@ import threading
 from copy import copy
 from time import sleep, time
 
+from beeswaxd.BeeswaxService import QueryState
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import specific_build_type_timeout, IMPALAD_BUILD
@@ -42,6 +43,7 @@ from tests.common.test_vector import ImpalaTestDimension
 from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
 from ImpalaService import ImpalaHiveServer2Service
 from TCLIService import TCLIService
+from tests.verifiers.metric_verifier import MetricVerifier
 
 LOG = logging.getLogger('admission_test')
 
@@ -481,6 +483,82 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
       if impalad_with_2g_mem is not None:
         impalad_with_2g_mem.close()
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--logbuflevel=-1 " + impalad_admission_ctrl_flags(max_requests=1,
+        max_queued=1, pool_max_mem=PROC_MEM_TEST_LIMIT),
+    statestored_args=_STATESTORED_ARGS)
+  def test_cancellation(self):
+    """Test to confirm that all async cancellation windows are hit and are able to
+    successfully cancel the query."""
+    impalad = self.cluster.impalads[0]
+    client = impalad.service.create_beeswax_client()
+    try:
+      client.set_configuration_option("debug_action", "SLEEP_BEFORE_ADMISSION_MS:2000")
+      client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1)
+      handle = client.execute_async("select 1")
+      sleep(1)
+      client.close_query(handle)
+      self.assert_impalad_log_contains('INFO',
+          "Ready to be Rejected but already cancelled, query id=")
+      client.clear_configuration()
+
+      client.set_configuration_option("debug_action", "SLEEP_BEFORE_ADMISSION_MS:2000")
+      handle = client.execute_async("select 1")
+      sleep(1)
+      client.close_query(handle)
+      self.assert_impalad_log_contains('INFO',
+          "Ready to be Admitted immediately but already cancelled, query id=")
+
+      client.set_configuration_option("debug_action",
+          "SLEEP_AFTER_COORDINATOR_STARTS_MS:2000")
+      handle = client.execute_async("select 1")
+      sleep(1)
+      client.close_query(handle)
+      self.assert_impalad_log_contains('INFO',
+          "Cancelled right after starting the coordinator query id=")
+
+      client.clear_configuration()
+      handle = client.execute_async("select sleep(10000)")
+      client.set_configuration_option("debug_action",
+          "SLEEP_AFTER_ADMISSION_OUTCOME_MS:2000")
+      queued_query_handle = client.execute_async("select 1")
+      sleep(1)
+      assert client.get_state(queued_query_handle) == QueryState.COMPILED
+      assert "Admission result: Queued" in client.get_runtime_profile(queued_query_handle)
+      # Only cancel the queued query, because close will wait till it unregisters, this
+      # gives us a chance to close the running query and allow the dequeue thread to
+      # dequeue the queued query
+      client.cancel(queued_query_handle)
+      client.close_query(handle)
+      client.close_query(queued_query_handle)
+      queued_profile = client.get_runtime_profile(queued_query_handle)
+      assert "Admission result: Cancelled (queued)" in queued_profile
+      self.assert_impalad_log_contains('INFO', "Dequeued cancelled query=")
+      client.clear_configuration()
+
+      handle = client.execute_async("select sleep(10000)")
+      queued_query_handle = client.execute_async("select 1")
+      sleep(1)
+      assert client.get_state(queued_query_handle) == QueryState.COMPILED
+      assert "Admission result: Queued" in client.get_runtime_profile(queued_query_handle)
+      client.close_query(queued_query_handle)
+      client.close_query(handle)
+      queued_profile = client.get_runtime_profile(queued_query_handle)
+      assert "Admission result: Cancelled (queued)" in queued_profile
+    finally:
+      client.close()
+    # Verify the metrics outside the try block: an earlier failure must surface as-is
+    # rather than be swallowed or masked by a follow-on metric mismatch.
+    for i in self.cluster.impalads:
+      i.service.wait_for_metric_value("impala-server.num-fragments-in-flight", 0)
+    assert impalad.service.get_metric_value(
+      "admission-controller.agg-num-running.default-pool") == 0
+    assert impalad.service.get_metric_value(
+      "admission-controller.total-admitted.default-pool") == 3
+    assert impalad.service.get_metric_value(
+      "admission-controller.total-queued.default-pool") == 2
+
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between 
submissions
   (parameterized) and the ability to submit to one impalad or many in a 
round-robin
@@ -759,16 +837,20 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
 
           LOG.info("Submitting query %s", self.query_num)
           self.query_handle = client.execute_async(query)
-        except ImpalaBeeswaxException as e:
-          if re.search("Rejected.*queue full", str(e)):
+          client.wait_for_admission_control(self.query_handle)
+          admission_result = client.get_admission_result(self.query_handle)
+          assert len(admission_result) > 0
+          if "Rejected" in admission_result:
             LOG.info("Rejected query %s", self.query_num)
             self.query_state = 'REJECTED'
+            self.query_handle = None
             return
-          elif "exceeded timeout" in str(e):
+          elif "timeout" in admission_result:
             LOG.info("Query %s timed out", self.query_num)
             self.query_state = 'TIMED OUT'
+            self.query_handle = None
             return
-          else:
+        except ImpalaBeeswaxException as e:
             raise e
         finally:
           self.lock.release()

http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/custom_cluster/test_krpc_mem_usage.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_mem_usage.py 
b/tests/custom_cluster/test_krpc_mem_usage.py
index c26b776..d358baa 100644
--- a/tests/custom_cluster/test_krpc_mem_usage.py
+++ b/tests/custom_cluster/test_krpc_mem_usage.py
@@ -93,6 +93,7 @@ class TestKrpcMemUsage(CustomClusterTestSuite):
     self.client.execute(query)
     # Execute and cancel query
     handle = self.client.execute_async(query)
+    self.client.wait_for_admission_control(handle)
     # Sleep to allow RPCs to arrive.
     time.sleep(0.5)
     self.client.cancel(handle)

http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/custom_cluster/test_session_expiration.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_session_expiration.py 
b/tests/custom_cluster/test_session_expiration.py
index e89fffd..78862ec 100644
--- a/tests/custom_cluster/test_session_expiration.py
+++ b/tests/custom_cluster/test_session_expiration.py
@@ -89,3 +89,23 @@ class TestSessionExpiration(CustomClusterTestSuite):
     # now client should have expired
     assert num_expired + 1 == impalad.service.get_metric_value(
       "impala-server.num-sessions-expired")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("-default_pool_max_requests 1")
+  def test_session_expiration_with_queued_query(self, vector):
+    """A queued query must be cancelled when the session that owns it expires."""
+    impalad = self.cluster.get_any_impalad()
+    service = impalad.service
+    session_client = service.create_beeswax_client()
+    session_client.execute("SET IDLE_SESSION_TIMEOUT=3")
+    # Only one request may run at a time, so "select 1" queues behind the sleep.
+    session_client.execute_async("select sleep(10000)")
+    queued_handle = session_client.execute_async("select 1")
+    num_queued_metric = "admission-controller.local-num-queued.default-pool"
+    service.wait_for_metric_value(num_queued_metric, 1)
+    sleep(3)
+    service.wait_for_metric_value(num_queued_metric, 0)
+    service.wait_for_metric_value("admission-controller.agg-num-running.default-pool", 0)
+    profile_client = service.create_beeswax_client()
+    profile = profile_client.get_runtime_profile(queued_handle)
+    assert "Admission result: Cancelled (queued)" in profile

http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/hs2/hs2_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/hs2/hs2_test_suite.py b/tests/hs2/hs2_test_suite.py
index 137bd46..81c5033 100644
--- a/tests/hs2/hs2_test_suite.py
+++ b/tests/hs2/hs2_test_suite.py
@@ -228,6 +228,22 @@ class HS2TestSuite(ImpalaTestSuite):
     assert False, 'Did not reach expected operation state %s in time, actual 
state was ' \
         '%s' % (expected_state, get_operation_status_resp.operationState)
 
+  def wait_for_admission_control(self, operation_handle, timeout=10, interval=0.05):
+    """Waits for the admission control processing of the query to complete by polling
+      GetOperationStatus every interval seconds, returning the TGetOperationStatusResp,
+      or raising an assertion after timeout seconds."""
+    start_time = time()
+    while (time() - start_time < timeout):
+      get_operation_status_resp = self.get_operation_status(operation_handle)
+      HS2TestSuite.check_response(get_operation_status_resp)
+      if TCLIService.TOperationState.INITIALIZED_STATE < \
+          get_operation_status_resp.operationState < \
+          TCLIService.TOperationState.PENDING_STATE:
+        return get_operation_status_resp
+      sleep(interval)
+    assert False, 'Did not complete admission control processing in time, current ' \
+        'operation state of query: %s' % (get_operation_status_resp.operationState)
+
   def execute_statement(self, statement, conf_overlay=None,
                         
expected_status_code=TCLIService.TStatusCode.SUCCESS_STATUS,
                         expected_error_prefix=None):

http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/hs2/test_hs2.py
----------------------------------------------------------------------
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index ccdcdea..cd861e9 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -253,9 +253,10 @@ class TestHS2(HS2TestSuite):
         self.get_operation_status(execute_statement_resp.operationHandle)
     TestHS2.check_response(get_operation_status_resp)
     # If ExecuteStatement() has completed but the results haven't been fetched 
yet, the
-    # query must have at least reached RUNNING.
+    # query must have reached either PENDING or RUNNING or FINISHED.
     assert get_operation_status_resp.operationState in \
-        [TCLIService.TOperationState.RUNNING_STATE,
+        [TCLIService.TOperationState.PENDING_STATE,
+         TCLIService.TOperationState.RUNNING_STATE,
          TCLIService.TOperationState.FINISHED_STATE]
 
     fetch_results_req = TCLIService.TFetchResultsReq()
@@ -463,6 +464,13 @@ class TestHS2(HS2TestSuite):
     # should work.
     TestHS2.check_response(exec_summary_resp)
 
+    # Wait for query to start running so we can get a non-empty ExecSummary.
+    self.wait_for_admission_control(execute_statement_resp.operationHandle)
+    exec_summary_resp = self.hs2_client.GetExecSummary(exec_summary_req)
+    TestHS2.check_response(exec_summary_resp)
+    assert len(exec_summary_resp.summary.nodes) > 0
+
+    # Now close the query and verify the exec summary is available.
     close_operation_req = TCLIService.TCloseOperationReq()
     close_operation_req.operationHandle = 
execute_statement_resp.operationHandle
     TestHS2.check_response(self.hs2_client.CloseOperation(close_operation_req))
@@ -483,8 +491,9 @@ class TestHS2(HS2TestSuite):
     TestHS2.check_response(get_profile_resp)
     assert statement in get_profile_resp.profile
     # If ExecuteStatement() has completed but the results haven't been fetched 
yet, the
-    # query must have at least reached RUNNING.
-    assert "Query State: RUNNING" in get_profile_resp.profile or \
+    # query must have reached either COMPILED or RUNNING or FINISHED.
+    assert "Query State: COMPILED" in get_profile_resp.profile or \
+        "Query State: RUNNING" in get_profile_resp.profile or \
         "Query State: FINISHED" in get_profile_resp.profile, 
get_profile_resp.profile
 
     fetch_results_req = TCLIService.TFetchResultsReq()

http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py 
b/tests/query_test/test_observability.py
index bba154a..4e4bc25 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -89,10 +89,16 @@ class TestObservability(ImpalaTestSuite):
   def test_query_states(self):
     """Tests that the query profile shows expected query states."""
     query = "select count(*) from functional.alltypes"
-    handle = self.execute_query_async(query, dict())
+    handle = self.execute_query_async(query,
+        {"debug_action": "SLEEP_BEFORE_ADMISSION_MS:1000"})
+    # If ExecuteStatement() has completed and the query is paused in the 
admission control
+    # phase, then the query must be in COMPILED state.
+    profile = self.client.get_runtime_profile(handle)
+    assert "Query State: COMPILED" in profile
+    # After completion of the admission control phase, the query must have at 
least
+    # reached RUNNING state.
+    self.client.wait_for_admission_control(handle)
     profile = self.client.get_runtime_profile(handle)
-    # If ExecuteStatement() has completed but the results haven't been fetched 
yet, the
-    # query must have at least reached RUNNING.
     assert "Query State: RUNNING" in profile or \
       "Query State: FINISHED" in profile, profile
 
@@ -104,17 +110,37 @@ class TestObservability(ImpalaTestSuite):
   def test_query_options(self):
     """Test that the query profile shows expected non-default query options, 
both set
     explicitly through client and those set by planner"""
-    # Set a query option explicitly through client
-    self.execute_query("set MEM_LIMIT = 8589934592")
-    # Make sure explicitly set default values are not shown in the profile
-    self.execute_query("set runtime_filter_wait_time_ms = 0")
-    runtime_profile = self.execute_query("select 1").runtime_profile
-    assert "Query Options (set by configuration): MEM_LIMIT=8589934592" in 
runtime_profile
+    # Set mem_limit and runtime_filter_wait_time_ms to non-default and default 
value.
+    query_opts = {'mem_limit': 8589934592, 'runtime_filter_wait_time_ms': 0}
+    profile = self.execute_query("select 1", query_opts).runtime_profile
+    assert "Query Options (set by configuration): MEM_LIMIT=8589934592" in 
profile,\
+        profile
     # For this query, the planner sets NUM_NODES=1, NUM_SCANNER_THREADS=1,
     # RUNTIME_FILTER_MODE=0 and MT_DOP=0
     assert "Query Options (set by configuration and planner): 
MEM_LIMIT=8589934592," \
         "NUM_NODES=1,NUM_SCANNER_THREADS=1,RUNTIME_FILTER_MODE=0,MT_DOP=0\n" \
-        in runtime_profile
+        in profile
+
+  def test_exec_summary(self):
+    """Checks the exec summary returned at each stage of a query's lifetime."""
+    stmt = "select count(*) from functional.alltypes"
+    delay_admission = {"debug_action": "SLEEP_BEFORE_ADMISSION_MS:1000"}
+    handle = self.execute_query_async(stmt, delay_admission)
+    # The debug action holds the query in admission control, so the coordinator has not
+    # started yet and the summary is expected to exist but carry no nodes.
+    summary = self.client.get_exec_summary(handle)
+    assert summary is not None and summary.nodes is None
+    # Once admission control is done the coordinator should have started, so the
+    # summary is expected to be populated with nodes.
+    self.client.wait_for_admission_control(handle)
+    summary = self.client.get_exec_summary(handle)
+    assert summary is not None and summary.nodes is not None
+
+    # Fetching the results takes the query to the finished state; a populated summary
+    # should still be retrievable afterwards.
+    self.client.fetch(stmt, handle)
+    summary = self.client.get_exec_summary(handle)
+    assert summary is not None and summary.nodes is not None
 
   @SkipIfLocal.multiple_impalad
   @pytest.mark.xfail(reason="IMPALA-6338")

http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 902914a..9a365b5 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -40,14 +40,10 @@ class TestUdfBase(ImpalaTestSuite):
   """
   Base class with utility functions for testing UDFs.
   """
-  def _check_exception(self, e):
-    # The interesting exception message may be in 'e' or in its inner_exception
-    # depending on the point of query failure.
-    if 'Memory limit exceeded' in str(e) or 'Cancelled' in str(e):
-      return
-    if e.inner_exception is not None\
-       and ('Memory limit exceeded' in e.inner_exception.message
-            or 'Cancelled' not in e.inner_exception.message):
+  def _check_mem_limit_exception(self, e):
+    """Return without error if the exception is MEM_LIMIT_EXCEEDED, re-raise 
'e'
+    in all other cases."""
+    if 'Memory limit exceeded' in str(e):
       return
     raise e
 
@@ -427,21 +423,22 @@ class TestUdfExecution(TestUdfBase):
   # queries to fail
   @pytest.mark.execute_serially
   def test_mem_limits(self, vector, unique_database):
-    # Set the mem limit high enough that a simple scan can run
-    mem_limit = 1024 * 1024
+    # Set the mem_limit and buffer_pool_limit high enough that the query makes 
it through
+    # admission control and a simple scan can run.
     vector = copy(vector)
-    vector.get_value('exec_option')['mem_limit'] = mem_limit
+    vector.get_value('exec_option')['mem_limit'] = '1mb'
+    vector.get_value('exec_option')['buffer_pool_limit'] = '32kb'
     try:
       self.run_test_case('QueryTest/udf-mem-limit', vector, 
use_db=unique_database)
       assert False, "Query was expected to fail"
     except ImpalaBeeswaxException, e:
-      self._check_exception(e)
+      self._check_mem_limit_exception(e)
 
     try:
       self.run_test_case('QueryTest/uda-mem-limit', vector, 
use_db=unique_database)
       assert False, "Query was expected to fail"
     except ImpalaBeeswaxException, e:
-      self._check_exception(e)
+      self._check_mem_limit_exception(e)
 
     # It takes a long time for Impala to free up memory after this test, 
especially if
     # ASAN is enabled. Verify that all fragments finish executing before 
moving on to the

http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/www/query_backends.tmpl
----------------------------------------------------------------------
diff --git a/www/query_backends.tmpl b/www/query_backends.tmpl
index 77ed261..c1833c6 100644
--- a/www/query_backends.tmpl
+++ b/www/query_backends.tmpl
@@ -87,7 +87,8 @@ function toggleRefresh() {
 
 {{^backend_states}}
 <div class="alert alert-info" role="alert">
-Query <strong>{{query_id}}</strong> has completed, or has no backends.
+Query <strong>{{query_id}}</strong> has either completed or has no backends or 
has not
+started any backends yet.
 </div>
 {{/backend_states}}
 

Reply via email to