Repository: impala Updated Branches: refs/heads/2.x ffac1ab48 -> 33dbdd0bb
IMPALA-6908: IsConnResetTException() should include ECONNRESET The utility function IsConnResetTException() attempted to match error strings from RPCs that fail due to the remote end resetting the connection for any reason. However, it did not take care of the situation where ECONNRESET was sent on a cluster not using TLS. This patch adds this case as well and adds a custom cluster fault injection test to test the same. Change-Id: I1bb997a833917e5166c9ca192da219f50f4439e2 Reviewed-on: http://gerrit.cloudera.org:8080/10265 Reviewed-by: Sailesh Mukil <sail...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/33dbdd0b Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/33dbdd0b Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/33dbdd0b Branch: refs/heads/2.x Commit: 33dbdd0bbe6973b3e926d9e392f4a4edd6a0c41e Parents: ffac1ab Author: Sailesh Mukil <sail...@cloudera.com> Authored: Mon Apr 30 13:53:39 2018 -0700 Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Committed: Tue May 8 21:03:40 2018 +0000 ---------------------------------------------------------------------- be/src/rpc/thrift-util.cc | 26 ++++++++++++++----------- be/src/testutil/fault-injection-util.cc | 3 +++ be/src/testutil/fault-injection-util.h | 1 + tests/custom_cluster/test_rpc_exception.py | 19 +++++++++++------- 4 files changed, 31 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/33dbdd0b/be/src/rpc/thrift-util.cc ---------------------------------------------------------------------- diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc index c0ba537..2988940 100644 --- a/be/src/rpc/thrift-util.cc +++ b/be/src/rpc/thrift-util.cc @@ -57,16 +57,6 @@ using namespace apache::thrift::server; using namespace apache::thrift::protocol; using namespace apache::thrift::concurrency; -// IsRecvTimeoutTException() and IsSendFailTException() make assumption about the -// implementation of read(), write() and write_partial() in TSocket.cpp and those -// functions may change between different versions of Thrift. -static_assert(PACKAGE_VERSION[0] == '0', ""); -static_assert(PACKAGE_VERSION[1] == '.', ""); -static_assert(PACKAGE_VERSION[2] == '9', ""); -static_assert(PACKAGE_VERSION[3] == '.', ""); -static_assert(PACKAGE_VERSION[4] == '0', ""); -static_assert(PACKAGE_VERSION[5] == '\0', ""); - // Thrift defines operator< but does not implement it. This is a stub // implementation so we can link. bool Apache::Hadoop::Hive::Partition::operator<( @@ -185,6 +175,16 @@ bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress& return false; } +// IsRecvTimeoutTException() and IsConnResetTException() make assumptions about the +// implementation of read(), write() and write_partial() in TSocket.cpp and those +// functions may change between different versions of Thrift. +static_assert(PACKAGE_VERSION[0] == '0', ""); +static_assert(PACKAGE_VERSION[1] == '.', ""); +static_assert(PACKAGE_VERSION[2] == '9', ""); +static_assert(PACKAGE_VERSION[3] == '.', ""); +static_assert(PACKAGE_VERSION[4] == '0', ""); +static_assert(PACKAGE_VERSION[5] == '\0', ""); + bool IsRecvTimeoutTException(const TTransportException& e) { // String taken from TSocket::read() Thrift's TSocket.cpp. return (e.getType() == TTransportException::TIMED_OUT && @@ -198,10 +198,14 @@ bool IsConnResetTException(const TTransportException& e) { // As readAll() is reading non-zero length payload, this can only mean recv() called // by read() returns 0. According to man page of recv(), this implies a stream socket // peer has performed an orderly shutdown. + // "ECONNRESET" is only returned in 0.9.0, since in 0.9.3, we get + // END_OF_FILE: "No more data to read." instead, for the same error code. return (e.getType() == TTransportException::END_OF_FILE && strstr(e.what(), "No more data to read.") != nullptr) || (e.getType() == TTransportException::INTERNAL_ERROR && - strstr(e.what(), "SSL_read: Connection reset by peer") != nullptr); + strstr(e.what(), "SSL_read: Connection reset by peer") != nullptr) || + (e.getType() == TTransportException::NOT_OPEN && + strstr(e.what(), "ECONNRESET") != nullptr); } } http://git-wip-us.apache.org/repos/asf/impala/blob/33dbdd0b/be/src/testutil/fault-injection-util.cc ---------------------------------------------------------------------- diff --git a/be/src/testutil/fault-injection-util.cc b/be/src/testutil/fault-injection-util.cc index f378c48..925fcd9 100644 --- a/be/src/testutil/fault-injection-util.cc +++ b/be/src/testutil/fault-injection-util.cc @@ -77,6 +77,9 @@ void FaultInjectionUtil::InjectRpcException(bool is_send, int freq) { case RPC_EXCEPTION_SEND_STALE_CONNECTION: throw TTransportException(TTransportException::END_OF_FILE, "No more data to read."); + case RPC_EXCEPTION_SEND_STALE_CONNECTION_RESET: + throw TTransportException(TTransportException::NOT_OPEN, + "ECONNRESET"); case RPC_EXCEPTION_SSL_SEND_STALE_CONNECTION: throw TSSLException("SSL_read: Connection reset by peer"); // fall through for the default case. http://git-wip-us.apache.org/repos/asf/impala/blob/33dbdd0b/be/src/testutil/fault-injection-util.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/fault-injection-util.h b/be/src/testutil/fault-injection-util.h index f17327f..40cf702 100644 --- a/be/src/testutil/fault-injection-util.h +++ b/be/src/testutil/fault-injection-util.h @@ -41,6 +41,7 @@ class FaultInjectionUtil { RPC_EXCEPTION_NONE = 0, RPC_EXCEPTION_SEND_CLOSED_CONNECTION, RPC_EXCEPTION_SEND_STALE_CONNECTION, + RPC_EXCEPTION_SEND_STALE_CONNECTION_RESET, RPC_EXCEPTION_SEND_TIMEDOUT, RPC_EXCEPTION_RECV_CLOSED_CONNECTION, RPC_EXCEPTION_RECV_TIMEDOUT, http://git-wip-us.apache.org/repos/asf/impala/blob/33dbdd0b/tests/custom_cluster/test_rpc_exception.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_rpc_exception.py b/tests/custom_cluster/test_rpc_exception.py index 2784e88..c4c4ea6 100644 --- a/tests/custom_cluster/test_rpc_exception.py +++ b/tests/custom_cluster/test_rpc_exception.py @@ -65,42 +65,47 @@ class TestRPCException(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=3") + def test_rpc_send_stale_connection_reset(self, vector): + self.execute_test_query(None) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4") def test_rpc_send_timed_out(self, vector): self.execute_test_query(None) @SkipIf.not_thrift @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4") + @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5") def test_rpc_recv_closed_connection(self, vector): self.execute_test_query("Called read on non-open socket") @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5") + @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=6") def test_rpc_recv_timed_out(self, vector): self.execute_test_query(None) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=6") + @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=7") def test_rpc_secure_send_closed_connection(self, vector): self.execute_test_query(None) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=7") + @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=8") def test_rpc_secure_send_stale_connection(self, vector): self.execute_test_query(None) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=8") + @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=9") def test_rpc_secure_send_timed_out(self, vector): self.execute_test_query(None) @SkipIf.not_thrift @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=9") + @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=10") def test_rpc_secure_recv_closed_connection(self, vector): self.execute_test_query("TTransportException: Transport not open") @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=10") + @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=11") def test_rpc_secure_recv_timed_out(self, vector): self.execute_test_query(None)