IMPALA-7555: Set socket timeout in impala-shell. impala-shell does not set any socket timeout while connecting to the Impala server. This change sets a timeout on the socket before connecting and clears it again after successfully connecting. The default timeout on this socket is 5 sec (5000 ms). Usage: impala-shell --client_connect_timeout_ms=<value in ms>
Testing: 1. Added a test where I create a random listening socket. impala-shell (with ssl enabled) connects to this socket and times out after 2 sec. 2. Created a kerberized impala cluster with ssl enabled and connected to the impalad using an openssl client (to block the beeswax server thread from accepting new connections) - E.g. - openssl s_client -connect <IP Addr>:21000 Used impala-shell to connect to the same impalad later. impala-shell timed out after the default of 5 sec. I verified it manually. Change-Id: I130fc47f7a83f591918d6842634b4e5787d00813 Reviewed-on: http://gerrit.cloudera.org:8080/11540 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2fb8ebae Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2fb8ebae Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2fb8ebae Branch: refs/heads/master Commit: 2fb8ebaef2beecd511e963fadbb41cbb11add138 Parents: 5af5456 Author: aphadke <[email protected]> Authored: Thu Sep 20 16:51:23 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Oct 18 01:41:42 2018 +0000 ---------------------------------------------------------------------- shell/impala_client.py | 29 +++++++++++++++++++++++------ shell/impala_shell.py | 17 +++++++++-------- shell/impala_shell_config_defaults.py | 1 + shell/option_parser.py | 5 ++++- tests/shell/test_shell_commandline.py | 15 +++++++++++++++ 5 files changed, 52 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/impala_client.py ---------------------------------------------------------------------- diff --git a/shell/impala_client.py b/shell/impala_client.py index 2f2a5e9..e53637a 100755 --- a/shell/impala_client.py +++ b/shell/impala_client.py @@ -18,6 +18,7 @@ # under 
the License. import sasl +import sys import time from beeswaxd import BeeswaxService @@ -57,11 +58,16 @@ class DisconnectedException(Exception): class QueryCancelledByShellException(Exception): pass + +def print_to_stderr(message): + print >> sys.stderr, message + class ImpalaClient(object): def __init__(self, impalad, kerberos_host_fqdn, use_kerberos=False, kerberos_service_name="impala", use_ssl=False, ca_cert=None, user=None, - ldap_password=None, use_ldap=False): + ldap_password=None, use_ldap=False, client_connect_timeout_ms=5000, + verbose=True): self.connected = False self.impalad_host = impalad[0].encode('ascii', 'ignore') self.impalad_port = int(impalad[1]) @@ -74,6 +80,7 @@ class ImpalaClient(object): self.ca_cert = ca_cert self.user, self.ldap_password = user, ldap_password self.use_ldap = use_ldap + self.client_connect_timeout_ms = int(client_connect_timeout_ms) self.default_query_options = {} self.query_option_levels = {} self.query_state = QueryState._NAMES_TO_VALUES @@ -82,6 +89,7 @@ class ImpalaClient(object): # from command line via CTRL+C. It is used to suppress error messages of # query cancellation. 
self.is_query_cancelled = False + self.verbose = verbose def _options_to_string_list(self, set_query_options): return ["%s=%s" % (k, v) for (k, v) in set_query_options.iteritems()] @@ -250,8 +258,15 @@ class ImpalaClient(object): self.transport = None self.connected = False - self.transport = self._get_transport() + sock, self.transport = self._get_socket_and_transport() + if self.client_connect_timeout_ms > 0: + sock.setTimeout(self.client_connect_timeout_ms) self.transport.open() + if self.verbose: + print_to_stderr('Opened TCP connection to %s:%s' % (self.impalad_host, + self.impalad_port)) + # Setting a timeout of None disables timeouts on sockets + sock.setTimeout(None) protocol = TBinaryProtocol.TBinaryProtocol(self.transport) self.imp_service = ImpalaService.Client(protocol) result = self.ping_impala_service() @@ -266,7 +281,7 @@ class ImpalaClient(object): if self.transport: self.transport.close() - def _get_transport(self): + def _get_socket_and_transport(self): """Create a Transport. A non-kerberized impalad just needs a simple buffered transport. For @@ -274,6 +289,7 @@ class ImpalaClient(object): If SSL is enabled, a TSSLSocket underlies the transport stack; otherwise a TSocket is used. + This function returns the socket and the transport object. """ if self.use_ssl: # TSSLSocket needs the ssl module, which may not be standard on all Operating @@ -304,7 +320,8 @@ class ImpalaClient(object): else: sock = TSocket(sock_host, sock_port) if not (self.use_ldap or self.use_kerberos): - return TBufferedTransport(sock) + return sock, TBufferedTransport(sock) + # Initializes a sasl client def sasl_factory(): sasl_client = sasl.Client() @@ -318,9 +335,9 @@ class ImpalaClient(object): return sasl_client # GSSASPI is the underlying mechanism used by kerberos to authenticate. 
if self.use_kerberos: - return TSaslClientTransport(sasl_factory, "GSSAPI", sock) + return sock, TSaslClientTransport(sasl_factory, "GSSAPI", sock) else: - return TSaslClientTransport(sasl_factory, "PLAIN", sock) + return sock, TSaslClientTransport(sasl_factory, "PLAIN", sock) def create_beeswax_query(self, query_str, set_query_options): """Create a beeswax query object from a query string""" http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/impala_shell.py ---------------------------------------------------------------------- diff --git a/shell/impala_shell.py b/shell/impala_shell.py index a7b1ac8..9277071 100755 --- a/shell/impala_shell.py +++ b/shell/impala_shell.py @@ -168,7 +168,7 @@ class ImpalaShell(object, cmd.Cmd): self.ldap_password = options.ldap_password self.ldap_password_cmd = options.ldap_password_cmd self.use_ldap = options.use_ldap - + self.client_connect_timeout_ms = options.client_connect_timeout_ms self.verbose = options.verbose self.prompt = ImpalaShell.DISCONNECTED_PROMPT self.server_version = ImpalaShell.UNKNOWN_SERVER_VERSION @@ -518,7 +518,7 @@ class ImpalaShell(object, cmd.Cmd): return ImpalaClient(self.impalad, self.kerberos_host_fqdn, self.use_kerberos, self.kerberos_service_name, self.use_ssl, self.ca_cert, self.user, self.ldap_password, - self.use_ldap) + self.use_ldap, self.client_connect_timeout_ms, self.verbose) def _signal_handler(self, signal, frame): """Handles query cancellation on a Ctrl+C event""" @@ -818,12 +818,13 @@ class ImpalaShell(object, cmd.Cmd): print_to_stderr("Unable to import the python 'ssl' module. 
It is" " required for an SSL-secured connection.") sys.exit(1) - except socket.error, (code, e): + except socket.error, e: # if the socket was interrupted, reconnect the connection with the client - if code == errno.EINTR: + if e.errno == errno.EINTR: self._reconnect_cancellation() else: - print_to_stderr("Socket error %s: %s" % (code, e)) + print_to_stderr("Socket error %s: %s" % (e.errno, e)) + self.imp_client.close_connection() self.prompt = self.DISCONNECTED_PROMPT except Exception, e: if self.ldap_password_cmd and \ @@ -1507,7 +1508,7 @@ def parse_variables(keyvals): def replace_variables(set_variables, string): """Replaces variable within the string with their corresponding values using the - given set_variables.""" + given set_variables.""" errors = False matches = set(map(lambda v: v.upper(), re.findall(r'(?<!\\)\${([^}]+)}', string))) for name in matches: @@ -1537,8 +1538,8 @@ def replace_variables(set_variables, string): def get_var_name(name): - """Look for a namespace:var_name pattern in an option name. - Return the variable name if it's a match or None otherwise. + """Looks for a namespace:var_name pattern in an option name. + Returns the variable name if it's a match or None otherwise. 
""" ns_match = re.match(r'^([^:]*):(.*)', name) if ns_match is not None: http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/impala_shell_config_defaults.py ---------------------------------------------------------------------- diff --git a/shell/impala_shell_config_defaults.py b/shell/impala_shell_config_defaults.py index 260e93e..be7685b 100644 --- a/shell/impala_shell_config_defaults.py +++ b/shell/impala_shell_config_defaults.py @@ -51,4 +51,5 @@ impala_shell_defaults = { 'verbose': True, 'version': False, 'write_delimited': False, + 'client_connect_timeout_ms': 5000, } http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/option_parser.py ---------------------------------------------------------------------- diff --git a/shell/option_parser.py b/shell/option_parser.py index 000e319..ccae53b 100755 --- a/shell/option_parser.py +++ b/shell/option_parser.py @@ -221,7 +221,10 @@ def get_option_parser(defaults): " It must follow the pattern \"KEY=VALUE\"," " KEY must be a valid query option. Valid query options " " can be listed by command 'set'.") - + parser.add_option("-t", "--client_connect_timeout_ms", + help="Timeout in milliseconds after which impala-shell will time out" + " if it fails to connect to Impala server. 
Set to 0 to disable any" + " timeout.") # add default values to the help text for option in parser.option_list: # since the quiet flag is the same as the verbose flag http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/tests/shell/test_shell_commandline.py ---------------------------------------------------------------------- diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py index b74fbc2..fd18230 100644 --- a/tests/shell/test_shell_commandline.py +++ b/tests/shell/test_shell_commandline.py @@ -32,6 +32,7 @@ from tests.common.skip import SkipIf from time import sleep, time from util import IMPALAD, SHELL_CMD from util import assert_var_substitution, run_impala_shell_cmd, ImpalaShell +from contextlib import closing DEFAULT_QUERY = 'select 1' QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell') @@ -757,3 +758,17 @@ class TestImpalaShell(ImpalaTestSuite): find_query_option("duplicate", test_input) with pytest.raises(AssertionError): find_query_option("not_an_option", test_input) + + def test_impala_shell_timeout(self): + """Tests that impala shell times out during connect. + This creates a random listening socket and we try to connect to this + socket through the impala-shell. The impala-shell should timeout and not hang + indefinitely while connecting + """ + with closing(socket.socket()) as s: + s.bind(("", 0)) + # maximum number of queued connections on this socket is 1. + s.listen(1) + test_port = s.getsockname()[1] + args = '-q "select foo; select bar;" --ssl -t 2000 -i localhost:%d' % (test_port) + run_impala_shell_cmd(args, expect_success=False)
