This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ea13e74  IMPALA-10309: Use sleep time from a Retry-After header in 
Impala Shell
ea13e74 is described below

commit ea13e744977c71206c11a54f65d77aeecb55703d
Author: Andrew Sherman <[email protected]>
AuthorDate: Mon Nov 2 12:17:02 2020 -0800

    IMPALA-10309: Use sleep time from a Retry-After header in Impala Shell
    
    When Impala Shell receives an http error message (that is a message with
    http code greater than or equal to 300), it may sleep for a time before
    retrying. If the message contains a 'Retry-After' header that has an
    integer value, then this will be used as the time for which to sleep.
    
    The implementation is to use a new HttpError exception (similar to that
    used in Impyla) which includes more information from the error message
    (including the headers) so that catchers of the exception can use the
    'Retry-After' header if appropriate.
    
    TESTING:
        Hand testing with a proxy that uses the 'Retry-After' header.
        Added new tests that use the fault injection framework in
        test_hs2_fault_injection.py
    
    Change-Id: I2b4226e7723d585d61deb4d1d6777aac901bfd93
    Reviewed-on: http://gerrit.cloudera.org:8080/16702
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 shell/ImpalaHttpClient.py                        |   7 +-
 shell/impala_client.py                           |  28 +++++-
 shell/shell_exceptions.py                        |  15 ++++
 tests/custom_cluster/test_hs2_fault_injection.py | 105 +++++++++++++++++++++--
 4 files changed, 143 insertions(+), 12 deletions(-)

diff --git a/shell/ImpalaHttpClient.py b/shell/ImpalaHttpClient.py
index 1820a1b..14dc767 100644
--- a/shell/ImpalaHttpClient.py
+++ b/shell/ImpalaHttpClient.py
@@ -28,7 +28,7 @@ from six.moves import urllib
 from six.moves import http_client
 
 from thrift.transport.TTransport import TTransportBase
-from shell_exceptions import RPCException
+from shell_exceptions import HttpError
 import six
 
 
@@ -213,7 +213,4 @@ class ImpalaHttpClient(TTransportBase):
       # Report any http response code that is not 1XX (informational response) 
or
       # 2XX (successful).
       body = self.readBody()
-      if not body:
-        raise RPCException("HTTP code {}: {}".format(self.code, self.message))
-      else:
-        raise RPCException("HTTP code {}: {} [{}]".format(self.code, 
self.message, body))
+      raise HttpError(self.code, self.message, body, self.headers)
diff --git a/shell/impala_client.py b/shell/impala_client.py
index ec60a1b..591011a 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -48,7 +48,7 @@ from thrift.transport.TSocket import TSocket
 from thrift.transport.TTransport import TBufferedTransport, TTransportException
 from thrift.Thrift import TApplicationException, TException
 from shell_exceptions import (RPCException, QueryStateException, 
DisconnectedException,
-    QueryCancelledByShellException, MissingThriftMethodException)
+    QueryCancelledByShellException, MissingThriftMethodException, HttpError)
 
 
 # Helpers to extract and convert HS2's representation of values to the display 
version.
@@ -951,8 +951,12 @@ class ImpalaHS2Client(ImpalaClient):
     while num_tries <= max_tries:
       raise_error = (num_tries == max_tries)
       # Generate a retry message, only if retries and supported.
+      will_retry = False
+      retry_secs = None
       if retry_on_error and self.max_tries > 1:
         retry_msg = 'Num remaining tries: {0}'.format(max_tries - num_tries)
+        if num_tries < max_tries:
+          will_retry = True
       else:
         retry_msg = ''
       try:
@@ -971,12 +975,32 @@ class ImpalaHS2Client(ImpalaClient):
           raise MissingThriftMethodException(t.message)
         raise RPCException("Application Exception : {0}".format(t),
           RPC_EXCEPTION_TAPPLICATION)
+      except HttpError as h:
+        if will_retry:
+          retry_after = h.http_headers.get('Retry-After', None)
+          if retry_after:
+            try:
+              retry_secs = int(retry_after)
+            except ValueError:
+              retry_secs = None
+        if retry_secs:
+          print('Caught exception {0}, type={1} in {2}. {3}, retry after {4} 
secs'
+                .format(str(h), type(h), rpc.__name__, retry_msg, retry_secs),
+                file=sys.stderr)
+        else:
+          print('Caught exception {0}, type={1} in {2}. {3}'
+                .format(str(h), type(h), rpc.__name__, retry_msg), 
file=sys.stderr)
+        if raise_error:
+          raise
       except Exception as e:
         print('Caught exception {0}, type={1} in {2}. {3}'
           .format(str(e), type(e), rpc.__name__, retry_msg), file=sys.stderr)
         if raise_error:
           raise
-      time.sleep(self._get_sleep_interval_for_retries(num_tries))
+      if retry_secs:
+        time.sleep(retry_secs)
+      else:
+        time.sleep(self._get_sleep_interval_for_retries(num_tries))
       num_tries += 1
 
   def _check_hs2_rpc_status(self, status):
diff --git a/shell/shell_exceptions.py b/shell/shell_exceptions.py
index 800004b..538ca1e 100644
--- a/shell/shell_exceptions.py
+++ b/shell/shell_exceptions.py
@@ -51,3 +51,18 @@ class MissingThriftMethodException(Exception):
 
   def __str__(self):
       return self.value
+
+
+class HttpError(Exception):
+  """An error containing an http response code and a possible message body"""
+  def __init__(self, code, message, body, http_headers):
+    self.code = code
+    self.message = message
+    self.body = body
+    self.http_headers = http_headers
+
+  def __str__(self):
+    if not self.body:
+      return "HTTP code {}: {}".format(self.code, self.message)
+    else:
+      return "HTTP code {}: {} [{}]".format(self.code, self.message, self.body)
diff --git a/tests/custom_cluster/test_hs2_fault_injection.py 
b/tests/custom_cluster/test_hs2_fault_injection.py
index f450287..47107ba 100644
--- a/tests/custom_cluster/test_hs2_fault_injection.py
+++ b/tests/custom_cluster/test_hs2_fault_injection.py
@@ -22,6 +22,7 @@ import requests
 
 from shell.ImpalaHttpClient import ImpalaHttpClient
 from shell.impala_client import ImpalaHS2Client
+from shell.shell_exceptions import HttpError
 from tests.common.impala_test_suite import IMPALAD_HS2_HTTP_HOST_PORT
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from time import sleep
@@ -40,16 +41,21 @@ class FaultInjectingHttpClient(ImpalaHttpClient, object):
     self.fault_frequency = 0
     self.fault_enabled = False
 
-  def enable_fault(self, http_code, http_message, fault_frequency):
+  def enable_fault(self, http_code, http_message, fault_frequency, 
fault_body=None,
+                   fault_headers=None):
     """Inject fault with given code and message at the given frequency.
     As an example, if frequency is 20% then inject fault for 1 out of every 5
     requests."""
+    if fault_headers is None:
+      fault_headers = {}
     self.fault_enabled = True
     self.fault_code = http_code
     self.fault_message = http_message
     self.fault_frequency = fault_frequency
     assert fault_frequency > 0 and fault_frequency <= 1
     self.num_requests = 0
+    self.fault_body = fault_body
+    self.fault_headers = fault_headers
 
   def disable_fault(self):
     self.fault_enabled = False
@@ -58,7 +64,7 @@ class FaultInjectingHttpClient(ImpalaHttpClient, object):
     if self.code >= 300:
       # Report any http response code that is not 1XX (informational response) 
or
       # 2XX (successful).
-      raise Exception("HTTP code {}: {}".format(self.code, self.message))
+      raise HttpError(self.code, self.message, self.body, self.headers)
 
   def _inject_fault(self):
     if not self.fault_enabled:
@@ -76,6 +82,8 @@ class FaultInjectingHttpClient(ImpalaHttpClient, object):
     if self.fault_code is not None and self._inject_fault():
       self.code = self.fault_code
       self.message = self.fault_message
+      self.body = self.fault_body
+      self.headers = self.fault_headers
       self._check_code()
 
 
@@ -125,19 +133,40 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
   def __expect_msg_retry(self, impala_rpc_name):
     """Returns expected log message for rpcs which can be retried"""
     return ("Caught exception HTTP code 502: Injected Fault, "
-      "type=<type 'exceptions.Exception'> in {0}. "
+      "type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
       "Num remaining tries: 3".format(impala_rpc_name))
 
+  def __expect_msg_retry_with_extra(self, impala_rpc_name):
+    """Returns expected log message for rpcs which can be retried and where 
the http
+    message has a message body"""
+    return ("Caught exception HTTP code 503: Injected Fault [EXTRA], "
+      "type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
+      "Num remaining tries: 3".format(impala_rpc_name))
+
+  def __expect_msg_retry_with_retry_after(self, impala_rpc_name):
+    """Returns expected log message for rpcs which can be retried and the http
+    message has a body and a Retry-After header that can be correctly 
decoded"""
+    return ("Caught exception HTTP code 503: Injected Fault [EXTRA], "
+      "type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
+      "Num remaining tries: 3, retry after 1 secs".format(impala_rpc_name))
+
+  def __expect_msg_retry_with_retry_after_no_extra(self, impala_rpc_name):
+    """Returns expected log message for rpcs which can be retried and the http
+    message has a Retry-After header that can be correctly decoded"""
+    return ("Caught exception HTTP code 503: Injected Fault, "
+      "type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
+      "Num remaining tries: 3, retry after 1 secs".format(impala_rpc_name))
+
   def __expect_msg_no_retry(self, impala_rpc_name):
     """Returns expected log message for rpcs which can not be retried"""
     return ("Caught exception HTTP code 502: Injected Fault, "
-      "type=<type 'exceptions.Exception'> in {0}. ".format(impala_rpc_name))
+      "type=<class 'shell.shell_exceptions.HttpError'> in {0}. 
".format(impala_rpc_name))
 
   @pytest.mark.execute_serially
   def test_connect(self, capsys):
     """Tests fault injection in ImpalaHS2Client's connect().
     OpenSession and CloseImpalaOperation rpcs fail.
-    Retries results in a successfull connection."""
+    Retries results in a successful connection."""
     self.transport.enable_fault(502, "Injected Fault", 0.20)
     self.connect()
     output = capsys.readouterr()[1].splitlines()
@@ -145,6 +174,72 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
     assert output[2] == self.__expect_msg_retry("CloseImpalaOperation")
 
   @pytest.mark.execute_serially
+  def test_connect_proxy(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's connect().
+    The injected error has a message body.
+    OpenSession and CloseImpalaOperation rpcs fail.
+    Retries results in a successful connection."""
+    self.transport.enable_fault(503, "Injected Fault", 0.20, 'EXTRA')
+    self.connect()
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_retry_with_extra("OpenSession")
+    assert output[2] == 
self.__expect_msg_retry_with_extra("CloseImpalaOperation")
+
+  @pytest.mark.execute_serially
+  def test_connect_proxy_no_retry(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's connect().
+    The injected error contains headers but no Retry-After header.
+    OpenSession and CloseImpalaOperation rpcs fail.
+    Retries results in a successful connection."""
+    self.transport.enable_fault(503, "Injected Fault", 0.20, 'EXTRA',
+                                {"header1": "value1"})
+    self.connect()
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_retry_with_extra("OpenSession")
+    assert output[2] == 
self.__expect_msg_retry_with_extra("CloseImpalaOperation")
+
+  @pytest.mark.execute_serially
+  def test_connect_proxy_bad_retry(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's connect().
+    The injected error contains a body and a junk Retry-After header.
+    OpenSession and CloseImpalaOperation rpcs fail.
+    Retries results in a successful connection."""
+    self.transport.enable_fault(503, "Injected Fault", 0.20, 'EXTRA',
+                                {"header1": "value1",
+                                 "Retry-After": "junk"})
+    self.connect()
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_retry_with_extra("OpenSession")
+    assert output[2] == 
self.__expect_msg_retry_with_extra("CloseImpalaOperation")
+
+  @pytest.mark.execute_serially
+  def test_connect_proxy_retry(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's connect().
+    The injected error contains a body and a Retry-After header that can be 
decoded.
+    Retries results in a successful connection."""
+    self.transport.enable_fault(503, "Injected Fault", 0.20, 'EXTRA',
+                                {"header1": "value1",
+                                  "Retry-After": "1"})
+    self.connect()
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_retry_with_retry_after("OpenSession")
+    assert output[2] == 
self.__expect_msg_retry_with_retry_after("CloseImpalaOperation")
+
+  @pytest.mark.execute_serially
+  def test_connect_proxy_retry_no_body(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's connect().
+    The injected error has no body but does have a Retry-After header that can 
be decoded.
+    Retries results in a successful connection."""
+    self.transport.enable_fault(503, "Injected Fault", 0.20, None,
+                                {"header1": "value1",
+                                  "Retry-After": "1"})
+    self.connect()
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == 
self.__expect_msg_retry_with_retry_after_no_extra("OpenSession")
+    assert output[2] == self.\
+      __expect_msg_retry_with_retry_after_no_extra("CloseImpalaOperation")
+
+  @pytest.mark.execute_serially
   def test_close_connection(self, capsys):
     """Tests fault injection in ImpalaHS2Client's close_connection().
     CloseSession rpc fails due to the fault, but succeeds anyways since 
exceptions

Reply via email to