IMPALA-5537: Retry RPC on some exceptions with SSL connection

After the fix for IMPALA-5388, every TSSLException thrown is
treated as a fatal error and the query fails. It turns out that
this is too strict: in a secure cluster under load, queries
can easily hit a timeout while waiting for an RPC response.

When running without SSL, we call RetryRpcRecv() to retry the recv
part of an RPC if the TSocket underlying the RPC gets an EAGAIN
during recv(). This change extends that logic to cover secure
connections. In particular, we pattern match against the exception
string "SSL_read: Resource temporarily unavailable", which corresponds
to the EAGAIN error code being thrown in the SSL_read() path.

Similarly, we now handle a closed connection in the send() path of a
secure connection by pattern matching against the exception string
"TTransportException: Transport not open". To verify that the exception
is thrown during the send part of an RPC call, the RPC client interface
has been augmented to take a bool* argument which is set to true after
the send part of the RPC has completed but before the recv part starts.
If DoRpc() catches an exception and the send part isn't done yet, the
entire RPC is retried if the exception string matches one of the
substrings which are safe to retry.

The fault injection utility has also been updated to distinguish between
a timeout and a lost connection, in order to exercise the different error
handling paths in the send and recv paths.

Change-Id: I8243d4cac93c453e9396b0e24f41e147c8637b8c
Reviewed-on: http://gerrit.cloudera.org:8080/7229
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/23565166
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/23565166
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/23565166

Branch: refs/heads/master
Commit: 23565166886d7e7b33b477e391c3e6b658a80b32
Parents: 53287df
Author: Michael Ho <[email protected]>
Authored: Mon Jun 19 19:56:59 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Jun 21 10:04:15 2017 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-service-client-wrapper.h | 103 +++++++++++++++++++
 be/src/exec/catalog-op-executor.cc              |  17 +--
 be/src/rpc/thrift-server-test.cc                |  55 +++++-----
 be/src/rpc/thrift-util.cc                       |  22 ++--
 be/src/runtime/backend-client.h                 |  52 +++++++++-
 be/src/runtime/client-cache-types.h             |   6 +-
 be/src/runtime/client-cache.h                   |  14 ++-
 be/src/service/client-request-state.cc          |   4 +-
 .../statestore-service-client-wrapper.h         |  55 ++++++++++
 .../statestore-subscriber-client-wrapper.h      |  64 ++++++++++++
 be/src/statestore/statestore-subscriber.cc      |   9 +-
 be/src/statestore/statestore-subscriber.h       |   3 +-
 be/src/statestore/statestore.cc                 |  17 +--
 be/src/statestore/statestore.h                  |   7 +-
 be/src/testutil/fault-injection-util.cc         |  36 ++++---
 be/src/testutil/fault-injection-util.h          |  12 ++-
 tests/custom_cluster/test_rpc_exception.py      |  41 ++++++--
 17 files changed, 425 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/catalog/catalog-service-client-wrapper.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-service-client-wrapper.h 
b/be/src/catalog/catalog-service-client-wrapper.h
new file mode 100644
index 0000000..22ac56d
--- /dev/null
+++ b/be/src/catalog/catalog-service-client-wrapper.h
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef CATALOG_CATALOG_SERVICE_CLIENT_WRAPPER_H
+#define CATALOG_CATALOG_SERVICE_CLIENT_WRAPPER_H
+
+#include "gen-cpp/CatalogService.h"
+
+namespace impala {
+
+class CatalogServiceClientWrapper : public CatalogServiceClient {
+ public:
+  CatalogServiceClientWrapper(
+      boost::shared_ptr<::apache::thrift::protocol::TProtocol> prot)
+    : CatalogServiceClient(prot) {
+  }
+
+  CatalogServiceClientWrapper(
+      boost::shared_ptr<::apache::thrift::protocol::TProtocol> iprot,
+      boost::shared_ptr<::apache::thrift::protocol::TProtocol> oprot)
+    : CatalogServiceClient(iprot, oprot) {
+  }
+
+/// We intentionally disable this clang warning as we intend to hide the
+/// same-named functions defined in the base class.
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Woverloaded-virtual"
+
+  void ExecDdl(TDdlExecResponse& _return, const TDdlExecRequest& req, bool* 
send_done) {
+    DCHECK(!*send_done);
+    send_ExecDdl(req);
+    *send_done = true;
+    recv_ExecDdl(_return);
+  }
+
+  void GetCatalogObject(TGetCatalogObjectResponse& _return,
+      const TGetCatalogObjectRequest& req, bool* send_done) {
+    DCHECK(!*send_done);
+    send_GetCatalogObject(req);
+    *send_done = true;
+    recv_GetCatalogObject(_return);
+  }
+
+  void ResetMetadata(TResetMetadataResponse& _return, const 
TResetMetadataRequest& req,
+      bool* send_done) {
+    DCHECK(!*send_done);
+    send_ResetMetadata(req);
+    *send_done = true;
+    recv_ResetMetadata(_return);
+  }
+
+  void UpdateCatalog(TUpdateCatalogResponse& _return, const 
TUpdateCatalogRequest& req,
+      bool* send_done) {
+    DCHECK(!*send_done);
+    send_UpdateCatalog(req);
+    *send_done = true;
+    recv_UpdateCatalog(_return);
+  }
+
+  void GetFunctions(TGetFunctionsResponse& _return, const 
TGetFunctionsRequest& req,
+      bool* send_done) {
+    DCHECK(!*send_done);
+    send_GetFunctions(req);
+    *send_done = true;
+    recv_GetFunctions(_return);
+  }
+
+  void PrioritizeLoad(TPrioritizeLoadResponse& _return, const 
TPrioritizeLoadRequest& req,
+      bool* send_done) {
+    DCHECK(!*send_done);
+    send_PrioritizeLoad(req);
+    *send_done = true;
+    recv_PrioritizeLoad(_return);
+  }
+
+  void SentryAdminCheck(TSentryAdminCheckResponse& _return,
+      const TSentryAdminCheckRequest& req, bool* send_done) {
+    DCHECK(!*send_done);
+    send_SentryAdminCheck(req);
+    *send_done = true;
+    recv_SentryAdminCheck(_return);
+  }
+
+#pragma clang diagnostic pop
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc 
b/be/src/exec/catalog-op-executor.cc
index 11c21cc..252d345 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -19,8 +19,9 @@
 
 #include <sstream>
 
-#include "exec/incr-stats-util.h"
 #include "common/status.h"
+#include "catalog/catalog-service-client-wrapper.h"
+#include "exec/incr-stats-util.h"
 #include "runtime/lib-cache.h"
 #include "runtime/client-cache-types.h"
 #include "runtime/exec-env.h"
@@ -59,8 +60,8 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& 
request) {
       DCHECK(request.ddl_params.ddl_type != TDdlType::COMPUTE_STATS);
 
       exec_response_.reset(new TDdlExecResponse());
-      RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::ExecDdl, 
request.ddl_params,
-          exec_response_.get()));
+      RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClientWrapper::ExecDdl,
+          request.ddl_params, exec_response_.get()));
       catalog_update_result_.reset(
           new TCatalogUpdateResult(exec_response_.get()->result));
       Status status(exec_response_->result.status);
@@ -75,7 +76,7 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& 
request) {
     }
     case TCatalogOpType::RESET_METADATA: {
       TResetMetadataResponse response;
-      RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::ResetMetadata,
+      RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClientWrapper::ResetMetadata,
           request.reset_metadata_params, &response));
       catalog_update_result_.reset(new TCatalogUpdateResult(response.result));
       return Status(response.result.status);
@@ -264,7 +265,7 @@ Status CatalogOpExecutor::GetCatalogObject(const 
TCatalogObject& object_desc,
 
   TGetCatalogObjectResponse response;
   RETURN_IF_ERROR(
-      client.DoRpc(&CatalogServiceClient::GetCatalogObject, request, 
&response));
+      client.DoRpc(&CatalogServiceClientWrapper::GetCatalogObject, request, 
&response));
   *result = response.catalog_object;
   return Status::OK();
 }
@@ -276,7 +277,8 @@ Status CatalogOpExecutor::PrioritizeLoad(const 
TPrioritizeLoadRequest& req,
   Status status;
   CatalogServiceConnection client(env_->catalogd_client_cache(), address, 
&status);
   RETURN_IF_ERROR(status);
-  RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::PrioritizeLoad, req, 
result));
+  RETURN_IF_ERROR(
+      client.DoRpc(&CatalogServiceClientWrapper::PrioritizeLoad, req, result));
   return Status::OK();
 }
 
@@ -287,6 +289,7 @@ Status CatalogOpExecutor::SentryAdminCheck(const 
TSentryAdminCheckRequest& req)
   CatalogServiceConnection client(env_->catalogd_client_cache(), address, 
&cnxn_status);
   RETURN_IF_ERROR(cnxn_status);
   TSentryAdminCheckResponse resp;
-  RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::SentryAdminCheck, req, 
&resp));
+  RETURN_IF_ERROR(
+      client.DoRpc(&CatalogServiceClientWrapper::SentryAdminCheck, req, 
&resp));
   return Status(resp.status);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/rpc/thrift-server-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index 21e4e81..f6edfb2 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -71,12 +71,12 @@ int GetServerPort() {
 
 TEST(ThriftServer, Connectivity) {
   int port = GetServerPort();
-  ThriftClient<StatestoreServiceClient> wrong_port_client("localhost",
+  ThriftClient<StatestoreServiceClientWrapper> wrong_port_client("localhost",
       port, "", NULL, false);
   ASSERT_FALSE(wrong_port_client.Open().ok());
 
-  ThriftServer* server = new ThriftServer("DummyStatestore", MakeProcessor(), 
port, NULL,
-      NULL, 5);
+  ThriftServer* server =
+      new ThriftServer("DummyStatestore", MakeProcessor(), port, NULL, NULL, 
5);
   ASSERT_OK(server->Start());
 
   // Test that client recovers from failure to connect.
@@ -95,30 +95,33 @@ TEST(SslTest, Connectivity) {
   ASSERT_OK(server->Start());
 
   FLAGS_ssl_client_ca_certificate = SERVER_CERT;
-  ThriftClient<StatestoreServiceClient> ssl_client(
+  ThriftClient<StatestoreServiceClientWrapper> ssl_client(
       "localhost", port, "", NULL, true);
   ASSERT_OK(ssl_client.Open());
   TRegisterSubscriberResponse resp;
-  EXPECT_NO_THROW({
-    ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest());
+  bool send_done = false;
+  EXPECT_NO_THROW({ssl_client.iface()->RegisterSubscriber(resp,
+      TRegisterSubscriberRequest(), &send_done);
   });
 
   // Disable SSL for this client.
-  ThriftClient<StatestoreServiceClient> non_ssl_client(
+  ThriftClient<StatestoreServiceClientWrapper> non_ssl_client(
       "localhost", port, "", NULL, false);
   ASSERT_OK(non_ssl_client.Open());
+  send_done = false;
   EXPECT_THROW(non_ssl_client.iface()->RegisterSubscriber(
-      resp, TRegisterSubscriberRequest()), TTransportException);
+      resp, TRegisterSubscriberRequest(), &send_done), TTransportException);
 }
 
 TEST(SslTest, BadCertificate) {
   FLAGS_ssl_client_ca_certificate = "unknown";
   int port = GetServerPort();
-  ThriftClient<StatestoreServiceClient> ssl_client("localhost", port, "", 
NULL, true);
+  ThriftClient<StatestoreServiceClientWrapper>
+      ssl_client("localhost", port, "", NULL, true);
   ASSERT_FALSE(ssl_client.Open().ok());
 
-  ThriftServer* server = new ThriftServer("DummyStatestore", MakeProcessor(), 
port, NULL,
-      NULL, 5);
+  ThriftServer* server =
+      new ThriftServer("DummyStatestore", MakeProcessor(), port, NULL, NULL, 
5);
   ASSERT_OK(server->EnableSsl(SERVER_CERT, PRIVATE_KEY, "echo password"));
   ASSERT_OK(server->Start());
 
@@ -136,12 +139,13 @@ TEST(PasswordProtectedPemFile, CorrectOperation) {
       SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, "echo password"));
   ASSERT_OK(server->Start());
   FLAGS_ssl_client_ca_certificate = SERVER_CERT;
-  ThriftClient<StatestoreServiceClient> ssl_client("localhost", port, "", 
NULL, true);
+  ThriftClient<StatestoreServiceClientWrapper>
+      ssl_client("localhost", port, "", NULL, true);
   ASSERT_OK(ssl_client.Open());
   TRegisterSubscriberResponse resp;
-  EXPECT_NO_THROW({
-    ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest());
-  });
+  bool send_done = false;
+  EXPECT_NO_THROW({ssl_client.iface()->RegisterSubscriber(resp,
+      TRegisterSubscriberRequest(), &send_done);});
 }
 
 TEST(PasswordProtectedPemFile, BadPassword) {
@@ -163,15 +167,17 @@ TEST(SslTest, ClientBeforeServer) {
   // Instantiate a thrift client before a thrift server and test if it works 
(IMPALA-2747)
   FLAGS_ssl_client_ca_certificate = SERVER_CERT;
   int port = GetServerPort();
-  ThriftClient<StatestoreServiceClient> ssl_client("localhost", port, "", 
NULL, true);
-  ThriftServer* server = new ThriftServer("DummyStatestore", MakeProcessor(), 
port, NULL,
-      NULL, 5);
+  ThriftClient<StatestoreServiceClientWrapper>
+      ssl_client("localhost", port, "", NULL, true);
+  ThriftServer* server =
+      new ThriftServer("DummyStatestore", MakeProcessor(), port, NULL, NULL, 
5);
   ASSERT_OK(server->EnableSsl(SERVER_CERT, PRIVATE_KEY));
   ASSERT_OK(server->Start());
 
   ASSERT_OK(ssl_client.Open());
+  bool send_done = false;
   TRegisterSubscriberResponse resp;
-    ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest());
+  ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest(), 
&send_done);
 }
 
 /// Test disabled because requires a high ulimit -n on build machines. Since 
the test does
@@ -203,18 +209,19 @@ TEST(NoPasswordPemFile, BadServerCertificate) {
   EXPECT_OK(server->EnableSsl(BAD_SERVER_CERT, BAD_PRIVATE_KEY));
   EXPECT_OK(server->Start());
   FLAGS_ssl_client_ca_certificate = SERVER_CERT;
-  ThriftClient<StatestoreServiceClient> ssl_client(
+  ThriftClient<StatestoreServiceClientWrapper> ssl_client(
       "localhost", FLAGS_state_store_port + 5, "", NULL, true);
   EXPECT_OK(ssl_client.Open());
   TRegisterSubscriberResponse resp;
-  EXPECT_THROW({
-    ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest());
+  bool send_done = false;
+  EXPECT_THROW({ssl_client.iface()->RegisterSubscriber(resp, 
TRegisterSubscriberRequest(),
+      &send_done);
   }, TSSLException);
   // Close and reopen the socket
   ssl_client.Close();
   EXPECT_OK(ssl_client.Open());
-  EXPECT_THROW({
-    ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest());
+  EXPECT_THROW({ssl_client.iface()->RegisterSubscriber(resp, 
TRegisterSubscriberRequest(),
+      &send_done);
   }, TSSLException);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/rpc/thrift-util.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index 7ab5cd8..4e3f1b6 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -188,22 +188,28 @@ bool TNetworkAddressComparator(const TNetworkAddress& a, 
const TNetworkAddress&
 
 bool IsRecvTimeoutTException(const TTransportException& e) {
   // String taken from TSocket::read() Thrift's TSocket.cpp.
-  return e.getType() == TTransportException::TIMED_OUT &&
-      strstr(e.what(), "EAGAIN (timed out)") != nullptr;
+  return (e.getType() == TTransportException::TIMED_OUT &&
+             strstr(e.what(), "EAGAIN (timed out)") != nullptr) ||
+         (e.getType() == TTransportException::INTERNAL_ERROR &&
+             strstr(e.what(), "SSL_read: Resource temporarily unavailable") != 
nullptr);
 }
 
 // This function implements some heuristics to match against exception details
-// thrown by write_partial() in thrift library. This is not very robust as it's
-// possible that the receiver of the RPC call may have received all the RPC 
payload
-// but the ACK to the sender may have been dropped somehow. In which case, it's
-// not safe to retry the RPC if it's not idempotent.
-// TODO: end-to-end tracking of RPC calls to detect duplicated calls in the 
receiver side
+// thrown by functions in TSocket.cpp and TSSLSocket.cpp in thrift library. 
It's
+// expected the caller (e.g. DoRpc()) has already verified the send part of 
the RPC
+// didn't complete. It's only safe to retry an RPC if the send part didn't 
complete.
+// It's also expected that the RPC client will close the existing connection 
and reopen
+// a new connection before retrying the RPC. If the exception occurs after the 
send part
+// is done, only the recv part of the RPC can be retried.
 bool IsSendFailTException(const TTransportException& e) {
   // String taken from TSocket::write_partial() in Thrift's TSocket.cpp
   return (e.getType() == TTransportException::TIMED_OUT &&
              strstr(e.what(), "send timeout expired") != nullptr) ||
          (e.getType() == TTransportException::NOT_OPEN &&
-             (strstr(e.what(), "write() send()") != nullptr ||
+             // "TTransportException: Transport not open" can be from 
TSSLSocket.cpp
+             // when the underlying socket was closed.
+             (strstr(e.what(), "TTransportException: Transport not open") ||
+              strstr(e.what(), "write() send()") != nullptr ||
               strstr(e.what(), "Called write on non-open socket") != nullptr ||
               strstr(e.what(), "Socket send returned 0.") != nullptr));
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/runtime/backend-client.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index fe9c9bc..dd7c61e 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -40,7 +40,38 @@ class ImpalaBackendClient : public 
ImpalaInternalServiceClient {
     : ImpalaInternalServiceClient(iprot, oprot), transmit_csw_(NULL) {
   }
 
-  void TransmitData(TTransmitDataResult& _return, const TTransmitDataParams& 
params) {
+/// We intentionally disable this clang warning as we intend to hide the
+/// same-named functions defined in the base class.
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Woverloaded-virtual"
+
+  void ExecQueryFInstances(TExecQueryFInstancesResult& _return,
+      const TExecQueryFInstancesParams& params, bool* send_done) {
+    DCHECK(!*send_done);
+    ImpalaInternalServiceClient::send_ExecQueryFInstances(params);
+    *send_done = true;
+    ImpalaInternalServiceClient::recv_ExecQueryFInstances(_return);
+  }
+
+  void ReportExecStatus(TReportExecStatusResult& _return,
+      const TReportExecStatusParams& params, bool* send_done) {
+    DCHECK(!*send_done);
+    ImpalaInternalServiceClient::send_ReportExecStatus(params);
+    *send_done = true;
+    ImpalaInternalServiceClient::recv_ReportExecStatus(_return);
+  }
+
+  void CancelQueryFInstances(TCancelQueryFInstancesResult& _return,
+      const TCancelQueryFInstancesParams& params, bool* send_done) {
+    DCHECK(!*send_done);
+    ImpalaInternalServiceClient::send_CancelQueryFInstances(params);
+    *send_done = true;
+    ImpalaInternalServiceClient::recv_CancelQueryFInstances(_return);
+  }
+
+  void TransmitData(TTransmitDataResult& _return, const TTransmitDataParams& 
params,
+      bool* send_done) {
+    DCHECK(!*send_done);
     FAULT_INJECTION_RPC_EXCEPTION(RPC_TRANSMITDATA, true /* is_send */);
     if (transmit_csw_ != NULL) {
       SCOPED_CONCURRENT_COUNTER(transmit_csw_);
@@ -48,6 +79,7 @@ class ImpalaBackendClient : public 
ImpalaInternalServiceClient {
     } else {
       ImpalaInternalServiceClient::send_TransmitData(params);
     }
+    *send_done = true;
     FAULT_INJECTION_RPC_EXCEPTION(RPC_TRANSMITDATA, false /* is_send */);
     ImpalaInternalServiceClient::recv_TransmitData(_return);
   }
@@ -65,6 +97,24 @@ class ImpalaBackendClient : public 
ImpalaInternalServiceClient {
     transmit_csw_ = NULL;
   }
 
+  void UpdateFilter(TUpdateFilterResult& _return, const TUpdateFilterParams& 
params,
+      bool* send_done) {
+    DCHECK(!*send_done);
+    ImpalaInternalServiceClient::send_UpdateFilter(params);
+    *send_done = true;
+    ImpalaInternalServiceClient::recv_UpdateFilter(_return);
+  }
+
+  void PublishFilter(TPublishFilterResult& _return, const 
TPublishFilterParams& params,
+      bool* send_done) {
+    DCHECK(!*send_done);
+    ImpalaInternalServiceClient::send_PublishFilter(params);
+    *send_done = true;
+    ImpalaInternalServiceClient::recv_PublishFilter(_return);
+  }
+
+#pragma clang diagnostic pop
+
  private:
   RuntimeProfile::ConcurrentTimerCounter* transmit_csw_;
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/runtime/client-cache-types.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache-types.h 
b/be/src/runtime/client-cache-types.h
index 6a18c25..df9a0e7 100644
--- a/be/src/runtime/client-cache-types.h
+++ b/be/src/runtime/client-cache-types.h
@@ -33,9 +33,9 @@ class ImpalaInternalServiceClient;
 typedef ClientCache<ImpalaInternalServiceClient> 
ImpalaInternalServiceClientCache;
 typedef ClientConnection<ImpalaInternalServiceClient> 
ImpalaInternalServiceConnection;
 
-class CatalogServiceClient;
-typedef ClientCache<CatalogServiceClient> CatalogServiceClientCache;
-typedef ClientConnection<CatalogServiceClient> CatalogServiceConnection;
+class CatalogServiceClientWrapper;
+typedef ClientCache<CatalogServiceClientWrapper> CatalogServiceClientCache;
+typedef ClientConnection<CatalogServiceClientWrapper> CatalogServiceConnection;
 
 class ImpalaBackendClient;
 typedef ClientCache<ImpalaBackendClient> ImpalaBackendClientCache;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/runtime/client-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index c9b557d..09f1446 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -26,6 +26,7 @@
 #include <boost/bind.hpp>
 #include <gutil/strings/substitute.h>
 
+#include "catalog/catalog-service-client-wrapper.h"
 #include "runtime/client-cache-types.h"
 #include "util/metrics.h"
 #include "rpc/thrift-client.h"
@@ -237,20 +238,23 @@ class ClientConnection {
   /// closed cnxn).
   /// Application-level failures should be signalled through the response type.
   ///
+  /// TODO: Consider replacing 'retry_is_safe' with a bool which callers pass 
to
+  /// indicate intention to retry recv part of the RPC if it times out.
   template <class F, class Request, class Response>
   Status DoRpc(const F& f, const Request& request, Response* response,
       bool* retry_is_safe = NULL) {
     DCHECK(response != NULL);
     client_is_unrecoverable_ = true;
     if (retry_is_safe != nullptr) *retry_is_safe = false;
+    bool send_done = false;
     try {
-      (client_->*f)(*response, request);
+      (client_->*f)(*response, request, &send_done);
     } catch (const apache::thrift::transport::TTransportException& e) {
-      if (IsRecvTimeoutTException(e)) {
+      if (send_done && IsRecvTimeoutTException(e)) {
         return Status(TErrorCode::RPC_RECV_TIMEOUT, strings::Substitute(
             "Client $0 timed-out during recv call.", 
TNetworkAddressToString(address_)));
       }
-      if (IsSendFailTException(e)) {
+      if (!send_done && IsSendFailTException(e)) {
         return RetryRpc(f, request, response, retry_is_safe);
       }
       return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e));
@@ -315,13 +319,15 @@ class ClientConnection {
       return Status(TErrorCode::RPC_CLIENT_CONNECT_FAILURE, 
status.GetDetail());
     }
     try {
-      (client_->*f)(*response, request);
+      bool send_done = false;
+      (client_->*f)(*response, request, &send_done);
     } catch (const apache::thrift::TException& e) {
       // By this point the RPC really has failed.
       // TODO: Revisit this logic later. It's possible that the new connection
       // works but we hit timeout here.
       return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e));
     }
+    client_is_unrecoverable_ = false;
     return Status::OK();
   }
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index 4fe7431..c454ebe 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -931,8 +931,8 @@ Status ClientRequestState::UpdateCatalog() {
 
       VLOG_QUERY << "Executing FinalizeDml() using CatalogService";
       TUpdateCatalogResponse resp;
-      RETURN_IF_ERROR(
-          client.DoRpc(&CatalogServiceClient::UpdateCatalog, catalog_update, 
&resp));
+      RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClientWrapper::UpdateCatalog,
+          catalog_update, &resp));
 
       Status status(resp.result.status);
       if (!status.ok()) LOG(ERROR) << "ERROR Finalizing DML: " << 
status.GetDetail();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/statestore/statestore-service-client-wrapper.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-service-client-wrapper.h 
b/be/src/statestore/statestore-service-client-wrapper.h
new file mode 100644
index 0000000..79068a2
--- /dev/null
+++ b/be/src/statestore/statestore-service-client-wrapper.h
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef STATESTORE_STATESTORE_SERVICE_CLIENT_WRAPPER_H
+#define STATESTORE_STATESTORE_SERVICE_CLIENT_WRAPPER_H
+
+#include "gen-cpp/StatestoreService.h"
+
+namespace impala {
+
+class StatestoreServiceClientWrapper : public StatestoreServiceClient {
+ public:
+  StatestoreServiceClientWrapper(
+      boost::shared_ptr<::apache::thrift::protocol::TProtocol> prot)
+    : StatestoreServiceClient(prot) {
+  }
+
+  StatestoreServiceClientWrapper(
+      boost::shared_ptr<::apache::thrift::protocol::TProtocol> iprot,
+      boost::shared_ptr<::apache::thrift::protocol::TProtocol> oprot)
+    : StatestoreServiceClient(iprot, oprot) {
+  }
+
+/// We intentionally disable this clang warning as we intend to hide the
+/// same-named functions defined in the base class.
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Woverloaded-virtual"
+
+  void RegisterSubscriber(TRegisterSubscriberResponse& _return,
+      const TRegisterSubscriberRequest& params, bool* send_done) {
+    DCHECK(!*send_done);
+    send_RegisterSubscriber(params);
+    *send_done = true;
+    recv_RegisterSubscriber(_return);
+  }
+
+#pragma clang diagnostic pop
+};
+
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/statestore/statestore-subscriber-client-wrapper.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber-client-wrapper.h 
b/be/src/statestore/statestore-subscriber-client-wrapper.h
new file mode 100644
index 0000000..2d40031
--- /dev/null
+++ b/be/src/statestore/statestore-subscriber-client-wrapper.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef STATESTORE_STATESTORE_SUBSCRIBER_CLIENT_WRAPPER_H
+#define STATESTORE_STATESTORE_SUBSCRIBER_CLIENT_WRAPPER_H
+
+#include "gen-cpp/StatestoreSubscriber.h"
+
+namespace impala {
+
+class StatestoreSubscriberClientWrapper : public StatestoreSubscriberClient {
+  public:
+   StatestoreSubscriberClientWrapper(
+       boost::shared_ptr<::apache::thrift::protocol::TProtocol> prot)
+     : StatestoreSubscriberClient(prot) {
+   }
+
+   StatestoreSubscriberClientWrapper(
+       boost::shared_ptr<::apache::thrift::protocol::TProtocol> iprot,
+       boost::shared_ptr<::apache::thrift::protocol::TProtocol> oprot)
+     : StatestoreSubscriberClient(iprot, oprot) {
+   }
+
+/// We intentionally disable this clang warning as we intend to hide the
+/// same-named functions defined in the base class.
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Woverloaded-virtual"
+
+   void Heartbeat(THeartbeatResponse& _return, const THeartbeatRequest& params,
+       bool* send_done) {
+     DCHECK(!*send_done);
+     send_Heartbeat(params);
+     *send_done = true;
+     recv_Heartbeat(_return);
+   }
+
+   void UpdateState(TUpdateStateResponse& _return, const TUpdateStateRequest& 
params,
+       bool* send_done) {
+     DCHECK(!*send_done);
+     send_UpdateState(params);
+     *send_done = true;
+     recv_UpdateState(_return);
+   }
+
+#pragma clang diagnostic pop
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc 
b/be/src/statestore/statestore-subscriber.cc
index 08c11ac..12efdcd 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -31,6 +31,7 @@
 #include "gen-cpp/StatestoreService_types.h"
 #include "rpc/rpc-trace.h"
 #include "rpc/thrift-util.h"
+#include "statestore/statestore-service-client-wrapper.h"
 #include "util/time.h"
 #include "util/debug-util.h"
 
@@ -64,7 +65,7 @@ const string CALLBACK_METRIC_PATTERN = 
"statestore-subscriber.topic-$0.processin
 // statestore after a failure.
 const int32_t SLEEP_INTERVAL_MS = 5000;
 
-typedef ClientConnection<StatestoreServiceClient> StatestoreConnection;
+typedef ClientConnection<StatestoreServiceClientWrapper> StatestoreServiceConn;
 
 // Proxy class for the subscriber heartbeat thrift API, which
 // translates RPCs into method calls on the local subscriber object.
@@ -142,7 +143,7 @@ Status StatestoreSubscriber::AddTopic(const 
Statestore::TopicId& topic_id,
 
 Status StatestoreSubscriber::Register() {
   Status client_status;
-  StatestoreConnection client(client_cache_.get(), statestore_address_, 
&client_status);
+  StatestoreServiceConn client(client_cache_.get(), statestore_address_, 
&client_status);
   RETURN_IF_ERROR(client_status);
 
   TRegisterSubscriberRequest request;
@@ -157,8 +158,8 @@ Status StatestoreSubscriber::Register() {
   request.subscriber_location = heartbeat_address_;
   request.subscriber_id = subscriber_id_;
   TRegisterSubscriberResponse response;
-  RETURN_IF_ERROR(
-      client.DoRpc(&StatestoreServiceClient::RegisterSubscriber, request, 
&response));
+  
RETURN_IF_ERROR(client.DoRpc(&StatestoreServiceClientWrapper::RegisterSubscriber,
+      request, &response));
   Status status = Status(response.status);
   if (status.ok()) connected_to_statestore_metric_->set_value(true);
   if (response.__isset.registration_id) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h 
b/be/src/statestore/statestore-subscriber.h
index 1a24a76..65dcac9 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -29,6 +29,7 @@
 #include "util/stopwatch.h"
 #include "rpc/thrift-util.h"
 #include "rpc/thrift-client.h"
+#include "statestore/statestore-service-client-wrapper.h"
 #include "util/metrics.h"
 #include "gen-cpp/StatestoreService.h"
 #include "gen-cpp/StatestoreSubscriber.h"
@@ -41,7 +42,7 @@ class Thread;
 class ThriftServer;
 class TNetworkAddress;
 
-typedef ClientCache<StatestoreServiceClient> StatestoreClientCache;
+typedef ClientCache<StatestoreServiceClientWrapper> StatestoreClientCache;
 
 /// A StatestoreSubscriber communicates with a statestore periodically through 
the exchange
 /// of topic update messages. These messages contain updates from the 
statestore to a list

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index a19de2e..bd6361f 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -24,8 +24,9 @@
 
 #include "common/status.h"
 #include "gen-cpp/StatestoreService_types.h"
-#include "statestore/failure-detector.h"
 #include "rpc/thrift-util.h"
+#include "statestore/failure-detector.h"
+#include "statestore/statestore-subscriber-client-wrapper.h"
 #include "util/debug-util.h"
 #include "util/logging-support.h"
 #include "util/time.h"
@@ -100,7 +101,7 @@ const int32_t STATESTORE_MAX_SUBSCRIBERS = 10000;
 // Updates or heartbeats that miss their deadline by this much are logged.
 const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000;
 
-typedef ClientConnection<StatestoreSubscriberClient> 
StatestoreSubscriberConnection;
+typedef ClientConnection<StatestoreSubscriberClientWrapper> 
StatestoreSubscriberConn;
 
 class StatestoreThriftIf : public StatestoreServiceIf {
  public:
@@ -224,11 +225,11 @@ Statestore::Statestore(MetricGroup* metrics)
         FLAGS_statestore_num_heartbeat_threads,
         STATESTORE_MAX_SUBSCRIBERS,
         bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, true, _1, 
_2)),
-    update_state_client_cache_(new ClientCache<StatestoreSubscriberClient>(1, 
0,
+    update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0,
         FLAGS_statestore_update_tcp_timeout_seconds * 1000,
         FLAGS_statestore_update_tcp_timeout_seconds * 1000, "",
         EnableInternalSslConnections())),
-    heartbeat_client_cache_(new ClientCache<StatestoreSubscriberClient>(1, 0,
+    heartbeat_client_cache_(new StatestoreSubscriberClientCache(1, 0,
         FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000,
         FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000, "",
         EnableInternalSslConnections())),
@@ -422,13 +423,13 @@ Status Statestore::SendTopicUpdate(Subscriber* 
subscriber, bool* update_skipped)
 
   // Second: try and send it
   Status status;
-  StatestoreSubscriberConnection client(update_state_client_cache_.get(),
+  StatestoreSubscriberConn client(update_state_client_cache_.get(),
       subscriber->network_address(), &status);
   RETURN_IF_ERROR(status);
 
   TUpdateStateResponse response;
   RETURN_IF_ERROR(client.DoRpc(
-      &StatestoreSubscriberClient::UpdateState, update_state_request, 
&response));
+      &StatestoreSubscriberClientWrapper::UpdateState, update_state_request, 
&response));
 
   status = Status(response.status);
   if (!status.ok()) {
@@ -602,7 +603,7 @@ Status Statestore::SendHeartbeat(Subscriber* subscriber) {
   sw.Start();
 
   Status status;
-  StatestoreSubscriberConnection client(heartbeat_client_cache_.get(),
+  StatestoreSubscriberConn client(heartbeat_client_cache_.get(),
       subscriber->network_address(), &status);
   RETURN_IF_ERROR(status);
 
@@ -610,7 +611,7 @@ Status Statestore::SendHeartbeat(Subscriber* subscriber) {
   THeartbeatResponse response;
   request.__set_registration_id(subscriber->registration_id());
   RETURN_IF_ERROR(
-      client.DoRpc(&StatestoreSubscriberClient::Heartbeat, request, 
&response));
+      client.DoRpc(&StatestoreSubscriberClientWrapper::Heartbeat, request, 
&response));
 
   heartbeat_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 
1000.0));
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 91adb8d..44d9792 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -35,6 +35,7 @@
 #include "runtime/client-cache.h"
 #include "runtime/timestamp-value.h"
 #include "statestore/failure-detector.h"
+#include "statestore/statestore-subscriber-client-wrapper.h"
 #include "util/aligned-new.h"
 #include "util/collection-metrics.h"
 #include "util/metrics.h"
@@ -45,6 +46,8 @@ namespace impala {
 
 class Status;
 
+typedef ClientCache<StatestoreSubscriberClientWrapper> 
StatestoreSubscriberClientCache;
+
 /// The Statestore is a soft-state key-value store that maintains a set of 
Topics, which
 /// are maps from string keys to byte array values.
 //
@@ -400,13 +403,13 @@ class Statestore : public CacheLineAligned {
 
   /// Cache of subscriber clients used for UpdateState() RPCs. Only one client 
per
   /// subscriber should be used, but the cache helps with the client lifecycle 
on failure.
-  boost::scoped_ptr<ClientCache<StatestoreSubscriberClient>> 
update_state_client_cache_;
+  boost::scoped_ptr<StatestoreSubscriberClientCache> 
update_state_client_cache_;
 
   /// Cache of subscriber clients used for Heartbeat() RPCs. Separate from
   /// update_state_client_cache_ because we enable TCP-level timeouts for 
these calls,
   /// whereas they are not safe for UpdateState() RPCs which can take an 
unbounded amount
   /// of time.
-  boost::scoped_ptr<ClientCache<StatestoreSubscriberClient>> 
heartbeat_client_cache_;
+  boost::scoped_ptr<StatestoreSubscriberClientCache> heartbeat_client_cache_;
 
   /// Thrift API implementation which proxies requests onto this Statestore
   boost::shared_ptr<StatestoreServiceIf> thrift_iface_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/testutil/fault-injection-util.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/fault-injection-util.cc 
b/be/src/testutil/fault-injection-util.cc
index cf418da..3a77d08 100644
--- a/be/src/testutil/fault-injection-util.cc
+++ b/be/src/testutil/fault-injection-util.cc
@@ -62,29 +62,35 @@ void FaultInjectionUtil::InjectRpcException(RpcCallType 
my_type, bool is_send) {
   DCHECK_EQ(target_rpc_type, RPC_TRANSMITDATA);
 
   if (is_send) {
-    int32_t count = send_count.Add(1);
-    if (count % 1024 == 0) {
-      if (xcp_type == RPC_EXCEPTION_LOST_CONNECTION_SEND) {
-        // Test both exception types which are considered recoverable
-        // by IsSendFailTException().
-        if ((count / 1024) % 2 == 0) {
+    if (send_count.Add(1) % 1024 == 0) {
+      switch (xcp_type) {
+        case RPC_EXCEPTION_SEND_LOST_CONNECTION:
           throw TTransportException(TTransportException::NOT_OPEN,
               "Called write on non-open socket");
-        } else {
+        case RPC_EXCEPTION_SEND_TIMEDOUT:
           throw TTransportException(TTransportException::TIMED_OUT,
               "send timeout expired");
-        }
-      } else if (xcp_type == RPC_EXCEPTION_SSL_ERROR_SEND) {
-        throw TSSLException("SSL_write: SSL resource temporarily unavailable");
+        case RPC_EXCEPTION_SSL_SEND_LOST_CONNECTION:
+          throw TTransportException(TTransportException::NOT_OPEN);
+        case RPC_EXCEPTION_SSL_SEND_TIMEDOUT:
+          throw TSSLException("SSL_write: Resource temporarily unavailable");
+        // fall through for the default case.
       }
     }
   } else {
     if (recv_count.Add(1) % 1024 == 0) {
-      if (xcp_type == RPC_EXCEPTION_LOST_CONNECTION_RECV) {
-        throw TTransportException(TTransportException::NOT_OPEN,
-            "Called read on non-open socket");
-      } else if (xcp_type == RPC_EXCEPTION_SSL_ERROR_RECV) {
-        throw TSSLException("SSL_read: SSL resource temporarily unavailable");
+      switch (xcp_type) {
+        case RPC_EXCEPTION_RECV_LOST_CONNECTION:
+          throw TTransportException(TTransportException::NOT_OPEN,
+              "Called read on non-open socket");
+        case RPC_EXCEPTION_RECV_TIMEDOUT:
+          throw TTransportException(TTransportException::TIMED_OUT,
+              "EAGAIN (timed out)");
+        case RPC_EXCEPTION_SSL_RECV_LOST_CONNECTION:
+          throw TTransportException(TTransportException::NOT_OPEN);
+        case RPC_EXCEPTION_SSL_RECV_TIMEDOUT:
+          throw TSSLException("SSL_read: Resource temporarily unavailable");
+        // fall through for the default case.
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/testutil/fault-injection-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/fault-injection-util.h 
b/be/src/testutil/fault-injection-util.h
index 765c246..70a2375 100644
--- a/be/src/testutil/fault-injection-util.h
+++ b/be/src/testutil/fault-injection-util.h
@@ -39,10 +39,14 @@ class FaultInjectionUtil {
 
   enum RpcExceptionType {
     RPC_EXCEPTION_NONE = 0,
-    RPC_EXCEPTION_LOST_CONNECTION_SEND,
-    RPC_EXCEPTION_LOST_CONNECTION_RECV,
-    RPC_EXCEPTION_SSL_ERROR_SEND,
-    RPC_EXCEPTION_SSL_ERROR_RECV
+    RPC_EXCEPTION_SEND_LOST_CONNECTION,
+    RPC_EXCEPTION_SEND_TIMEDOUT,
+    RPC_EXCEPTION_RECV_LOST_CONNECTION,
+    RPC_EXCEPTION_RECV_TIMEDOUT,
+    RPC_EXCEPTION_SSL_SEND_LOST_CONNECTION,
+    RPC_EXCEPTION_SSL_SEND_TIMEDOUT,
+    RPC_EXCEPTION_SSL_RECV_LOST_CONNECTION,
+    RPC_EXCEPTION_SSL_RECV_TIMEDOUT,
   };
 
   /// Test util function that injects delays to specified RPC server handling 
function

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/tests/custom_cluster/test_rpc_exception.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_rpc_exception.py 
b/tests/custom_cluster/test_rpc_exception.py
index 6faea87..f7813e2 100644
--- a/tests/custom_cluster/test_rpc_exception.py
+++ b/tests/custom_cluster/test_rpc_exception.py
@@ -43,7 +43,7 @@ class TestRPCException(CustomClusterTestSuite):
   # Execute TEST_QUERY. If 'exception_string' is None, it's expected to 
complete
   # sucessfully with result matching EXPECTED_RESULT. Otherwise, it's expected
   # to fail with 'exception_string'.
-  def execute_query(self, exception_string):
+  def execute_test_query(self, exception_string):
     try:
       result = self.client.execute(self.TEST_QUERY)
       assert result.data == self.EXPECTED_RESULT
@@ -54,24 +54,47 @@ class TestRPCException(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=1"
       " --fault_injection_rpc_type=5")
-  def test_transmitdata_send_fail(self, vector):
-    self.execute_query(None)
+  def test_transmitdata_send_lost_connection(self, vector):
+    self.execute_test_query(None)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=2"
       " --fault_injection_rpc_type=5")
-  def test_transmitdata_recv_fail(self, vector):
-    self.execute_query("Called read on non-open socket")
+  def test_transmitdata_send_timed_out(self, vector):
+    self.execute_test_query(None)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=3"
       " --fault_injection_rpc_type=5")
-  def test_transmitdata_secure_send_error(self, vector):
-    self.execute_query("SSL_write: SSL resource temporarily unavailable")
+  def test_transmitdata_recv_lost_connection(self, vector):
+    self.execute_test_query("Called read on non-open socket")
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4"
       " --fault_injection_rpc_type=5")
-  def test_transmitdata_secure_recv_error(self, vector):
-    self.execute_query("SSL_read: SSL resource temporarily unavailable")
+  def test_transmitdata_recv_timed_out(self, vector):
+    self.execute_test_query(None)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5"
+      " --fault_injection_rpc_type=5")
+  def test_transmitdata_secure_send_lost_connection(self, vector):
+    self.execute_test_query(None);
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=6"
+      " --fault_injection_rpc_type=5")
+  def test_transmitdata_secure_send_timed_out(self, vector):
+    self.execute_test_query("SSL_write: Resource temporarily unavailable")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=7"
+      " --fault_injection_rpc_type=5")
+  def test_transmitdata_secure_recv_lost_connection(self, vector):
+    self.execute_test_query("TTransportException: Transport not open")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=8"
+      " --fault_injection_rpc_type=5")
+  def test_transmitdata_secure_recv_timed_out(self, vector):
+    self.execute_test_query(None)


Reply via email to