IMPALA-1144: Fix exception when cancelling query in Impala-shell with CTRL-C

Issue 1: When a query is cancelled via CTRL-C while being executed in Impala-shell,
an exception is thrown from the Impala backend saying 'Invalid query handle'.
This is because one ImpalaClient was making RPCs while another ImpalaClient
cancelled the query on the backend. As a result RPC handlers in ImpalaServer
try to access a ClientRequestState that had been cleared from the backend. The
issue is reliably reproducible in both the wait_to_finish and fetch states of
the query.

As a solution, the query cancellation is signalled to ImpalaClient via a bool
flag. Once an exception caused by the cancellation reaches the Impala shell,
this flag is checked to decide whether to suppress the error.

Issue 2: Every time a query was cancelled a 'use db' command was issued
automatically. This happened for historical reasons but is not needed anymore
(see Jira for more details).

Change-Id: I6cefaf1dae78baae238289816a7cb9d210fb38e2
Reviewed-on: http://gerrit.cloudera.org:8080/8549
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/6d9da172
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/6d9da172
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/6d9da172

Branch: refs/heads/master
Commit: 6d9da172889cde75e041caf4fa024f4d9f223db5
Parents: dc1282f
Author: Gabor Kaszab <gaborkas...@cloudera.com>
Authored: Wed Nov 15 01:01:45 2017 +0100
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Wed Nov 29 03:44:51 2017 +0000

----------------------------------------------------------------------
 shell/impala_client.py                | 25 +++++++++++++++++++++----
 shell/impala_shell.py                 |  7 +++++--
 tests/shell/test_shell_commandline.py | 30 ++++++++++++++++++++++++++++++
 3 files changed, 56 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/6d9da172/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 868d898..795768c 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -55,6 +55,8 @@ class DisconnectedException(Exception):
   def __str__(self):
       return self.value
 
+class QueryCancelledByShellException(Exception): pass
+
 class ImpalaClient(object):
 
   def __init__(self, impalad, use_kerberos=False, 
kerberos_service_name="impala",
@@ -74,6 +76,10 @@ class ImpalaClient(object):
     self.query_option_levels = {}
     self.query_state = QueryState._NAMES_TO_VALUES
     self.fetch_batch_size = 1024
+    # This is set from ImpalaShell's signal handler when a query is cancelled
+    # from command line via CTRL+C. It is used to suppress error messages of
+    # query cancellation.
+    self.is_query_cancelled = False
 
   def _options_to_string_list(self, set_query_options):
     return ["%s=%s" % (k, v) for (k, v) in set_query_options.iteritems()]
@@ -306,6 +312,7 @@ class ImpalaClient(object):
     return query
 
   def execute_query(self, query):
+    self.is_query_cancelled = False
     rpc_result = self._do_rpc(lambda: self.imp_service.query(query))
     last_query_handle, status = rpc_result
     if status != RpcStatus.OK:
@@ -381,7 +388,8 @@ class ImpalaClient(object):
     # co-ordinator, so we don't need to wait.
     if query_handle_closed:
       return True
-    rpc_result = self._do_rpc(lambda: 
self.imp_service.Cancel(last_query_handle))
+    rpc_result = self._do_rpc(lambda: 
self.imp_service.Cancel(last_query_handle),
+        False)
     _, status = rpc_result
     return status == RpcStatus.OK
 
@@ -409,7 +417,7 @@ class ImpalaClient(object):
       return summary
     return None
 
-  def _do_rpc(self, rpc):
+  def _do_rpc(self, rpc, suppress_error_on_cancel=True):
     """Executes the provided callable."""
 
     if not self.connected:
@@ -428,16 +436,25 @@ class ImpalaClient(object):
           status = RpcStatus.ERROR
       return ret, status
     except BeeswaxService.QueryNotFoundException:
+      if suppress_error_on_cancel and self.is_query_cancelled:
+        raise QueryCancelledByShellException()
       raise QueryStateException('Error: Stale query handle')
     # beeswaxException prints out the entire object, printing
     # just the message is far more readable/helpful.
     except BeeswaxService.BeeswaxException, b:
-        raise RPCException("ERROR: %s" % b.message)
+      # Suppress the errors from cancelling a query that is in fetch state
+      if suppress_error_on_cancel and self.is_query_cancelled:
+        raise QueryCancelledByShellException()
+      raise RPCException("ERROR: %s" % b.message)
     except TTransportException, e:
       # issue with the connection with the impalad
       raise DisconnectedException("Error communicating with impalad: %s" % e)
     except TApplicationException, t:
-        raise RPCException("Application Exception : %s" % t)
+      # Suppress the errors from cancelling a query that is in 
waiting_to_finish
+      # state
+      if suppress_error_on_cancel and self.is_query_cancelled:
+        raise QueryCancelledByShellException()
+      raise RPCException("Application Exception : %s" % t)
     return None, RpcStatus.ERROR
 
   def _get_sleep_interval(self, start_time):

http://git-wip-us.apache.org/repos/asf/impala/blob/6d9da172/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index a9a527a..ffe01e1 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -36,7 +36,8 @@ import textwrap
 import time
 
 from impala_client import (ImpalaClient, DisconnectedException, 
QueryStateException,
-                           RPCException, TApplicationException)
+                           RPCException, TApplicationException,
+                           QueryCancelledByShellException)
 from impala_shell_config_defaults import impala_shell_defaults
 from option_parser import get_option_parser, get_config_from_file
 from shell_output import DelimitedOutputFormatter, OutputStream, 
PrettyOutputFormatter
@@ -484,13 +485,13 @@ class ImpalaShell(object, cmd.Cmd):
     # Create a new connection to the impalad and cancel the query.
     for cancel_try in xrange(ImpalaShell.CANCELLATION_TRIES):
       try:
+        self.imp_client.is_query_cancelled = True
         self.query_handle_closed = True
         print_to_stderr(ImpalaShell.CANCELLATION_MESSAGE)
         new_imp_client = self._new_impala_client()
         new_imp_client.connect()
         new_imp_client.cancel_query(self.last_query_handle, False)
         self.imp_client.close_query(self.last_query_handle)
-        self._validate_database()
         break
       except Exception, e:
         # Suppress harmless errors.
@@ -1038,6 +1039,8 @@ class ImpalaShell(object, cmd.Cmd):
       except RPCException, e:
         if self.show_profiles: raise e
       return CmdStatus.SUCCESS
+    except QueryCancelledByShellException, e:
+      return CmdStatus.SUCCESS
     except RPCException, e:
       # could not complete the rpc successfully
       print_to_stderr(e)

http://git-wip-us.apache.org/repos/asf/impala/blob/6d9da172/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py 
b/tests/shell/test_shell_commandline.py
index 218224b..1ecdbd5 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -325,6 +325,36 @@ class TestImpalaShell(ImpalaTestSuite):
 
     assert "Cancelling Query" in result.stderr, result.stderr
 
+  def test_query_cancellation_during_fetch(self):
+    """IMPALA-1144: Test cancellation (CTRL+C) while results are being
+    fetched"""
+    # A select query where fetch takes several seconds
+    args = '-q "with v as (values (1 as x), (2), (3), (4)) ' + \
+        'select * from v, v v2, v v3, v v4, v v5, v v6, v v7, v v8, ' + \
+        'v v9, v v10, v v11;"'
+    # Kill happens when the results are being fetched
+    self.run_and_verify_query_cancellation_test(args)
+
+  def test_query_cancellation_during_wait_to_finish(self):
+    """IMPALA-1144: Test cancellation (CTRL+C) while the query is in the
+    wait_to_finish state"""
+    # A select where wait_to_finish takes several seconds
+    args = '-q "select * from tpch.customer c1, tpch.customer c2, ' + \
+           'tpch.customer c3 order by c1.c_name"'
+    # Kill happens in wait_to_finish state
+    self.run_and_verify_query_cancellation_test(args)
+
+  def run_and_verify_query_cancellation_test(self, args):
+    """Starts the execution of the received query, waits until the query
+    execution in fact starts and then cancels it. Expects the query
+    cancellation to succeed."""
+    p = ImpalaShell(args)
+    sleep(2.0)
+    os.kill(p.pid(), signal.SIGINT)
+    result = p.get_result()
+    assert "Cancelling Query" in result.stderr
+    assert "Invalid query handle" not in result.stderr
+
   def test_get_log_once(self, empty_table):
     """Test that get_log() is always called exactly once."""
     # Query with fetch

Reply via email to