This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ea13e74 IMPALA-10309: Use sleep time from a Retry-After header in
Impala Shell
ea13e74 is described below
commit ea13e744977c71206c11a54f65d77aeecb55703d
Author: Andrew Sherman <[email protected]>
AuthorDate: Mon Nov 2 12:17:02 2020 -0800
IMPALA-10309: Use sleep time from a Retry-After header in Impala Shell
When Impala Shell receives an http error message (that is a message with
http code greater than or equal to 300), it may sleep for a time before
retrying. If the message contains a 'Retry-After' header that has an
integer value, then this will be used as the time for which to sleep.
The implementation is to use a new HttpError exception (similar to that
used in Impyla) which includes more information from the error message
(including the headers) so that catchers of the exception can use the
'Retry-After' header if appropriate.
TESTING:
Hand testing with a proxy that uses the 'Retry-After' header.
Added new tests that use the fault injection framework in
test_hs2_fault_injection.py
Change-Id: I2b4226e7723d585d61deb4d1d6777aac901bfd93
Reviewed-on: http://gerrit.cloudera.org:8080/16702
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
shell/ImpalaHttpClient.py | 7 +-
shell/impala_client.py | 28 +++++-
shell/shell_exceptions.py | 15 ++++
tests/custom_cluster/test_hs2_fault_injection.py | 105 +++++++++++++++++++++--
4 files changed, 143 insertions(+), 12 deletions(-)
diff --git a/shell/ImpalaHttpClient.py b/shell/ImpalaHttpClient.py
index 1820a1b..14dc767 100644
--- a/shell/ImpalaHttpClient.py
+++ b/shell/ImpalaHttpClient.py
@@ -28,7 +28,7 @@ from six.moves import urllib
from six.moves import http_client
from thrift.transport.TTransport import TTransportBase
-from shell_exceptions import RPCException
+from shell_exceptions import HttpError
import six
@@ -213,7 +213,4 @@ class ImpalaHttpClient(TTransportBase):
# Report any http response code that is not 1XX (informational response)
or
# 2XX (successful).
body = self.readBody()
- if not body:
- raise RPCException("HTTP code {}: {}".format(self.code, self.message))
- else:
- raise RPCException("HTTP code {}: {} [{}]".format(self.code,
self.message, body))
+ raise HttpError(self.code, self.message, body, self.headers)
diff --git a/shell/impala_client.py b/shell/impala_client.py
index ec60a1b..591011a 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -48,7 +48,7 @@ from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport, TTransportException
from thrift.Thrift import TApplicationException, TException
from shell_exceptions import (RPCException, QueryStateException,
DisconnectedException,
- QueryCancelledByShellException, MissingThriftMethodException)
+ QueryCancelledByShellException, MissingThriftMethodException, HttpError)
# Helpers to extract and convert HS2's representation of values to the display
version.
@@ -951,8 +951,12 @@ class ImpalaHS2Client(ImpalaClient):
while num_tries <= max_tries:
raise_error = (num_tries == max_tries)
# Generate a retry message, only if retries and supported.
+ will_retry = False
+ retry_secs = None
if retry_on_error and self.max_tries > 1:
retry_msg = 'Num remaining tries: {0}'.format(max_tries - num_tries)
+ if num_tries < max_tries:
+ will_retry = True
else:
retry_msg = ''
try:
@@ -971,12 +975,32 @@ class ImpalaHS2Client(ImpalaClient):
raise MissingThriftMethodException(t.message)
raise RPCException("Application Exception : {0}".format(t),
RPC_EXCEPTION_TAPPLICATION)
+ except HttpError as h:
+ if will_retry:
+ retry_after = h.http_headers.get('Retry-After', None)
+ if retry_after:
+ try:
+ retry_secs = int(retry_after)
+ except ValueError:
+ retry_secs = None
+ if retry_secs:
+ print('Caught exception {0}, type={1} in {2}. {3}, retry after {4}
secs'
+ .format(str(h), type(h), rpc.__name__, retry_msg, retry_secs),
+ file=sys.stderr)
+ else:
+ print('Caught exception {0}, type={1} in {2}. {3}'
+ .format(str(h), type(h), rpc.__name__, retry_msg),
file=sys.stderr)
+ if raise_error:
+ raise
except Exception as e:
print('Caught exception {0}, type={1} in {2}. {3}'
.format(str(e), type(e), rpc.__name__, retry_msg), file=sys.stderr)
if raise_error:
raise
- time.sleep(self._get_sleep_interval_for_retries(num_tries))
+ if retry_secs:
+ time.sleep(retry_secs)
+ else:
+ time.sleep(self._get_sleep_interval_for_retries(num_tries))
num_tries += 1
def _check_hs2_rpc_status(self, status):
diff --git a/shell/shell_exceptions.py b/shell/shell_exceptions.py
index 800004b..538ca1e 100644
--- a/shell/shell_exceptions.py
+++ b/shell/shell_exceptions.py
@@ -51,3 +51,18 @@ class MissingThriftMethodException(Exception):
def __str__(self):
return self.value
+
+
+class HttpError(Exception):
+ """An error containing an http response code and a possible message body"""
+ def __init__(self, code, message, body, http_headers):
+ self.code = code
+ self.message = message
+ self.body = body
+ self.http_headers = http_headers
+
+ def __str__(self):
+ if not self.body:
+ return "HTTP code {}: {}".format(self.code, self.message)
+ else:
+ return "HTTP code {}: {} [{}]".format(self.code, self.message, self.body)
diff --git a/tests/custom_cluster/test_hs2_fault_injection.py
b/tests/custom_cluster/test_hs2_fault_injection.py
index f450287..47107ba 100644
--- a/tests/custom_cluster/test_hs2_fault_injection.py
+++ b/tests/custom_cluster/test_hs2_fault_injection.py
@@ -22,6 +22,7 @@ import requests
from shell.ImpalaHttpClient import ImpalaHttpClient
from shell.impala_client import ImpalaHS2Client
+from shell.shell_exceptions import HttpError
from tests.common.impala_test_suite import IMPALAD_HS2_HTTP_HOST_PORT
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from time import sleep
@@ -40,16 +41,21 @@ class FaultInjectingHttpClient(ImpalaHttpClient, object):
self.fault_frequency = 0
self.fault_enabled = False
- def enable_fault(self, http_code, http_message, fault_frequency):
+ def enable_fault(self, http_code, http_message, fault_frequency,
fault_body=None,
+ fault_headers=None):
"""Inject fault with given code and message at the given frequency.
As an example, if frequency is 20% then inject fault for 1 out of every 5
requests."""
+ if fault_headers is None:
+ fault_headers = {}
self.fault_enabled = True
self.fault_code = http_code
self.fault_message = http_message
self.fault_frequency = fault_frequency
assert fault_frequency > 0 and fault_frequency <= 1
self.num_requests = 0
+ self.fault_body = fault_body
+ self.fault_headers = fault_headers
def disable_fault(self):
self.fault_enabled = False
@@ -58,7 +64,7 @@ class FaultInjectingHttpClient(ImpalaHttpClient, object):
if self.code >= 300:
# Report any http response code that is not 1XX (informational response)
or
# 2XX (successful).
- raise Exception("HTTP code {}: {}".format(self.code, self.message))
+ raise HttpError(self.code, self.message, self.body, self.headers)
def _inject_fault(self):
if not self.fault_enabled:
@@ -76,6 +82,8 @@ class FaultInjectingHttpClient(ImpalaHttpClient, object):
if self.fault_code is not None and self._inject_fault():
self.code = self.fault_code
self.message = self.fault_message
+ self.body = self.fault_body
+ self.headers = self.fault_headers
self._check_code()
@@ -125,19 +133,40 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
def __expect_msg_retry(self, impala_rpc_name):
"""Returns expected log message for rpcs which can be retried"""
return ("Caught exception HTTP code 502: Injected Fault, "
- "type=<type 'exceptions.Exception'> in {0}. "
+ "type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
"Num remaining tries: 3".format(impala_rpc_name))
+ def __expect_msg_retry_with_extra(self, impala_rpc_name):
+ """Returns expected log message for rpcs which can be retried and where
the http
+ message has a message body"""
+ return ("Caught exception HTTP code 503: Injected Fault [EXTRA], "
+ "type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
+ "Num remaining tries: 3".format(impala_rpc_name))
+
+ def __expect_msg_retry_with_retry_after(self, impala_rpc_name):
+ """Returns expected log message for rpcs which can be retried and the http
+ message has a body and a Retry-After header that can be correctly
decoded"""
+ return ("Caught exception HTTP code 503: Injected Fault [EXTRA], "
+ "type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
+ "Num remaining tries: 3, retry after 1 secs".format(impala_rpc_name))
+
+ def __expect_msg_retry_with_retry_after_no_extra(self, impala_rpc_name):
+ """Returns expected log message for rpcs which can be retried and the http
+ message has a Retry-After header that can be correctly decoded"""
+ return ("Caught exception HTTP code 503: Injected Fault, "
+ "type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
+ "Num remaining tries: 3, retry after 1 secs".format(impala_rpc_name))
+
def __expect_msg_no_retry(self, impala_rpc_name):
"""Returns expected log message for rpcs which can not be retried"""
return ("Caught exception HTTP code 502: Injected Fault, "
- "type=<type 'exceptions.Exception'> in {0}. ".format(impala_rpc_name))
+ "type=<class 'shell.shell_exceptions.HttpError'> in {0}.
".format(impala_rpc_name))
@pytest.mark.execute_serially
def test_connect(self, capsys):
"""Tests fault injection in ImpalaHS2Client's connect().
OpenSession and CloseImpalaOperation rpcs fail.
- Retries results in a successfull connection."""
+ Retries results in a successful connection."""
self.transport.enable_fault(502, "Injected Fault", 0.20)
self.connect()
output = capsys.readouterr()[1].splitlines()
@@ -145,6 +174,72 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
assert output[2] == self.__expect_msg_retry("CloseImpalaOperation")
@pytest.mark.execute_serially
+ def test_connect_proxy(self, capsys):
+ """Tests fault injection in ImpalaHS2Client's connect().
+ The injected error has a message body.
+ OpenSession and CloseImpalaOperation rpcs fail.
+ Retries results in a successful connection."""
+ self.transport.enable_fault(503, "Injected Fault", 0.20, 'EXTRA')
+ self.connect()
+ output = capsys.readouterr()[1].splitlines()
+ assert output[1] == self.__expect_msg_retry_with_extra("OpenSession")
+ assert output[2] ==
self.__expect_msg_retry_with_extra("CloseImpalaOperation")
+
+ @pytest.mark.execute_serially
+ def test_connect_proxy_no_retry(self, capsys):
+ """Tests fault injection in ImpalaHS2Client's connect().
+ The injected error contains headers but no Retry-After header.
+ OpenSession and CloseImpalaOperation rpcs fail.
+ Retries results in a successful connection."""
+ self.transport.enable_fault(503, "Injected Fault", 0.20, 'EXTRA',
+ {"header1": "value1"})
+ self.connect()
+ output = capsys.readouterr()[1].splitlines()
+ assert output[1] == self.__expect_msg_retry_with_extra("OpenSession")
+ assert output[2] ==
self.__expect_msg_retry_with_extra("CloseImpalaOperation")
+
+ @pytest.mark.execute_serially
+ def test_connect_proxy_bad_retry(self, capsys):
+ """Tests fault injection in ImpalaHS2Client's connect().
+ The injected error contains a body and a junk Retry-After header.
+ OpenSession and CloseImpalaOperation rpcs fail.
+ Retries results in a successful connection."""
+ self.transport.enable_fault(503, "Injected Fault", 0.20, 'EXTRA',
+ {"header1": "value1",
+ "Retry-After": "junk"})
+ self.connect()
+ output = capsys.readouterr()[1].splitlines()
+ assert output[1] == self.__expect_msg_retry_with_extra("OpenSession")
+ assert output[2] ==
self.__expect_msg_retry_with_extra("CloseImpalaOperation")
+
+ @pytest.mark.execute_serially
+ def test_connect_proxy_retry(self, capsys):
+ """Tests fault injection in ImpalaHS2Client's connect().
+ The injected error contains a body and a Retry-After header that can be
decoded.
+ Retries results in a successful connection."""
+ self.transport.enable_fault(503, "Injected Fault", 0.20, 'EXTRA',
+ {"header1": "value1",
+ "Retry-After": "1"})
+ self.connect()
+ output = capsys.readouterr()[1].splitlines()
+ assert output[1] == self.__expect_msg_retry_with_retry_after("OpenSession")
+ assert output[2] ==
self.__expect_msg_retry_with_retry_after("CloseImpalaOperation")
+
+ @pytest.mark.execute_serially
+ def test_connect_proxy_retry_no_body(self, capsys):
+ """Tests fault injection in ImpalaHS2Client's connect().
+ The injected error has no body but does have a Retry-After header that can
be decoded.
+ Retries results in a successful connection."""
+ self.transport.enable_fault(503, "Injected Fault", 0.20, None,
+ {"header1": "value1",
+ "Retry-After": "1"})
+ self.connect()
+ output = capsys.readouterr()[1].splitlines()
+ assert output[1] ==
self.__expect_msg_retry_with_retry_after_no_extra("OpenSession")
+ assert output[2] == self.\
+ __expect_msg_retry_with_retry_after_no_extra("CloseImpalaOperation")
+
+ @pytest.mark.execute_serially
def test_close_connection(self, capsys):
"""Tests fault injection in ImpalaHS2Client's close_connection().
CloseSession rpc fails due to the fault, but succeeds anyways since
exceptions