Repository: incubator-impala Updated Branches: refs/heads/master cd2ee9ecf -> 0dde1c2f8
IMPALA-3628: Fix cancellation from shell when security is enabled

To cancel a query, the shell creates a separate connection inside its SIGINT handler and sends the cancellation RPC. However, this connection did not start a secure connection when one was needed, so the cancellation attempt would just hang. A workaround is to kill the shell process, which I expect is what users have been doing with this bug, which has been around since 2014.

Testing: I added a custom cluster test that starts Impala with SSL enabled, and wrote two tests - one just to check SSL connectivity, and the other to mimic the existing test_cancellation, which sends SIGINT to the shell process. In doing so I refactored the shell testing code a bit so that all tests use a single ImpalaShell object, rather than rolling their own Popen()-based approaches when they needed to do something unusual, like cancel a query.

In the cancellation test on my machine, SIGINT can take a few tries to be effective. I'm not sure if this is a timing thing - perhaps the Python interpreter doesn't correctly pass signals through to a handler if it's in a blocking call, for example. The test reliably passes within ~5 tries on my machine, so the test makes up to 30 attempts, once per second.
Change-Id: If99085e75708d92a08dbecf0131a2234fedad33a Reviewed-on: http://gerrit.cloudera.org:8080/3302 Reviewed-by: Henry Robinson <[email protected]> Tested-by: Henry Robinson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0dde1c2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0dde1c2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0dde1c2f Branch: refs/heads/master Commit: 0dde1c2f86ef924777ac196013d11aa2e28cc067 Parents: cd2ee9e Author: Henry Robinson <[email protected]> Authored: Thu Jun 2 11:49:49 2016 -0700 Committer: Tim Armstrong <[email protected]> Committed: Tue Jul 5 16:40:23 2016 -0700 ---------------------------------------------------------------------- shell/impala_shell.py | 27 ++++-- tests/custom_cluster/test_client_ssl.py | 81 ++++++++++++++++ tests/shell/__init__.py | 1 + tests/shell/impala_shell_results.py | 33 ------- tests/shell/test_shell_commandline.py | 36 ++----- tests/shell/test_shell_common.py | 72 -------------- tests/shell/test_shell_interactive.py | 76 ++++++--------- tests/shell/util.py | 139 +++++++++++++++++++++++++++ 8 files changed, 274 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/shell/impala_shell.py ---------------------------------------------------------------------- diff --git a/shell/impala_shell.py b/shell/impala_shell.py index 6e37aae..deb65d8 100755 --- a/shell/impala_shell.py +++ b/shell/impala_shell.py @@ -402,6 +402,12 @@ class ImpalaShell(cmd.Cmd): completed_cmd = sqlparse.format(cmd) return completed_cmd + def _new_impala_client(self): + return ImpalaClient(self.impalad, self.use_kerberos, + self.kerberos_service_name, self.use_ssl, + self.ca_cert, self.user, self.ldap_password, + self.use_ldap) + def _signal_handler(self, signal, 
frame): """Handles query cancellation on a Ctrl+C event""" if self.last_query_handle is None or self.query_handle_closed: @@ -411,7 +417,7 @@ class ImpalaShell(cmd.Cmd): try: self.query_handle_closed = True print_to_stderr(ImpalaShell.CANCELLATION_MESSAGE) - new_imp_client = ImpalaClient(self.impalad) + new_imp_client = self._new_impala_client() new_imp_client.connect() new_imp_client.cancel_query(self.last_query_handle, False) self.imp_client.close_query(self.last_query_handle) @@ -627,10 +633,7 @@ class ImpalaShell(cmd.Cmd): host_port.append('21000') self.impalad = tuple(host_port) if self.imp_client: self.imp_client.close_connection() - self.imp_client = ImpalaClient(self.impalad, self.use_kerberos, - self.kerberos_service_name, self.use_ssl, - self.ca_cert, self.user, self.ldap_password, - self.use_ldap) + self.imp_client = self._new_impala_client() self._connect() # If the connection fails and the Kerberos has not been enabled, # check for a valid kerberos ticket and retry the connection @@ -638,11 +641,12 @@ class ImpalaShell(cmd.Cmd): if not self.imp_client.connected and not self.use_kerberos: try: if call(["klist", "-s"]) == 0: - print_to_stderr(("Kerberos ticket found in the credentials cache, retrying " - "the connection with a secure transport.")) - self.imp_client.use_kerberos = True - self.imp_client.use_ldap = False - self.imp_client.ldap_password = None + print_to_stderr("Kerberos ticket found in the credentials cache, retrying " + "the connection with a secure transport.") + self.use_kerberos = True + self.use_ldap = False + self.ldap_password = None + self.imp_client = self._new_impala_client() self._connect() except OSError, e: pass @@ -1365,5 +1369,8 @@ if __name__ == "__main__": except RPCException, e: # could not complete the rpc successfully print_to_stderr(e) + except IOError, e: + # Interrupted system calls (e.g. because of cancellation) should be ignored. 
+ if e.errno != errno.EINTR: raise finally: intro = '' http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/custom_cluster/test_client_ssl.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_client_ssl.py b/tests/custom_cluster/test_client_ssl.py new file mode 100644 index 0000000..6f8a01c --- /dev/null +++ b/tests/custom_cluster/test_client_ssl.py @@ -0,0 +1,81 @@ +# Copyright (c) 2016 Cloudera, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import os +import pytest +import signal +import socket +import time + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_service import ImpaladService +from tests.shell.util import run_impala_shell_cmd, ImpalaShell +from tests.common.impala_cluster import ImpalaCluster + +class TestClientSsl(CustomClusterTestSuite): + """Tests for a client using SSL (particularly, the Impala Shell) """ + + CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME'] + + SSL_ENABLED = "SSL is enabled" + CONNECTED = "Connected to" + FETCHED = "Fetched 1 row" + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args("--ssl_server_certificate=%s/server-cert.pem " + "--ssl_private_key=%s/server-key.pem" + % (CERT_DIR, CERT_DIR)) + def test_ssl(self, vector): + # TODO: This is really two different tests, but the custom cluster takes too long to + # start. 
Make it so that custom clusters can be specified across test suites. + result = run_impala_shell_cmd("--ssl --ca_cert=%s/server-cert.pem -q 'select 1 + 2'" + % self.CERT_DIR) + for msg in [self.SSL_ENABLED, self.CONNECTED, self.FETCHED]: + assert msg in result.stderr + + # No certificate checking: will accept any cert. + result = run_impala_shell_cmd("--ssl -q 'select 1 + 2'") + for msg in [self.SSL_ENABLED, self.CONNECTED, self.FETCHED]: + assert msg in result.stderr + + # Test cancelling a query + impalad = ImpaladService(socket.getfqdn()) + impalad.wait_for_num_in_flight_queries(0) + p = ImpalaShell(args="--ssl") + p.send_cmd("SET DEBUG_ACTION=0:OPEN:WAIT") + p.send_cmd("select count(*) from functional.alltypes") + impalad.wait_for_num_in_flight_queries(1) + + LOG = logging.getLogger('test_client_ssl') + LOG.info("Cancelling query") + num_tries = 0 + # In practice, sending SIGINT to the shell process doesn't always seem to get caught + # (and a search shows up some bugs in Python where SIGINT might be ignored). So retry + # for 30s until one signal takes. 
+ while impalad.get_num_in_flight_queries() == 1: + time.sleep(1) + LOG.info("Sending signal...") + os.kill(p.pid(), signal.SIGINT) + num_tries += 1 + assert num_tries < 30, "SIGINT was not caught by shell within 30s" + + p.send_cmd("profile") + result = p.get_result() + + print result.stderr + assert result.rc == 0 + assert "Query Status: Cancelled" in result.stdout + assert impalad.wait_for_num_in_flight_queries(0) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/shell/__init__.py ---------------------------------------------------------------------- diff --git a/tests/shell/__init__.py b/tests/shell/__init__.py new file mode 100644 index 0000000..946a474 --- /dev/null +++ b/tests/shell/__init__.py @@ -0,0 +1 @@ +# This file is needed to make the files in this directory a python module http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/shell/impala_shell_results.py ---------------------------------------------------------------------- diff --git a/tests/shell/impala_shell_results.py b/tests/shell/impala_shell_results.py deleted file mode 100644 index 7cd8453..0000000 --- a/tests/shell/impala_shell_results.py +++ /dev/null @@ -1,33 +0,0 @@ -# encoding=utf-8 -# Copyright 2014 Cloudera Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import os - -PY_CMD = "%s/shell/impala_shell.py" % os.environ['IMPALA_HOME'] - -class ImpalaShellResult(object): - def __init__(self): - self.rc = 0 - self.stdout = str() - self.stderr = str() - -def get_shell_cmd_result(process, stdin_input=None): - result = ImpalaShellResult() - result.stdout, result.stderr = process.communicate(input=stdin_input) - # We need to close STDIN if we gave it an input, in order to send an EOF that will - # allow the subprocess to exit. - if stdin_input is not None: process.stdin.close() - result.rc = process.returncode - return result http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/shell/test_shell_commandline.py ---------------------------------------------------------------------- diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py index 77ee4be..f14e3ec 100644 --- a/tests/shell/test_shell_commandline.py +++ b/tests/shell/test_shell_commandline.py @@ -19,18 +19,14 @@ import re import shlex import signal -from impala_shell_results import get_shell_cmd_result from subprocess import Popen, PIPE, call from tests.common.impala_service import ImpaladService from time import sleep -from test_shell_common import assert_var_substitution from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.patterns import is_valid_impala_identifier +from util import assert_var_substitution, run_impala_shell_cmd, ImpalaShell +from util import IMPALAD, SHELL_CMD -IMPALAD_HOST_PORT_LIST = pytest.config.option.impalad.split(',') -assert len(IMPALAD_HOST_PORT_LIST) > 0, 'Must specify at least 1 impalad to target' -IMPALAD = IMPALAD_HOST_PORT_LIST[0] -SHELL_CMD = "%s/bin/impala-shell.sh -i %s" % (os.environ['IMPALA_HOME'], IMPALAD) DEFAULT_QUERY = 'select 1' QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell') @@ -304,25 +300,23 @@ class TestImpalaShell(ImpalaTestSuite): args = '-f %s/test_close_queries.sql --quiet -B' % QUERY_FILE_PATH cmd = "%s %s" 
% (SHELL_CMD, args) # Execute the shell command async - p = Popen(shlex.split(cmd), shell=False, stdout=PIPE, stderr=PIPE) + p = ImpalaShell(args) impalad_service = ImpaladService(IMPALAD.split(':')[0]) # The last query in the test SQL script will sleep for 10 seconds, so sleep # here for 5 seconds and verify the number of in-flight queries is 1. sleep(5) assert 1 == impalad_service.get_num_in_flight_queries() - assert get_shell_cmd_result(p).rc == 0 + assert p.get_result().rc == 0 assert 0 == impalad_service.get_num_in_flight_queries() def test_cancellation(self): """Test cancellation (Ctrl+C event).""" args = '-q "select sleep(10000)"' - cmd = "%s %s" % (SHELL_CMD, args) - - p = Popen(shlex.split(cmd), stderr=PIPE, stdout=PIPE) + p = ImpalaShell(args) sleep(3) - os.kill(p.pid, signal.SIGINT) - result = get_shell_cmd_result(p) + os.kill(p.pid(), signal.SIGINT) + result = p.get_result() assert "Cancelling Query" in result.stderr, result.stderr @@ -423,19 +417,3 @@ class TestImpalaShell(ImpalaTestSuite): % (os.path.join(QUERY_FILE_PATH, 'test_var_substitution.sql')) result = run_impala_shell_cmd(args, expect_success=True) assert_var_substitution(result) - - -def run_impala_shell_cmd(shell_args, expect_success=True, stdin_input=None): - """Run the Impala shell on the commandline. - - 'shell_args' is a string which represents the commandline options. - Returns a ImpalaShellResult. 
- """ - cmd = "%s %s" % (SHELL_CMD, shell_args) - p = Popen(shlex.split(cmd), shell=False, stdout=PIPE, stderr=PIPE, stdin=PIPE) - result = get_shell_cmd_result(p, stdin_input) - if expect_success: - assert result.rc == 0, "Cmd %s was expected to succeed: %s" % (cmd, result.stderr) - else: - assert result.rc != 0, "Cmd %s was expected to fail" % cmd - return result http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/shell/test_shell_common.py ---------------------------------------------------------------------- diff --git a/tests/shell/test_shell_common.py b/tests/shell/test_shell_common.py deleted file mode 100755 index 4d94055..0000000 --- a/tests/shell/test_shell_common.py +++ /dev/null @@ -1,72 +0,0 @@ -#!/usr/bin/env impala-python -# encoding=utf-8 -# Copyright 2016 Cloudera Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import re - -def assert_var_substitution(result): - assert_pattern(r'\bfoo_number=.*$', 'foo_number= 123123', result.stdout, \ - 'Numeric values not replaced correctly') - assert_pattern(r'\bfoo_string=.*$', 'foo_string=123', result.stdout, \ - 'String values not replaced correctly') - assert_pattern(r'\bVariables:[\s\n]*BAR:\s*[0-9]*\n\s*FOO:\s*[0-9]*', \ - 'Variables:\n\tBAR: 456\n\tFOO: 123', result.stdout, \ - "Set variable not listed correctly by the first SET command") - assert_pattern(r'\bError: Unknown variable FOO1$', \ - 'Error: Unknown variable FOO1', result.stderr, \ - 'Missing variable FOO1 not reported correctly') - assert_pattern(r'\bmulti_test=.*$', 'multi_test=456_123_456_123', \ - result.stdout, 'Multiple replaces not working correctly') - assert_pattern(r'\bError:\s*Unknown\s*substitution\s*syntax\s*' + - r'\(RANDOM_NAME\). Use \${VAR:var_name}', \ - 'Error: Unknown substitution syntax (RANDOM_NAME). Use ${VAR:var_name}', \ - result.stderr, "Invalid variable reference") - assert_pattern(r'"This should be not replaced: \${VAR:foo} \${HIVEVAR:bar}"', - '"This should be not replaced: ${VAR:foo} ${HIVEVAR:bar}"', \ - result.stdout, "Variable escaping not working") - assert_pattern(r'\bVariable MYVAR set to.*$', 'Variable MYVAR set to foo123', - result.stderr, 'No evidence of MYVAR variable being set.') - assert_pattern(r'\bVariables:[\s\n]*BAR:.*[\s\n]*FOO:.*[\s\n]*MYVAR:.*$', - 'Variables:\n\tBAR: 456\n\tFOO: 123\n\tMYVAR: foo123', result.stdout, - 'Set variables not listed correctly by the second SET command') - assert_pattern(r'\bUnsetting variable FOO$', 'Unsetting variable FOO', - result.stdout, 'No evidence of variable FOO being unset') - assert_pattern(r'\bUnsetting variable BAR$', 'Unsetting variable BAR', - result.stdout, 'No evidence of variable BAR being unset') - assert_pattern(r'\bVariables:[\s\n]*No variables defined\.$', \ - 'Variables:\n\tNo variables defined.', result.stdout, \ - 'Unset variables incorrectly listed by third SET 
command.') - assert_pattern(r'\bNo variable called NONEXISTENT is set', \ - 'No variable called NONEXISTENT is set', result.stdout, \ - 'Problem unsetting non-existent variable.') - assert_pattern(r'\bVariable COMMENT_TYPE1 set to.*$', - 'Variable COMMENT_TYPE1 set to ok', result.stderr, - 'No evidence of COMMENT_TYPE1 variable being set.') - assert_pattern(r'\bVariable COMMENT_TYPE2 set to.*$', - 'Variable COMMENT_TYPE2 set to ok', result.stderr, - 'No evidence of COMMENT_TYPE2 variable being set.') - assert_pattern(r'\bVariable COMMENT_TYPE3 set to.*$', - 'Variable COMMENT_TYPE3 set to ok', result.stderr, - 'No evidence of COMMENT_TYPE3 variable being set.') - assert_pattern(r'\bVariables:[\s\n]*COMMENT_TYPE1:.*[\s\n]*' + \ - 'COMMENT_TYPE2:.*[\s\n]*COMMENT_TYPE3:.*$', - 'Variables:\n\tCOMMENT_TYPE1: ok\n\tCOMMENT_TYPE2: ok\n\tCOMMENT_TYPE3: ok', \ - result.stdout, 'Set variables not listed correctly by the SET command') - -def assert_pattern(pattern, result, text, message): - """Asserts that the pattern, when applied to text, returns the expected result""" - m = re.search(pattern, text, re.MULTILINE) - assert m and m.group(0) == result, message - http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/shell/test_shell_interactive.py ---------------------------------------------------------------------- diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py index 7b54d61..f03c58f 100755 --- a/tests/shell/test_shell_interactive.py +++ b/tests/shell/test_shell_interactive.py @@ -24,12 +24,11 @@ import socket import signal import sys -from impala_shell_results import get_shell_cmd_result from subprocess import Popen, PIPE from tests.common.impala_service import ImpaladService from tests.common.skip import SkipIfLocal from time import sleep -from test_shell_common import assert_var_substitution +from util import assert_var_substitution, ImpalaShell SHELL_CMD = "%s/bin/impala-shell.sh" % os.environ['IMPALA_HOME'] 
SHELL_HISTORY_FILE = os.path.expanduser("~/.impalahistory") @@ -39,19 +38,6 @@ QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell') class TestImpalaShellInteractive(object): """Test the impala shell interactively""" - def _send_cmd_to_shell(self, p, cmd): - """Given an open shell process, write a cmd to stdin - - This method takes care of adding the delimiter and EOL, callers should send the raw - command. - """ - p.stdin.write("%s;\n" % cmd) - p.stdin.flush() - - def _start_new_shell_process(self): - """Starts a shell process and returns the process handle""" - return Popen([SHELL_CMD], stdout=PIPE, stdin=PIPE, stderr=PIPE) - @classmethod def setup_class(cls): if os.path.exists(SHELL_HISTORY_FILE): @@ -85,15 +71,15 @@ class TestImpalaShellInteractive(object): def test_compute_stats_with_live_progress_options(self): """Test that setting LIVE_PROGRESS options won't cause COMPUTE STATS query fail""" impalad = ImpaladService(socket.getfqdn()) - p = self._start_new_shell_process() - self._send_cmd_to_shell(p, "set live_progress=True") - self._send_cmd_to_shell(p, "set live_summary=True") - self._send_cmd_to_shell(p, 'create table test_live_progress_option(col int);') + p = ImpalaShell() + p.send_cmd("set live_progress=True") + p.send_cmd("set live_summary=True") + p.send_cmd('create table test_live_progress_option(col int);') try: - self._send_cmd_to_shell(p, 'compute stats test_live_progress_option;') + p.send_cmd('compute stats test_live_progress_option;') finally: - self._send_cmd_to_shell(p, 'drop table if exists test_live_progress_option;') - result = get_shell_cmd_result(p) + p.send_cmd('drop table if exists test_live_progress_option;') + result = p.get_result() assert "Updated 1 partition(s) and 1 column(s)" in result.stdout @pytest.mark.execute_serially @@ -115,11 +101,11 @@ class TestImpalaShellInteractive(object): impalad = ImpaladService(socket.getfqdn()) impalad.wait_for_num_in_flight_queries(0) command = "select sleep(10000);" - p = 
self._start_new_shell_process() - self._send_cmd_to_shell(p, command) + p = ImpalaShell() + p.send_cmd(command) sleep(3) - os.kill(p.pid, signal.SIGINT) - result = get_shell_cmd_result(p) + os.kill(p.pid(), signal.SIGINT) + result = p.get_result() assert "Cancelled" not in result.stderr assert impalad.wait_for_num_in_flight_queries(0) @@ -170,12 +156,12 @@ class TestImpalaShellInteractive(object): num_sessions_initial = get_num_open_sessions(initial_impala_service) num_sessions_target = get_num_open_sessions(target_impala_service) # Connect to localhost:21000 (default) - p = self._start_new_shell_process() + p = ImpalaShell() sleep(2) # Make sure we're connected <hostname>:21000 assert get_num_open_sessions(initial_impala_service) == num_sessions_initial + 1, \ "Not connected to %s:21000" % hostname - self._send_cmd_to_shell(p, "connect %s:21001" % hostname) + p.send_cmd("connect %s:21001" % hostname) # Wait for a little while sleep(2) # The number of sessions on the target impalad should have been incremented. 
@@ -201,18 +187,18 @@ class TestImpalaShellInteractive(object): NUM_QUERIES = 'impala-server.num-queries' impalad = ImpaladService(socket.getfqdn()) - p = self._start_new_shell_process() + p = ImpalaShell() try: start_num_queries = impalad.get_metric_value(NUM_QUERIES) - self._send_cmd_to_shell(p, 'create database if not exists %s' % TMP_DB) - self._send_cmd_to_shell(p, 'use %s' % TMP_DB) + p.send_cmd('create database if not exists %s' % TMP_DB) + p.send_cmd('use %s' % TMP_DB) impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 2) assert impalad.wait_for_num_in_flight_queries(0), MSG % 'use' - self._send_cmd_to_shell(p, 'create table %s(i int)' % TMP_TBL) - self._send_cmd_to_shell(p, 'alter table %s add columns (j int)' % TMP_TBL) + p.send_cmd('create table %s(i int)' % TMP_TBL) + p.send_cmd('alter table %s add columns (j int)' % TMP_TBL) impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 4) assert impalad.wait_for_num_in_flight_queries(0), MSG % 'alter' - self._send_cmd_to_shell(p, 'drop table %s' % TMP_TBL) + p.send_cmd('drop table %s' % TMP_TBL) impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 5) assert impalad.wait_for_num_in_flight_queries(0), MSG % 'drop' finally: @@ -238,9 +224,9 @@ class TestImpalaShellInteractive(object): child_proc.sendline(query) child_proc.expect(prompt_regex) child_proc.sendline('quit;') - p = self._start_new_shell_process() - self._send_cmd_to_shell(p, 'history') - result = get_shell_cmd_result(p) + p = ImpalaShell() + p.send_cmd('history') + result = p.get_result() for query in queries: assert query in result.stderr, "'%s' not in '%s'" % (query, result.stderr) @@ -260,9 +246,8 @@ class TestImpalaShellInteractive(object): @pytest.mark.execute_serially def test_var_substitution(self): - cmds = open(os.path.join(QUERY_FILE_PATH, 'test_var_substitution.sql')).read().\ - split('\n') - args = '--var=foo=123 --var=BAR=456 --delimited --output_delimiter=" "' + cmds = open(os.path.join(QUERY_FILE_PATH, 
'test_var_substitution.sql')).read() + args = '''--var=foo=123 --var=BAR=456 --delimited "--output_delimiter= " ''' result = run_impala_shell_interactive(cmds, shell_args=args) assert_var_substitution(result) @@ -302,9 +287,8 @@ class TestImpalaShellInteractive(object): result = run_impala_shell_interactive("source %s;" % full_path) assert "No such file or directory" in result.stderr -def run_impala_shell_interactive(input_lines, shell_args=''): +def run_impala_shell_interactive(input_lines, shell_args=None): """Runs a command in the Impala shell interactively.""" - cmd = "%s %s" % (SHELL_CMD, shell_args) # if argument "input_lines" is a string, makes it into a list if type(input_lines) is str: input_lines = [input_lines] @@ -312,9 +296,7 @@ def run_impala_shell_interactive(input_lines, shell_args=''): # since piping defaults to ascii my_env = os.environ my_env['PYTHONIOENCODING'] = 'utf-8' - p = Popen(cmd, shell=True, stdout=PIPE, - stdin=PIPE, stderr=PIPE, env=my_env) + p = ImpalaShell(shell_args, env=my_env) for line in input_lines: - p.stdin.write(line + "\n") - p.stdin.flush() - return get_shell_cmd_result(p) + p.send_cmd(line) + return p.get_result() http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0dde1c2f/tests/shell/util.py ---------------------------------------------------------------------- diff --git a/tests/shell/util.py b/tests/shell/util.py new file mode 100755 index 0000000..36cb729 --- /dev/null +++ b/tests/shell/util.py @@ -0,0 +1,139 @@ +#!/usr/bin/env impala-python +# encoding=utf-8 +# Copyright 2016 Cloudera Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import pytest +import re +import shlex +from subprocess import Popen, PIPE + +IMPALAD_HOST_PORT_LIST = pytest.config.option.impalad.split(',') +assert len(IMPALAD_HOST_PORT_LIST) > 0, 'Must specify at least 1 impalad to target' +IMPALAD = IMPALAD_HOST_PORT_LIST[0] +SHELL_CMD = "%s/bin/impala-shell.sh -i %s" % (os.environ['IMPALA_HOME'], IMPALAD) + +def assert_var_substitution(result): + assert_pattern(r'\bfoo_number=.*$', 'foo_number= 123123', result.stdout, \ + 'Numeric values not replaced correctly') + assert_pattern(r'\bfoo_string=.*$', 'foo_string=123', result.stdout, \ + 'String values not replaced correctly') + assert_pattern(r'\bVariables:[\s\n]*BAR:\s*[0-9]*\n\s*FOO:\s*[0-9]*', \ + 'Variables:\n\tBAR: 456\n\tFOO: 123', result.stdout, \ + "Set variable not listed correctly by the first SET command") + assert_pattern(r'\bError: Unknown variable FOO1$', \ + 'Error: Unknown variable FOO1', result.stderr, \ + 'Missing variable FOO1 not reported correctly') + assert_pattern(r'\bmulti_test=.*$', 'multi_test=456_123_456_123', \ + result.stdout, 'Multiple replaces not working correctly') + assert_pattern(r'\bError:\s*Unknown\s*substitution\s*syntax\s*' + + r'\(RANDOM_NAME\). Use \${VAR:var_name}', \ + 'Error: Unknown substitution syntax (RANDOM_NAME). 
Use ${VAR:var_name}', \ + result.stderr, "Invalid variable reference") + assert_pattern(r'"This should be not replaced: \${VAR:foo} \${HIVEVAR:bar}"', + '"This should be not replaced: ${VAR:foo} ${HIVEVAR:bar}"', \ + result.stdout, "Variable escaping not working") + assert_pattern(r'\bVariable MYVAR set to.*$', 'Variable MYVAR set to foo123', + result.stderr, 'No evidence of MYVAR variable being set.') + assert_pattern(r'\bVariables:[\s\n]*BAR:.*[\s\n]*FOO:.*[\s\n]*MYVAR:.*$', + 'Variables:\n\tBAR: 456\n\tFOO: 123\n\tMYVAR: foo123', result.stdout, + 'Set variables not listed correctly by the second SET command') + assert_pattern(r'\bUnsetting variable FOO$', 'Unsetting variable FOO', + result.stdout, 'No evidence of variable FOO being unset') + assert_pattern(r'\bUnsetting variable BAR$', 'Unsetting variable BAR', + result.stdout, 'No evidence of variable BAR being unset') + assert_pattern(r'\bVariables:[\s\n]*No variables defined\.$', \ + 'Variables:\n\tNo variables defined.', result.stdout, \ + 'Unset variables incorrectly listed by third SET command.') + assert_pattern(r'\bNo variable called NONEXISTENT is set', \ + 'No variable called NONEXISTENT is set', result.stdout, \ + 'Problem unsetting non-existent variable.') + assert_pattern(r'\bVariable COMMENT_TYPE1 set to.*$', + 'Variable COMMENT_TYPE1 set to ok', result.stderr, + 'No evidence of COMMENT_TYPE1 variable being set.') + assert_pattern(r'\bVariable COMMENT_TYPE2 set to.*$', + 'Variable COMMENT_TYPE2 set to ok', result.stderr, + 'No evidence of COMMENT_TYPE2 variable being set.') + assert_pattern(r'\bVariable COMMENT_TYPE3 set to.*$', + 'Variable COMMENT_TYPE3 set to ok', result.stderr, + 'No evidence of COMMENT_TYPE3 variable being set.') + assert_pattern(r'\bVariables:[\s\n]*COMMENT_TYPE1:.*[\s\n]*' + \ + 'COMMENT_TYPE2:.*[\s\n]*COMMENT_TYPE3:.*$', + 'Variables:\n\tCOMMENT_TYPE1: ok\n\tCOMMENT_TYPE2: ok\n\tCOMMENT_TYPE3: ok', \ + result.stdout, 'Set variables not listed correctly by the SET command') + 
+def assert_pattern(pattern, result, text, message): + """Asserts that the pattern, when applied to text, returns the expected result""" + m = re.search(pattern, text, re.MULTILINE) + assert m and m.group(0) == result, message + +def run_impala_shell_cmd(shell_args, expect_success=True, stdin_input=None): + """Runs the Impala shell on the commandline. + + 'shell_args' is a string which represents the commandline options. + Returns a ImpalaShellResult. + """ + p = ImpalaShell(shell_args) + result = p.get_result(stdin_input) + cmd = "%s %s" % (SHELL_CMD, shell_args) + if expect_success: + assert result.rc == 0, "Cmd %s was expected to succeed: %s" % (cmd, result.stderr) + else: + assert result.rc != 0, "Cmd %s was expected to fail" % cmd + return result + +class ImpalaShellResult(object): + def __init__(self): + self.rc = 0 + self.stdout = str() + self.stderr = str() + +class ImpalaShell(object): + """A single instance of the Impala shell. The proces is started when this object is + constructed, and then users should repeatedly call send_cmd(), followed eventually by + get_result() to retrieve the process output.""" + def __init__(self, args=None, env=None): + self.shell_process = self._start_new_shell_process(args, env=env) + + def pid(self): + return self.shell_process.pid + + def send_cmd(self, cmd): + """Send a single command to the shell. This method adds the end-of-query + terminator (';'). """ + self.shell_process.stdin.write("%s;\n" % cmd) + self.shell_process.stdin.flush() + # Allow fluent-style chaining of commands + return self + + def get_result(self, stdin_input=None): + """Returns an ImpalaShellResult produced by the shell process on exit. After this + method returns, send_cmd() no longer has any effect.""" + result = ImpalaShellResult() + result.stdout, result.stderr = self.shell_process.communicate(input=stdin_input) + # We need to close STDIN if we gave it an input, in order to send an EOF that will + # allow the subprocess to exit. 
+ if stdin_input is not None: self.shell_process.stdin.close() + result.rc = self.shell_process.returncode + return result + + def _start_new_shell_process(self, args=None, env=None): + """Starts a shell process and returns the process handle""" + shell_args = SHELL_CMD + if args is not None: shell_args = "%s %s" % (SHELL_CMD, args) + lex = shlex.split(shell_args) + if not env: env = os.environ + return Popen(lex, shell=False, stdout=PIPE, stdin=PIPE, stderr=PIPE, + env=env)
