Repository: impala
Updated Branches:
  refs/heads/2.x ffac1ab48 -> 33dbdd0bb


IMPALA-6908: IsConnResetTException() should include ECONNRESET

The utility function IsConnResetTException() matches the error strings of
RPCs that fail because the remote end reset the connection for any reason.
However, it did not handle the case where ECONNRESET is reported on a
cluster that is not using TLS.

This patch adds that case to the function and adds a custom cluster fault
injection test that exercises it.

Change-Id: I1bb997a833917e5166c9ca192da219f50f4439e2
Reviewed-on: http://gerrit.cloudera.org:8080/10265
Reviewed-by: Sailesh Mukil <sail...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/33dbdd0b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/33dbdd0b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/33dbdd0b

Branch: refs/heads/2.x
Commit: 33dbdd0bbe6973b3e926d9e392f4a4edd6a0c41e
Parents: ffac1ab
Author: Sailesh Mukil <sail...@cloudera.com>
Authored: Mon Apr 30 13:53:39 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Tue May 8 21:03:40 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/thrift-util.cc                  | 26 ++++++++++++++-----------
 be/src/testutil/fault-injection-util.cc    |  3 +++
 be/src/testutil/fault-injection-util.h     |  1 +
 tests/custom_cluster/test_rpc_exception.py | 19 +++++++++++-------
 4 files changed, 31 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/33dbdd0b/be/src/rpc/thrift-util.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index c0ba537..2988940 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -57,16 +57,6 @@ using namespace apache::thrift::server;
 using namespace apache::thrift::protocol;
 using namespace apache::thrift::concurrency;
 
-// IsRecvTimeoutTException() and IsSendFailTException() make assumption about the
-// implementation of read(), write() and write_partial() in TSocket.cpp and those
-// functions may change between different versions of Thrift.
-static_assert(PACKAGE_VERSION[0] == '0', "");
-static_assert(PACKAGE_VERSION[1] == '.', "");
-static_assert(PACKAGE_VERSION[2] == '9', "");
-static_assert(PACKAGE_VERSION[3] == '.', "");
-static_assert(PACKAGE_VERSION[4] == '0', "");
-static_assert(PACKAGE_VERSION[5] == '\0', "");
-
 // Thrift defines operator< but does not implement it. This is a stub
 // implementation so we can link.
 bool Apache::Hadoop::Hive::Partition::operator<(
@@ -185,6 +175,16 @@ bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress&
   return false;
 }
 
+// IsRecvTimeoutTException() and IsConnResetTException() make assumptions about the
+// implementation of read(), write() and write_partial() in TSocket.cpp and those
+// functions may change between different versions of Thrift.
+static_assert(PACKAGE_VERSION[0] == '0', "");
+static_assert(PACKAGE_VERSION[1] == '.', "");
+static_assert(PACKAGE_VERSION[2] == '9', "");
+static_assert(PACKAGE_VERSION[3] == '.', "");
+static_assert(PACKAGE_VERSION[4] == '0', "");
+static_assert(PACKAGE_VERSION[5] == '\0', "");
+
 bool IsRecvTimeoutTException(const TTransportException& e) {
   // String taken from TSocket::read() Thrift's TSocket.cpp.
   return (e.getType() == TTransportException::TIMED_OUT &&
@@ -198,10 +198,14 @@ bool IsConnResetTException(const TTransportException& e) {
   // As readAll() is reading non-zero length payload, this can only mean recv() called
   // by read() returns 0. According to man page of recv(), this implies a stream socket
   // peer has performed an orderly shutdown.
+  // "ECONNRESET" is only returned in 0.9.0, since in 0.9.3, we get
+  // END_OF_FILE: "No more data to read." instead, for the same error code.
   return (e.getType() == TTransportException::END_OF_FILE &&
              strstr(e.what(), "No more data to read.") != nullptr) ||
          (e.getType() == TTransportException::INTERNAL_ERROR &&
-             strstr(e.what(), "SSL_read: Connection reset by peer") != nullptr);
+             strstr(e.what(), "SSL_read: Connection reset by peer") != nullptr) ||
+         (e.getType() == TTransportException::NOT_OPEN &&
+             strstr(e.what(), "ECONNRESET") != nullptr);
 }
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/33dbdd0b/be/src/testutil/fault-injection-util.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/fault-injection-util.cc b/be/src/testutil/fault-injection-util.cc
index f378c48..925fcd9 100644
--- a/be/src/testutil/fault-injection-util.cc
+++ b/be/src/testutil/fault-injection-util.cc
@@ -77,6 +77,9 @@ void FaultInjectionUtil::InjectRpcException(bool is_send, int freq) {
         case RPC_EXCEPTION_SEND_STALE_CONNECTION:
           throw TTransportException(TTransportException::END_OF_FILE,
              "No more data to read.");
+        case RPC_EXCEPTION_SEND_STALE_CONNECTION_RESET:
+          throw TTransportException(TTransportException::NOT_OPEN,
+             "ECONNRESET");
         case RPC_EXCEPTION_SSL_SEND_STALE_CONNECTION:
           throw TSSLException("SSL_read: Connection reset by peer");
         // fall through for the default case.

http://git-wip-us.apache.org/repos/asf/impala/blob/33dbdd0b/be/src/testutil/fault-injection-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/fault-injection-util.h b/be/src/testutil/fault-injection-util.h
index f17327f..40cf702 100644
--- a/be/src/testutil/fault-injection-util.h
+++ b/be/src/testutil/fault-injection-util.h
@@ -41,6 +41,7 @@ class FaultInjectionUtil {
     RPC_EXCEPTION_NONE = 0,
     RPC_EXCEPTION_SEND_CLOSED_CONNECTION,
     RPC_EXCEPTION_SEND_STALE_CONNECTION,
+    RPC_EXCEPTION_SEND_STALE_CONNECTION_RESET,
     RPC_EXCEPTION_SEND_TIMEDOUT,
     RPC_EXCEPTION_RECV_CLOSED_CONNECTION,
     RPC_EXCEPTION_RECV_TIMEDOUT,

http://git-wip-us.apache.org/repos/asf/impala/blob/33dbdd0b/tests/custom_cluster/test_rpc_exception.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_rpc_exception.py b/tests/custom_cluster/test_rpc_exception.py
index 2784e88..c4c4ea6 100644
--- a/tests/custom_cluster/test_rpc_exception.py
+++ b/tests/custom_cluster/test_rpc_exception.py
@@ -65,42 +65,47 @@ class TestRPCException(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=3")
+  def test_rpc_send_stale_connection_reset(self, vector):
+    self.execute_test_query(None)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4")
   def test_rpc_send_timed_out(self, vector):
     self.execute_test_query(None)
 
   @SkipIf.not_thrift
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4")
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5")
   def test_rpc_recv_closed_connection(self, vector):
     self.execute_test_query("Called read on non-open socket")
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5")
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=6")
   def test_rpc_recv_timed_out(self, vector):
     self.execute_test_query(None)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=6")
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=7")
   def test_rpc_secure_send_closed_connection(self, vector):
     self.execute_test_query(None)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=7")
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=8")
   def test_rpc_secure_send_stale_connection(self, vector):
     self.execute_test_query(None)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=8")
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=9")
   def test_rpc_secure_send_timed_out(self, vector):
     self.execute_test_query(None)
 
   @SkipIf.not_thrift
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=9")
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=10")
   def test_rpc_secure_recv_closed_connection(self, vector):
     self.execute_test_query("TTransportException: Transport not open")
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=10")
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=11")
   def test_rpc_secure_recv_timed_out(self, vector):
     self.execute_test_query(None)

Reply via email to