Repository: incubator-impala
Updated Branches:
  refs/heads/master cd2ee9ecf -> 0dde1c2f8


IMPALA-3628: Fix cancellation from shell when security is enabled

To cancel a query, the shell will create a separate connection inside
its SIGINT handler, and send the cancellation RPC. However, this
connection did not start a secure connection if it needed to, meaning
that the cancellation attempt would just hang.

A workaround is to kill the shell process, which I expect is what users
have been doing, given that this bug has been around since 2014.

Testing:

I added a custom cluster test that starts Impala with SSL
enabled, and wrote two tests - one just to check SSL connectivity, and
the other to mimic the existing test_cancellation which sends SIGINT to
the shell process. In doing so I refactored the shell testing code a bit
so that all tests use a single ImpalaShell object, rather than rolling
their own Popen() based approaches when they needed to do something
unusual, like cancel a query.

In the cancellation test on my machine, SIGINT can take a few tries to
be effective. I'm not sure if this is a timing thing - perhaps the
Python interpreter doesn't correctly pass signals through to a handler
if it's in a blocking call, for example. The test reliably passes within
~5 tries on my machine, so the test tries 30 times, once per second.

Change-Id: If99085e75708d92a08dbecf0131a2234fedad33a
Reviewed-on: http://gerrit.cloudera.org:8080/3302
Reviewed-by: Henry Robinson <[email protected]>
Tested-by: Henry Robinson <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0dde1c2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0dde1c2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0dde1c2f

Branch: refs/heads/master
Commit: 0dde1c2f86ef924777ac196013d11aa2e28cc067
Parents: cd2ee9e
Author: Henry Robinson <[email protected]>
Authored: Thu Jun 2 11:49:49 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Tue Jul 5 16:40:23 2016 -0700

----------------------------------------------------------------------
 shell/impala_shell.py                   |  27 ++++--
 tests/custom_cluster/test_client_ssl.py |  81 ++++++++++++++++
 tests/shell/__init__.py                 |   1 +
 tests/shell/impala_shell_results.py     |  33 -------
 tests/shell/test_shell_commandline.py   |  36 ++-----
 tests/shell/test_shell_common.py        |  72 --------------
 tests/shell/test_shell_interactive.py   |  76 ++++++---------
 tests/shell/util.py                     | 139 +++++++++++++++++++++++++++
 8 files changed, 274 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 6e37aae..deb65d8 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -402,6 +402,12 @@ class ImpalaShell(cmd.Cmd):
       completed_cmd = sqlparse.format(cmd)
     return completed_cmd
 
+  def _new_impala_client(self):
+    return ImpalaClient(self.impalad, self.use_kerberos,
+                        self.kerberos_service_name, self.use_ssl,
+                        self.ca_cert, self.user, self.ldap_password,
+                        self.use_ldap)
+
   def _signal_handler(self, signal, frame):
     """Handles query cancellation on a Ctrl+C event"""
     if self.last_query_handle is None or self.query_handle_closed:
@@ -411,7 +417,7 @@ class ImpalaShell(cmd.Cmd):
       try:
         self.query_handle_closed = True
         print_to_stderr(ImpalaShell.CANCELLATION_MESSAGE)
-        new_imp_client = ImpalaClient(self.impalad)
+        new_imp_client = self._new_impala_client()
         new_imp_client.connect()
         new_imp_client.cancel_query(self.last_query_handle, False)
         self.imp_client.close_query(self.last_query_handle)
@@ -627,10 +633,7 @@ class ImpalaShell(cmd.Cmd):
       host_port.append('21000')
     self.impalad = tuple(host_port)
     if self.imp_client: self.imp_client.close_connection()
-    self.imp_client = ImpalaClient(self.impalad, self.use_kerberos,
-                                   self.kerberos_service_name, self.use_ssl,
-                                   self.ca_cert, self.user, self.ldap_password,
-                                   self.use_ldap)
+    self.imp_client = self._new_impala_client()
     self._connect()
     # If the connection fails and the Kerberos has not been enabled,
     # check for a valid kerberos ticket and retry the connection
@@ -638,11 +641,12 @@ class ImpalaShell(cmd.Cmd):
     if not self.imp_client.connected and not self.use_kerberos:
       try:
         if call(["klist", "-s"]) == 0:
-          print_to_stderr(("Kerberos ticket found in the credentials cache, 
retrying "
-                           "the connection with a secure transport."))
-          self.imp_client.use_kerberos = True
-          self.imp_client.use_ldap = False
-          self.imp_client.ldap_password = None
+          print_to_stderr("Kerberos ticket found in the credentials cache, 
retrying "
+                          "the connection with a secure transport.")
+          self.use_kerberos = True
+          self.use_ldap = False
+          self.ldap_password = None
+          self.imp_client = self._new_impala_client()
           self._connect()
       except OSError, e:
         pass
@@ -1365,5 +1369,8 @@ if __name__ == "__main__":
       except RPCException, e:
         # could not complete the rpc successfully
         print_to_stderr(e)
+      except IOError, e:
+        # Interrupted system calls (e.g. because of cancellation) should be 
ignored.
+        if e.errno != errno.EINTR: raise
     finally:
       intro = ''

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/custom_cluster/test_client_ssl.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_client_ssl.py 
b/tests/custom_cluster/test_client_ssl.py
new file mode 100644
index 0000000..6f8a01c
--- /dev/null
+++ b/tests/custom_cluster/test_client_ssl.py
@@ -0,0 +1,81 @@
+# Copyright (c) 2016 Cloudera, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import os
+import pytest
+import signal
+import socket
+import time
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_service import ImpaladService
+from tests.shell.util import run_impala_shell_cmd, ImpalaShell
+from tests.common.impala_cluster import ImpalaCluster
+
+class TestClientSsl(CustomClusterTestSuite):
+  """Tests for a client using SSL (particularly, the Impala Shell) """
+
+  CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME']
+
+  SSL_ENABLED = "SSL is enabled"
+  CONNECTED = "Connected to"
+  FETCHED = "Fetched 1 row"
+
+  @pytest.mark.execute_serially
+  
@CustomClusterTestSuite.with_args("--ssl_server_certificate=%s/server-cert.pem "
+                                    "--ssl_private_key=%s/server-key.pem"
+                                    % (CERT_DIR, CERT_DIR))
+  def test_ssl(self, vector):
+    # TODO: This is really two different tests, but the custom cluster takes 
too long to
+    # start. Make it so that custom clusters can be specified across test 
suites.
+    result = run_impala_shell_cmd("--ssl --ca_cert=%s/server-cert.pem -q 
'select 1 + 2'"
+                                  % self.CERT_DIR)
+    for msg in [self.SSL_ENABLED, self.CONNECTED, self.FETCHED]:
+      assert msg in result.stderr
+
+    # No certificate checking: will accept any cert.
+    result = run_impala_shell_cmd("--ssl -q 'select 1 + 2'")
+    for msg in [self.SSL_ENABLED, self.CONNECTED, self.FETCHED]:
+      assert msg in result.stderr
+
+    # Test cancelling a query
+    impalad = ImpaladService(socket.getfqdn())
+    impalad.wait_for_num_in_flight_queries(0)
+    p = ImpalaShell(args="--ssl")
+    p.send_cmd("SET DEBUG_ACTION=0:OPEN:WAIT")
+    p.send_cmd("select count(*) from functional.alltypes")
+    impalad.wait_for_num_in_flight_queries(1)
+
+    LOG = logging.getLogger('test_client_ssl')
+    LOG.info("Cancelling query")
+    num_tries = 0
+    # In practice, sending SIGINT to the shell process doesn't always seem to 
get caught
+    # (and a search shows up some bugs in Python where SIGINT might be 
ignored). So retry
+    # for 30s until one signal takes.
+    while impalad.get_num_in_flight_queries() == 1:
+      time.sleep(1)
+      LOG.info("Sending signal...")
+      os.kill(p.pid(), signal.SIGINT)
+      num_tries += 1
+      assert num_tries < 30, "SIGINT was not caught by shell within 30s"
+
+    p.send_cmd("profile")
+    result = p.get_result()
+
+    print result.stderr
+    assert result.rc == 0
+    assert "Query Status: Cancelled" in result.stdout
+    assert impalad.wait_for_num_in_flight_queries(0)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/shell/__init__.py
----------------------------------------------------------------------
diff --git a/tests/shell/__init__.py b/tests/shell/__init__.py
new file mode 100644
index 0000000..946a474
--- /dev/null
+++ b/tests/shell/__init__.py
@@ -0,0 +1 @@
+# This file is needed to make the files in this directory a python module

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/shell/impala_shell_results.py
----------------------------------------------------------------------
diff --git a/tests/shell/impala_shell_results.py 
b/tests/shell/impala_shell_results.py
deleted file mode 100644
index 7cd8453..0000000
--- a/tests/shell/impala_shell_results.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# encoding=utf-8
-# Copyright 2014 Cloudera Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-
-PY_CMD = "%s/shell/impala_shell.py" % os.environ['IMPALA_HOME']
-
-class ImpalaShellResult(object):
-  def __init__(self):
-    self.rc = 0
-    self.stdout = str()
-    self.stderr = str()
-
-def get_shell_cmd_result(process, stdin_input=None):
-  result = ImpalaShellResult()
-  result.stdout, result.stderr = process.communicate(input=stdin_input)
-  # We need to close STDIN if we gave it an input, in order to send an EOF 
that will
-  # allow the subprocess to exit.
-  if stdin_input is not None: process.stdin.close()
-  result.rc = process.returncode
-  return result

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py 
b/tests/shell/test_shell_commandline.py
index 77ee4be..f14e3ec 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -19,18 +19,14 @@ import re
 import shlex
 import signal
 
-from impala_shell_results import get_shell_cmd_result
 from subprocess import Popen, PIPE, call
 from tests.common.impala_service import ImpaladService
 from time import sleep
-from test_shell_common import assert_var_substitution
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.patterns import is_valid_impala_identifier
+from util import assert_var_substitution, run_impala_shell_cmd, ImpalaShell
+from util import IMPALAD, SHELL_CMD
 
-IMPALAD_HOST_PORT_LIST = pytest.config.option.impalad.split(',')
-assert len(IMPALAD_HOST_PORT_LIST) > 0, 'Must specify at least 1 impalad to 
target'
-IMPALAD = IMPALAD_HOST_PORT_LIST[0]
-SHELL_CMD = "%s/bin/impala-shell.sh -i %s" % (os.environ['IMPALA_HOME'], 
IMPALAD)
 DEFAULT_QUERY = 'select 1'
 QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
 
@@ -304,25 +300,23 @@ class TestImpalaShell(ImpalaTestSuite):
     args = '-f %s/test_close_queries.sql --quiet -B' % QUERY_FILE_PATH
     cmd = "%s %s" % (SHELL_CMD, args)
     # Execute the shell command async
-    p = Popen(shlex.split(cmd), shell=False, stdout=PIPE, stderr=PIPE)
+    p = ImpalaShell(args)
 
     impalad_service = ImpaladService(IMPALAD.split(':')[0])
     # The last query in the test SQL script will sleep for 10 seconds, so sleep
     # here for 5 seconds and verify the number of in-flight queries is 1.
     sleep(5)
     assert 1 == impalad_service.get_num_in_flight_queries()
-    assert get_shell_cmd_result(p).rc == 0
+    assert p.get_result().rc == 0
     assert 0 == impalad_service.get_num_in_flight_queries()
 
   def test_cancellation(self):
     """Test cancellation (Ctrl+C event)."""
     args = '-q "select sleep(10000)"'
-    cmd = "%s %s" % (SHELL_CMD, args)
-
-    p = Popen(shlex.split(cmd), stderr=PIPE, stdout=PIPE)
+    p = ImpalaShell(args)
     sleep(3)
-    os.kill(p.pid, signal.SIGINT)
-    result = get_shell_cmd_result(p)
+    os.kill(p.pid(), signal.SIGINT)
+    result = p.get_result()
 
     assert "Cancelling Query" in result.stderr, result.stderr
 
@@ -423,19 +417,3 @@ class TestImpalaShell(ImpalaTestSuite):
            % (os.path.join(QUERY_FILE_PATH, 'test_var_substitution.sql'))
     result = run_impala_shell_cmd(args, expect_success=True)
     assert_var_substitution(result)
-
-
-def run_impala_shell_cmd(shell_args, expect_success=True, stdin_input=None):
-  """Run the Impala shell on the commandline.
-
-  'shell_args' is a string which represents the commandline options.
-  Returns a ImpalaShellResult.
-  """
-  cmd = "%s %s" % (SHELL_CMD, shell_args)
-  p = Popen(shlex.split(cmd), shell=False, stdout=PIPE, stderr=PIPE, 
stdin=PIPE)
-  result = get_shell_cmd_result(p, stdin_input)
-  if expect_success:
-    assert result.rc == 0, "Cmd %s was expected to succeed: %s" % (cmd, 
result.stderr)
-  else:
-    assert result.rc != 0, "Cmd %s was expected to fail" % cmd
-  return result

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/shell/test_shell_common.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_common.py b/tests/shell/test_shell_common.py
deleted file mode 100755
index 4d94055..0000000
--- a/tests/shell/test_shell_common.py
+++ /dev/null
@@ -1,72 +0,0 @@
-#!/usr/bin/env impala-python
-# encoding=utf-8
-# Copyright 2016 Cloudera Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import re
-
-def assert_var_substitution(result):
-  assert_pattern(r'\bfoo_number=.*$', 'foo_number= 123123', result.stdout, \
-    'Numeric values not replaced correctly')
-  assert_pattern(r'\bfoo_string=.*$', 'foo_string=123', result.stdout, \
-    'String values not replaced correctly')
-  assert_pattern(r'\bVariables:[\s\n]*BAR:\s*[0-9]*\n\s*FOO:\s*[0-9]*', \
-    'Variables:\n\tBAR: 456\n\tFOO: 123', result.stdout, \
-    "Set variable not listed correctly by the first SET command")
-  assert_pattern(r'\bError: Unknown variable FOO1$', \
-    'Error: Unknown variable FOO1', result.stderr, \
-    'Missing variable FOO1 not reported correctly')
-  assert_pattern(r'\bmulti_test=.*$', 'multi_test=456_123_456_123', \
-    result.stdout, 'Multiple replaces not working correctly')
-  assert_pattern(r'\bError:\s*Unknown\s*substitution\s*syntax\s*' +
-                 r'\(RANDOM_NAME\). Use \${VAR:var_name}', \
-    'Error: Unknown substitution syntax (RANDOM_NAME). Use ${VAR:var_name}', \
-    result.stderr, "Invalid variable reference")
-  assert_pattern(r'"This should be not replaced: \${VAR:foo} \${HIVEVAR:bar}"',
-    '"This should be not replaced: ${VAR:foo} ${HIVEVAR:bar}"', \
-    result.stdout, "Variable escaping not working")
-  assert_pattern(r'\bVariable MYVAR set to.*$', 'Variable MYVAR set to foo123',
-    result.stderr, 'No evidence of MYVAR variable being set.')
-  assert_pattern(r'\bVariables:[\s\n]*BAR:.*[\s\n]*FOO:.*[\s\n]*MYVAR:.*$',
-    'Variables:\n\tBAR: 456\n\tFOO: 123\n\tMYVAR: foo123', result.stdout,
-    'Set variables not listed correctly by the second SET command')
-  assert_pattern(r'\bUnsetting variable FOO$', 'Unsetting variable FOO',
-    result.stdout, 'No evidence of variable FOO being unset')
-  assert_pattern(r'\bUnsetting variable BAR$', 'Unsetting variable BAR',
-    result.stdout, 'No evidence of variable BAR being unset')
-  assert_pattern(r'\bVariables:[\s\n]*No variables defined\.$', \
-    'Variables:\n\tNo variables defined.', result.stdout, \
-    'Unset variables incorrectly listed by third SET command.')
-  assert_pattern(r'\bNo variable called NONEXISTENT is set', \
-    'No variable called NONEXISTENT is set', result.stdout, \
-    'Problem unsetting non-existent variable.')
-  assert_pattern(r'\bVariable COMMENT_TYPE1 set to.*$',
-    'Variable COMMENT_TYPE1 set to ok', result.stderr,
-    'No evidence of COMMENT_TYPE1 variable being set.')
-  assert_pattern(r'\bVariable COMMENT_TYPE2 set to.*$',
-    'Variable COMMENT_TYPE2 set to ok', result.stderr,
-    'No evidence of COMMENT_TYPE2 variable being set.')
-  assert_pattern(r'\bVariable COMMENT_TYPE3 set to.*$',
-    'Variable COMMENT_TYPE3 set to ok', result.stderr,
-    'No evidence of COMMENT_TYPE3 variable being set.')
-  assert_pattern(r'\bVariables:[\s\n]*COMMENT_TYPE1:.*[\s\n]*' + \
-    'COMMENT_TYPE2:.*[\s\n]*COMMENT_TYPE3:.*$',
-    'Variables:\n\tCOMMENT_TYPE1: ok\n\tCOMMENT_TYPE2: ok\n\tCOMMENT_TYPE3: 
ok', \
-    result.stdout, 'Set variables not listed correctly by the SET command')
-
-def assert_pattern(pattern, result, text, message):
-  """Asserts that the pattern, when applied to text, returns the expected 
result"""
-  m = re.search(pattern, text, re.MULTILINE)
-  assert m and m.group(0) == result, message
-

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/shell/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_interactive.py 
b/tests/shell/test_shell_interactive.py
index 7b54d61..f03c58f 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -24,12 +24,11 @@ import socket
 import signal
 import sys
 
-from impala_shell_results import get_shell_cmd_result
 from subprocess import Popen, PIPE
 from tests.common.impala_service import ImpaladService
 from tests.common.skip import SkipIfLocal
 from time import sleep
-from test_shell_common import assert_var_substitution
+from util import assert_var_substitution, ImpalaShell
 
 SHELL_CMD = "%s/bin/impala-shell.sh" % os.environ['IMPALA_HOME']
 SHELL_HISTORY_FILE = os.path.expanduser("~/.impalahistory")
@@ -39,19 +38,6 @@ QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 
'tests', 'shell')
 class TestImpalaShellInteractive(object):
   """Test the impala shell interactively"""
 
-  def _send_cmd_to_shell(self, p, cmd):
-    """Given an open shell process, write a cmd to stdin
-
-    This method takes care of adding the delimiter and EOL, callers should 
send the raw
-    command.
-    """
-    p.stdin.write("%s;\n" % cmd)
-    p.stdin.flush()
-
-  def _start_new_shell_process(self):
-    """Starts a shell process and returns the process handle"""
-    return Popen([SHELL_CMD], stdout=PIPE, stdin=PIPE, stderr=PIPE)
-
   @classmethod
   def setup_class(cls):
     if os.path.exists(SHELL_HISTORY_FILE):
@@ -85,15 +71,15 @@ class TestImpalaShellInteractive(object):
   def test_compute_stats_with_live_progress_options(self):
     """Test that setting LIVE_PROGRESS options won't cause COMPUTE STATS query 
fail"""
     impalad = ImpaladService(socket.getfqdn())
-    p = self._start_new_shell_process()
-    self._send_cmd_to_shell(p, "set live_progress=True")
-    self._send_cmd_to_shell(p, "set live_summary=True")
-    self._send_cmd_to_shell(p, 'create table test_live_progress_option(col 
int);')
+    p = ImpalaShell()
+    p.send_cmd("set live_progress=True")
+    p.send_cmd("set live_summary=True")
+    p.send_cmd('create table test_live_progress_option(col int);')
     try:
-      self._send_cmd_to_shell(p, 'compute stats test_live_progress_option;')
+      p.send_cmd('compute stats test_live_progress_option;')
     finally:
-      self._send_cmd_to_shell(p, 'drop table if exists 
test_live_progress_option;')
-    result = get_shell_cmd_result(p)
+      p.send_cmd('drop table if exists test_live_progress_option;')
+    result = p.get_result()
     assert "Updated 1 partition(s) and 1 column(s)" in result.stdout
 
   @pytest.mark.execute_serially
@@ -115,11 +101,11 @@ class TestImpalaShellInteractive(object):
     impalad = ImpaladService(socket.getfqdn())
     impalad.wait_for_num_in_flight_queries(0)
     command = "select sleep(10000);"
-    p = self._start_new_shell_process()
-    self._send_cmd_to_shell(p, command)
+    p = ImpalaShell()
+    p.send_cmd(command)
     sleep(3)
-    os.kill(p.pid, signal.SIGINT)
-    result = get_shell_cmd_result(p)
+    os.kill(p.pid(), signal.SIGINT)
+    result = p.get_result()
     assert "Cancelled" not in result.stderr
     assert impalad.wait_for_num_in_flight_queries(0)
 
@@ -170,12 +156,12 @@ class TestImpalaShellInteractive(object):
     num_sessions_initial = get_num_open_sessions(initial_impala_service)
     num_sessions_target = get_num_open_sessions(target_impala_service)
     # Connect to localhost:21000 (default)
-    p = self._start_new_shell_process()
+    p = ImpalaShell()
     sleep(2)
     # Make sure we're connected <hostname>:21000
     assert get_num_open_sessions(initial_impala_service) == 
num_sessions_initial + 1, \
         "Not connected to %s:21000" % hostname
-    self._send_cmd_to_shell(p, "connect %s:21001" % hostname)
+    p.send_cmd("connect %s:21001" % hostname)
     # Wait for a little while
     sleep(2)
     # The number of sessions on the target impalad should have been 
incremented.
@@ -201,18 +187,18 @@ class TestImpalaShellInteractive(object):
     NUM_QUERIES = 'impala-server.num-queries'
 
     impalad = ImpaladService(socket.getfqdn())
-    p = self._start_new_shell_process()
+    p = ImpalaShell()
     try:
       start_num_queries = impalad.get_metric_value(NUM_QUERIES)
-      self._send_cmd_to_shell(p, 'create database if not exists %s' % TMP_DB)
-      self._send_cmd_to_shell(p, 'use %s' % TMP_DB)
+      p.send_cmd('create database if not exists %s' % TMP_DB)
+      p.send_cmd('use %s' % TMP_DB)
       impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 2)
       assert impalad.wait_for_num_in_flight_queries(0), MSG % 'use'
-      self._send_cmd_to_shell(p, 'create table %s(i int)' % TMP_TBL)
-      self._send_cmd_to_shell(p, 'alter table %s add columns (j int)' % 
TMP_TBL)
+      p.send_cmd('create table %s(i int)' % TMP_TBL)
+      p.send_cmd('alter table %s add columns (j int)' % TMP_TBL)
       impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 4)
       assert impalad.wait_for_num_in_flight_queries(0), MSG % 'alter'
-      self._send_cmd_to_shell(p, 'drop table %s' % TMP_TBL)
+      p.send_cmd('drop table %s' % TMP_TBL)
       impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 5)
       assert impalad.wait_for_num_in_flight_queries(0), MSG % 'drop'
     finally:
@@ -238,9 +224,9 @@ class TestImpalaShellInteractive(object):
       child_proc.sendline(query)
     child_proc.expect(prompt_regex)
     child_proc.sendline('quit;')
-    p = self._start_new_shell_process()
-    self._send_cmd_to_shell(p, 'history')
-    result = get_shell_cmd_result(p)
+    p = ImpalaShell()
+    p.send_cmd('history')
+    result = p.get_result()
     for query in queries:
       assert query in result.stderr, "'%s' not in '%s'" % (query, 
result.stderr)
 
@@ -260,9 +246,8 @@ class TestImpalaShellInteractive(object):
 
   @pytest.mark.execute_serially
   def test_var_substitution(self):
-    cmds = open(os.path.join(QUERY_FILE_PATH, 
'test_var_substitution.sql')).read().\
-      split('\n')
-    args = '--var=foo=123 --var=BAR=456 --delimited --output_delimiter=" "'
+    cmds = open(os.path.join(QUERY_FILE_PATH, 
'test_var_substitution.sql')).read()
+    args = '''--var=foo=123 --var=BAR=456 --delimited "--output_delimiter= " 
'''
     result = run_impala_shell_interactive(cmds, shell_args=args)
     assert_var_substitution(result)
 
@@ -302,9 +287,8 @@ class TestImpalaShellInteractive(object):
     result = run_impala_shell_interactive("source %s;" % full_path)
     assert "No such file or directory" in result.stderr
 
-def run_impala_shell_interactive(input_lines, shell_args=''):
+def run_impala_shell_interactive(input_lines, shell_args=None):
   """Runs a command in the Impala shell interactively."""
-  cmd = "%s %s" % (SHELL_CMD, shell_args)
   # if argument "input_lines" is a string, makes it into a list
   if type(input_lines) is str:
     input_lines = [input_lines]
@@ -312,9 +296,7 @@ def run_impala_shell_interactive(input_lines, 
shell_args=''):
   # since piping defaults to ascii
   my_env = os.environ
   my_env['PYTHONIOENCODING'] = 'utf-8'
-  p = Popen(cmd, shell=True, stdout=PIPE,
-            stdin=PIPE, stderr=PIPE, env=my_env)
+  p = ImpalaShell(shell_args, env=my_env)
   for line in input_lines:
-    p.stdin.write(line + "\n")
-    p.stdin.flush()
-  return get_shell_cmd_result(p)
+    p.send_cmd(line)
+  return p.get_result()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/shell/util.py
----------------------------------------------------------------------
diff --git a/tests/shell/util.py b/tests/shell/util.py
new file mode 100755
index 0000000..36cb729
--- /dev/null
+++ b/tests/shell/util.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env impala-python
+# encoding=utf-8
+# Copyright 2016 Cloudera Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import pytest
+import re
+import shlex
+from subprocess import Popen, PIPE
+
+IMPALAD_HOST_PORT_LIST = pytest.config.option.impalad.split(',')
+assert len(IMPALAD_HOST_PORT_LIST) > 0, 'Must specify at least 1 impalad to 
target'
+IMPALAD = IMPALAD_HOST_PORT_LIST[0]
+SHELL_CMD = "%s/bin/impala-shell.sh -i %s" % (os.environ['IMPALA_HOME'], 
IMPALAD)
+
+def assert_var_substitution(result):
+  assert_pattern(r'\bfoo_number=.*$', 'foo_number= 123123', result.stdout, \
+    'Numeric values not replaced correctly')
+  assert_pattern(r'\bfoo_string=.*$', 'foo_string=123', result.stdout, \
+    'String values not replaced correctly')
+  assert_pattern(r'\bVariables:[\s\n]*BAR:\s*[0-9]*\n\s*FOO:\s*[0-9]*', \
+    'Variables:\n\tBAR: 456\n\tFOO: 123', result.stdout, \
+    "Set variable not listed correctly by the first SET command")
+  assert_pattern(r'\bError: Unknown variable FOO1$', \
+    'Error: Unknown variable FOO1', result.stderr, \
+    'Missing variable FOO1 not reported correctly')
+  assert_pattern(r'\bmulti_test=.*$', 'multi_test=456_123_456_123', \
+    result.stdout, 'Multiple replaces not working correctly')
+  assert_pattern(r'\bError:\s*Unknown\s*substitution\s*syntax\s*' +
+                 r'\(RANDOM_NAME\). Use \${VAR:var_name}', \
+    'Error: Unknown substitution syntax (RANDOM_NAME). Use ${VAR:var_name}', \
+    result.stderr, "Invalid variable reference")
+  assert_pattern(r'"This should be not replaced: \${VAR:foo} \${HIVEVAR:bar}"',
+    '"This should be not replaced: ${VAR:foo} ${HIVEVAR:bar}"', \
+    result.stdout, "Variable escaping not working")
+  assert_pattern(r'\bVariable MYVAR set to.*$', 'Variable MYVAR set to foo123',
+    result.stderr, 'No evidence of MYVAR variable being set.')
+  assert_pattern(r'\bVariables:[\s\n]*BAR:.*[\s\n]*FOO:.*[\s\n]*MYVAR:.*$',
+    'Variables:\n\tBAR: 456\n\tFOO: 123\n\tMYVAR: foo123', result.stdout,
+    'Set variables not listed correctly by the second SET command')
+  assert_pattern(r'\bUnsetting variable FOO$', 'Unsetting variable FOO',
+    result.stdout, 'No evidence of variable FOO being unset')
+  assert_pattern(r'\bUnsetting variable BAR$', 'Unsetting variable BAR',
+    result.stdout, 'No evidence of variable BAR being unset')
+  assert_pattern(r'\bVariables:[\s\n]*No variables defined\.$', \
+    'Variables:\n\tNo variables defined.', result.stdout, \
+    'Unset variables incorrectly listed by third SET command.')
+  assert_pattern(r'\bNo variable called NONEXISTENT is set', \
+    'No variable called NONEXISTENT is set', result.stdout, \
+    'Problem unsetting non-existent variable.')
+  assert_pattern(r'\bVariable COMMENT_TYPE1 set to.*$',
+    'Variable COMMENT_TYPE1 set to ok', result.stderr,
+    'No evidence of COMMENT_TYPE1 variable being set.')
+  assert_pattern(r'\bVariable COMMENT_TYPE2 set to.*$',
+    'Variable COMMENT_TYPE2 set to ok', result.stderr,
+    'No evidence of COMMENT_TYPE2 variable being set.')
+  assert_pattern(r'\bVariable COMMENT_TYPE3 set to.*$',
+    'Variable COMMENT_TYPE3 set to ok', result.stderr,
+    'No evidence of COMMENT_TYPE3 variable being set.')
+  assert_pattern(r'\bVariables:[\s\n]*COMMENT_TYPE1:.*[\s\n]*' + \
+    'COMMENT_TYPE2:.*[\s\n]*COMMENT_TYPE3:.*$',
+    'Variables:\n\tCOMMENT_TYPE1: ok\n\tCOMMENT_TYPE2: ok\n\tCOMMENT_TYPE3: 
ok', \
+    result.stdout, 'Set variables not listed correctly by the SET command')
+
+def assert_pattern(pattern, result, text, message):
+  """Asserts that the pattern, when applied to text, returns the expected 
result"""
+  m = re.search(pattern, text, re.MULTILINE)
+  assert m and m.group(0) == result, message
+
+def run_impala_shell_cmd(shell_args, expect_success=True, stdin_input=None):
+  """Runs the Impala shell on the commandline.
+
+  'shell_args' is a string which represents the commandline options.
+  Returns a ImpalaShellResult.
+  """
+  p = ImpalaShell(shell_args)
+  result = p.get_result(stdin_input)
+  cmd = "%s %s" % (SHELL_CMD, shell_args)
+  if expect_success:
+    assert result.rc == 0, "Cmd %s was expected to succeed: %s" % (cmd, 
result.stderr)
+  else:
+    assert result.rc != 0, "Cmd %s was expected to fail" % cmd
+  return result
+
+class ImpalaShellResult(object):
+  def __init__(self):
+    self.rc = 0
+    self.stdout = str()
+    self.stderr = str()
+
+class ImpalaShell(object):
+  """A single instance of the Impala shell. The proces is started when this 
object is
+     constructed, and then users should repeatedly call send_cmd(), followed 
eventually by
+     get_result() to retrieve the process output."""
+  def __init__(self, args=None, env=None):
+    self.shell_process = self._start_new_shell_process(args, env=env)
+
+  def pid(self):
+    return self.shell_process.pid
+
+  def send_cmd(self, cmd):
+    """Send a single command to the shell. This method adds the end-of-query
+       terminator (';'). """
+    self.shell_process.stdin.write("%s;\n" % cmd)
+    self.shell_process.stdin.flush()
+    # Allow fluent-style chaining of commands
+    return self
+
+  def get_result(self, stdin_input=None):
+    """Returns an ImpalaShellResult produced by the shell process on exit. 
After this
+       method returns, send_cmd() no longer has any effect."""
+    result = ImpalaShellResult()
+    result.stdout, result.stderr = 
self.shell_process.communicate(input=stdin_input)
+    # We need to close STDIN if we gave it an input, in order to send an EOF 
that will
+    # allow the subprocess to exit.
+    if stdin_input is not None: self.shell_process.stdin.close()
+    result.rc = self.shell_process.returncode
+    return result
+
+  def _start_new_shell_process(self, args=None, env=None):
+    """Starts a shell process and returns the process handle"""
+    shell_args = SHELL_CMD
+    if args is not None: shell_args = "%s %s" % (SHELL_CMD, args)
+    lex = shlex.split(shell_args)
+    if not env: env = os.environ
+    return Popen(lex, shell=False, stdout=PIPE, stdin=PIPE, stderr=PIPE,
+                 env=env)

Reply via email to