IMPALA-5388: Only retry RPC on lost connection in send call

Previously, DoRpc() blacklisted only a couple of conditions
under which the RPC should not be retried on exception. This is
fragile, as other errors could have happened after the payload had
been successfully sent to the destination. Such aggressive retry
behavior can lead to duplicated row batches being sent, causing
wrong results in queries.

This change fixes the problem by whitelisting the conditions
in which the RPC can be retried. Specifically, it pattern-matches
against certain errors in TSocket::write_partial() in the thrift
library and only retries the RPC in those cases. With SSL enabled,
we will never retry. We should investigate whether there are some
cases in which it's safe to retry.

This change also adds fault injection in the TransmitData() RPC
caller's path to emulate different exception cases.

Change-Id: I176975f2aa521d5be8a40de51067b1497923d09b
Reviewed-on: http://gerrit.cloudera.org:8080/7063
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7db2d306
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7db2d306
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7db2d306

Branch: refs/heads/master
Commit: 7db2d3064620b984f7dd9f8cd747dc45a4553a9c
Parents: 4882910
Author: Michael Ho <[email protected]>
Authored: Thu Jun 1 12:06:32 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jun 8 10:09:18 2017 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc              |  3 +
 be/src/rpc/thrift-util.cc                  | 35 ++++++++-
 be/src/rpc/thrift-util.h                   |  9 ++-
 be/src/runtime/backend-client.h            |  3 +
 be/src/runtime/client-cache.h              | 70 ++++++++++--------
 be/src/testutil/CMakeLists.txt             |  1 +
 be/src/testutil/fault-injection-util.cc    | 95 +++++++++++++++++++++++++
 be/src/testutil/fault-injection-util.h     | 63 ++++++++++------
 tests/custom_cluster/test_rpc_exception.py | 77 ++++++++++++++++++++
 9 files changed, 301 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7db2d306/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index dd170b7..1a36fad 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -106,6 +106,9 @@ DEFINE_int32(fault_injection_rpc_delay_ms, 0, "A fault 
injection option that cau
     "Effective in debug builds only.");
 DEFINE_int32(fault_injection_rpc_type, 0, "A fault injection option that 
specifies "
     "which rpc call will be injected with the delay. Effective in debug builds 
only.");
+DEFINE_int32(fault_injection_rpc_exception_type, 0, "A fault injection option 
that "
+    "specifies the exception to be thrown in the caller side of an RPC call. 
Effective "
+    "in debug builds only");
 DEFINE_int32(stress_scratch_write_delay_ms, 0, "A stress option which causes 
writes to "
     "scratch files to be to be delayed to simulate slow writes.");
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7db2d306/be/src/rpc/thrift-util.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index d26bc39..7ab5cd8 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -18,6 +18,7 @@
 #include "rpc/thrift-util.h"
 
 #include <boost/thread.hpp>
+#include <thrift/config.h>
 
 #include "util/hash-util.h"
 #include "util/time.h"
@@ -39,6 +40,7 @@
 // TODO: get thrift to fix this.
 #pragma clang diagnostic push
 #pragma clang diagnostic ignored "-Wstring-plus-int"
+#include <gutil/strings/substitute.h>
 #include <thrift/Thrift.h>
 #include <thrift/transport/TSocket.h>
 #include <thrift/transport/TServerSocket.h>
@@ -55,6 +57,16 @@ using namespace apache::thrift::server;
 using namespace apache::thrift::protocol;
 using namespace apache::thrift::concurrency;
 
+// IsRecvTimeoutTException() and IsSendFailTException() make assumption about 
the
+// implementation of read(), write() and write_partial() in TSocket.cpp and 
those
+// functions may change between different versions of Thrift.
+static_assert(PACKAGE_VERSION[0] == '0', "");
+static_assert(PACKAGE_VERSION[1] == '.', "");
+static_assert(PACKAGE_VERSION[2] == '9', "");
+static_assert(PACKAGE_VERSION[3] == '.', "");
+static_assert(PACKAGE_VERSION[4] == '0', "");
+static_assert(PACKAGE_VERSION[5] == '\0', "");
+
 // Thrift defines operator< but does not implement it. This is a stub
 // implementation so we can link.
 bool Apache::Hadoop::Hive::Partition::operator<(
@@ -174,9 +186,26 @@ bool TNetworkAddressComparator(const TNetworkAddress& a, 
const TNetworkAddress&
   return false;
 }
 
-bool IsRecvTimeoutTException(const TException& e) {
-  // String taken from Thrift's TSocket.cpp, this only happens in 
TSocket::read()
-  return strstr(e.what(), "EAGAIN (timed out)") != NULL;
+bool IsRecvTimeoutTException(const TTransportException& e) {
+  // String taken from TSocket::read() Thrift's TSocket.cpp.
+  return e.getType() == TTransportException::TIMED_OUT &&
+      strstr(e.what(), "EAGAIN (timed out)") != nullptr;
+}
+
+// This function implements some heuristics to match against exception details
+// thrown by write_partial() in thrift library. This is not very robust as it's
+// possible that the receiver of the RPC call may have received all the RPC 
payload
+// but the ACK to the sender may have been dropped somehow. In which case, it's
+// not safe to retry the RPC if it's not idempotent.
+// TODO: end-to-end tracking of RPC calls to detect duplicated calls in the 
receiver side
+bool IsSendFailTException(const TTransportException& e) {
+  // String taken from TSocket::write_partial() in Thrift's TSocket.cpp
+  return (e.getType() == TTransportException::TIMED_OUT &&
+             strstr(e.what(), "send timeout expired") != nullptr) ||
+         (e.getType() == TTransportException::NOT_OPEN &&
+             (strstr(e.what(), "write() send()") != nullptr ||
+              strstr(e.what(), "Called write on non-open socket") != nullptr ||
+              strstr(e.what(), "Socket send returned 0.") != nullptr));
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7db2d306/be/src/rpc/thrift-util.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h
index 51fecd6..0cca51f 100644
--- a/be/src/rpc/thrift-util.h
+++ b/be/src/rpc/thrift-util.h
@@ -26,6 +26,7 @@
 #include <thrift/TApplicationException.h>
 #include <thrift/protocol/TDebugProtocol.h>
 #include <thrift/transport/TBufferTransports.h>
+#include <thrift/transport/TTransportException.h>
 
 #include "common/status.h"
 
@@ -154,8 +155,12 @@ std::ostream& operator<<(std::ostream& out, const 
TColumnValue& colval);
 /// string representation
 bool TNetworkAddressComparator(const TNetworkAddress& a, const 
TNetworkAddress& b);
 
-/// Returns true if the TException corresponds to a TCP socket recv timeout.
-bool IsRecvTimeoutTException(const apache::thrift::TException& e);
+/// Returns true if the TTransportException corresponds to a TCP socket recv 
timeout.
+bool IsRecvTimeoutTException(const 
apache::thrift::transport::TTransportException& e);
+
+/// Returns true if the TTransportException corresponds to a send failure due 
to
+/// lost network connection or timeout.
+bool IsSendFailTException(const 
apache::thrift::transport::TTransportException& e);
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7db2d306/be/src/runtime/backend-client.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index c8c26ab..fe9c9bc 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -19,6 +19,7 @@
 #define IMPALA_BACKEND_CLIENT_H
 
 #include "runtime/client-cache.h"
+#include "testutil/fault-injection-util.h"
 #include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/ImpalaInternalService.h"
@@ -40,12 +41,14 @@ class ImpalaBackendClient : public 
ImpalaInternalServiceClient {
   }
 
   void TransmitData(TTransmitDataResult& _return, const TTransmitDataParams& 
params) {
+    FAULT_INJECTION_RPC_EXCEPTION(RPC_TRANSMITDATA, true /* is_send */);
     if (transmit_csw_ != NULL) {
       SCOPED_CONCURRENT_COUNTER(transmit_csw_);
       ImpalaInternalServiceClient::send_TransmitData(params);
     } else {
       ImpalaInternalServiceClient::send_TransmitData(params);
     }
+    FAULT_INJECTION_RPC_EXCEPTION(RPC_TRANSMITDATA, false /* is_send */);
     ImpalaInternalServiceClient::recv_TransmitData(_return);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7db2d306/be/src/runtime/client-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index 8f97b67..c9b557d 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -224,7 +224,6 @@ class ClientConnection {
   /// TCP connection underpinning this client has been closed unexpectedly. 
Note that
   /// this can lead to f() being called twice, as this method may retry f() 
once,
   /// depending on the error received from the first attempt.
-  /// TODO: Detect already-closed cnxns and only retry in that case.
   ///
   /// retry_is_safe is an output parameter. In case of connection failure,
   /// '*retry_is_safe' is set to true because the send never occurred and it's
@@ -235,46 +234,28 @@ class ClientConnection {
   /// Returns RPC_RECV_TIMEOUT if a timeout occurred while waiting for a 
response,
   /// RPC_CLIENT_CONNECT_FAILURE if the client failed to connect, and 
RPC_GENERAL_ERROR
   /// if the RPC could not be completed for any other reason (except for an 
unexpectedly
-  /// closed cnxn, see TODO).
+  /// closed cnxn).
   /// Application-level failures should be signalled through the response type.
   ///
-  /// TODO: Use TTransportException::TTransportExceptionType to distinguish 
between
-  /// failure modes.
   template <class F, class Request, class Response>
   Status DoRpc(const F& f, const Request& request, Response* response,
       bool* retry_is_safe = NULL) {
     DCHECK(response != NULL);
     client_is_unrecoverable_ = true;
-    if (retry_is_safe != NULL) *retry_is_safe = false;
+    if (retry_is_safe != nullptr) *retry_is_safe = false;
     try {
       (client_->*f)(*response, request);
-    } catch (const apache::thrift::TApplicationException& e) {
-      // TApplicationException only happens in recv RPC call.
-      // which means send RPC call is done, should not retry.
-      return Status(TErrorCode::RPC_GENERAL_ERROR, e.what());
-    } catch (const apache::thrift::TException& e) {
+    } catch (const apache::thrift::transport::TTransportException& e) {
       if (IsRecvTimeoutTException(e)) {
         return Status(TErrorCode::RPC_RECV_TIMEOUT, strings::Substitute(
             "Client $0 timed-out during recv call.", 
TNetworkAddressToString(address_)));
       }
-      VLOG(1) << "client " << client_ << " unexpected exception: "
-              << e.what() << ", type=" << typeid(e).name();
-
-      // Client may have unexpectedly been closed, so re-open and retry.
-      // TODO: ThriftClient should return proper error codes.
-      const Status& status = Reopen();
-      if (!status.ok()) {
-        if (retry_is_safe != NULL) *retry_is_safe = true;
-        return Status(TErrorCode::RPC_CLIENT_CONNECT_FAILURE, 
status.GetDetail());
-      }
-      try {
-        (client_->*f)(*response, request);
-      } catch (apache::thrift::TException& e) {
-        // By this point the RPC really has failed.
-        // TODO: Revisit this logic later. It's possible that the new 
connection
-        // works but we hit timeout here.
-        return Status(TErrorCode::RPC_GENERAL_ERROR, e.what());
+      if (IsSendFailTException(e)) {
+        return RetryRpc(f, request, response, retry_is_safe);
       }
+      return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e));
+    } catch (const apache::thrift::TException& e) {
+      return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e));
     }
     client_is_unrecoverable_ = false;
     return Status::OK();
@@ -289,13 +270,15 @@ class ClientConnection {
     DCHECK(client_is_unrecoverable_);
     try {
       (client_->*recv_func)(*response);
-    } catch (const apache::thrift::TException& e) {
+    } catch (const apache::thrift::transport::TTransportException& e) {
       if (IsRecvTimeoutTException(e)) {
         return Status(TErrorCode::RPC_RECV_TIMEOUT, strings::Substitute(
             "Client $0 timed-out during recv call.", 
TNetworkAddressToString(address_)));
       }
       // If it's not timeout exception, then the connection is broken, stop 
retrying.
       return Status(TErrorCode::RPC_GENERAL_ERROR, e.what());
+    } catch (const apache::thrift::TException& e) {
+      return Status(TErrorCode::RPC_GENERAL_ERROR, e.what());
     }
     client_is_unrecoverable_ = false;
     return Status::OK();
@@ -310,6 +293,37 @@ class ClientConnection {
   /// fails for any reason, the connection could be left in a bad state and 
cannot be
   /// recovered.
   bool client_is_unrecoverable_;
+
+  std::string ExceptionMsg(const apache::thrift::TException& e) {
+    return strings::Substitute("Client for $0 hits an unexpected exception: 
$1, type: $2",
+        TNetworkAddressToString(address_), e.what(), typeid(e).name());
+  }
+
+  /// Retry the RPC if TCP connection underpinning this client has been closed
+  /// unexpectedly. Called only when IsSendFailTException() is true for the 
failure
+  /// returned in the first invocation of RPC call. Returns 
RPC_CLIENT_CONNECT_FAILURE
+  /// on connection failure or RPC_GENERAL_ERROR for all other RPC failures.
+  template <class F, class Request, class Response>
+  Status RetryRpc(const F& f, const Request& request, Response* response,
+      bool* retry_is_safe) {
+    DCHECK(client_is_unrecoverable_);
+    // Client may have unexpectedly been closed, so re-open and retry.
+    // TODO: ThriftClient should return proper error codes.
+    Status status = Reopen();
+    if (!status.ok()) {
+      if (retry_is_safe != nullptr) *retry_is_safe = true;
+      return Status(TErrorCode::RPC_CLIENT_CONNECT_FAILURE, 
status.GetDetail());
+    }
+    try {
+      (client_->*f)(*response, request);
+    } catch (const apache::thrift::TException& e) {
+      // By this point the RPC really has failed.
+      // TODO: Revisit this logic later. It's possible that the new connection
+      // works but we hit timeout here.
+      return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e));
+    }
+    return Status::OK();
+  }
 };
 
 /// Generic cache of Thrift clients for a given service type.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7db2d306/be/src/testutil/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/testutil/CMakeLists.txt b/be/src/testutil/CMakeLists.txt
index 12fbe05..a375986 100644
--- a/be/src/testutil/CMakeLists.txt
+++ b/be/src/testutil/CMakeLists.txt
@@ -25,6 +25,7 @@ set(LIBRARY_OUTPUT_PATH 
"${BUILD_OUTPUT_ROOT_DIRECTORY}/testutil")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/testutil")
 
 add_library(TestUtil
+  fault-injection-util.cc
   impalad-query-executor.cc
   in-process-servers.cc
   desc-tbl-builder.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7db2d306/be/src/testutil/fault-injection-util.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/fault-injection-util.cc 
b/be/src/testutil/fault-injection-util.cc
new file mode 100644
index 0000000..cf418da
--- /dev/null
+++ b/be/src/testutil/fault-injection-util.cc
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef NDEBUG
+
+#include "testutil/fault-injection-util.h"
+
+#include <thrift/transport/TSSLSocket.h>
+#include <thrift/transport/TTransportException.h>
+
+#include "common/atomic.h"
+
+#include "common/names.h"
+
+DECLARE_int32(fault_injection_rpc_delay_ms);
+DECLARE_int32(fault_injection_rpc_type);
+DECLARE_int32(fault_injection_rpc_exception_type);
+
+namespace impala {
+
+using apache::thrift::transport::TTransportException;
+using apache::thrift::transport::TSSLException;
+
+int32_t FaultInjectionUtil::GetTargetRPCType() {
+  int32_t target_rpc_type = FLAGS_fault_injection_rpc_type;
+  if (target_rpc_type == RPC_RANDOM) target_rpc_type = rand() % RPC_RANDOM;
+  DCHECK_LT(target_rpc_type, RPC_RANDOM);
+  return target_rpc_type;
+}
+
+void FaultInjectionUtil::InjectRpcDelay(RpcCallType my_type) {
+  std::random_device rd;
+  srand(rd());
+  int32_t delay_ms = FLAGS_fault_injection_rpc_delay_ms;
+  if (delay_ms == 0) return;
+  int32_t target_rpc_type = GetTargetRPCType();
+  if (target_rpc_type == my_type) SleepForMs(delay_ms);
+}
+
+void FaultInjectionUtil::InjectRpcException(RpcCallType my_type, bool is_send) 
{
+  static AtomicInt32 send_count(-1);
+  static AtomicInt32 recv_count(-1);
+  int32_t xcp_type = FLAGS_fault_injection_rpc_exception_type;
+  if (xcp_type == RPC_EXCEPTION_NONE) return;
+
+  // We currently support injecting exception at TransmitData() RPC only.
+  int32_t target_rpc_type = GetTargetRPCType();
+  DCHECK_EQ(target_rpc_type, RPC_TRANSMITDATA);
+
+  if (is_send) {
+    int32_t count = send_count.Add(1);
+    if (count % 1024 == 0) {
+      if (xcp_type == RPC_EXCEPTION_LOST_CONNECTION_SEND) {
+        // Test both exception types which are considered recoverable
+        // by IsSendFailTException().
+        if ((count / 1024) % 2 == 0) {
+          throw TTransportException(TTransportException::NOT_OPEN,
+              "Called write on non-open socket");
+        } else {
+          throw TTransportException(TTransportException::TIMED_OUT,
+              "send timeout expired");
+        }
+      } else if (xcp_type == RPC_EXCEPTION_SSL_ERROR_SEND) {
+        throw TSSLException("SSL_write: SSL resource temporarily unavailable");
+      }
+    }
+  } else {
+    if (recv_count.Add(1) % 1024 == 0) {
+      if (xcp_type == RPC_EXCEPTION_LOST_CONNECTION_RECV) {
+        throw TTransportException(TTransportException::NOT_OPEN,
+            "Called read on non-open socket");
+      } else if (xcp_type == RPC_EXCEPTION_SSL_ERROR_RECV) {
+        throw TSSLException("SSL_read: SSL resource temporarily unavailable");
+      }
+    }
+  }
+}
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7db2d306/be/src/testutil/fault-injection-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/fault-injection-util.h 
b/be/src/testutil/fault-injection-util.h
index 99816a4..765c246 100644
--- a/be/src/testutil/fault-injection-util.h
+++ b/be/src/testutil/fault-injection-util.h
@@ -20,14 +20,12 @@
 
 #include "util/time.h"
 
-#ifndef NDEBUG
-  DECLARE_int32(fault_injection_rpc_delay_ms);
-  DECLARE_int32(fault_injection_rpc_type);
-#endif
-
 namespace impala {
 
 #ifndef NDEBUG
+
+class FaultInjectionUtil {
+ public:
   enum RpcCallType {
     RPC_NULL = 0,
     RPC_EXECQUERYFINSTANCES,
@@ -39,23 +37,44 @@ namespace impala {
     RPC_RANDOM    // This must be last.
   };
 
-  /// Test util function that can inject delay to specified RPC server handling
-  /// function so that RPC caller could hit the RPC recv timeout condition.
-  /// my_type specifies which RPC type the current function is.
-  /// rpc_type specifies which RPC function the delay should be enabled.
-  /// delay_ms specifies how long the delay should be.
-  static void InjectRpcDelay(RpcCallType my_type, int32_t rpc_type, int32_t 
delay_ms) {
-    std::random_device rd;
-    srand(rd());
-    if (delay_ms == 0) return;
-    if (rpc_type == RPC_RANDOM) rpc_type = rand() % RPC_RANDOM;
-    if (rpc_type == my_type) SleepForMs(delay_ms);
-  }
-
-  #define FAULT_INJECTION_RPC_DELAY(type) InjectRpcDelay(type, \
-      FLAGS_fault_injection_rpc_type, FLAGS_fault_injection_rpc_delay_ms)
-#else
-  #define FAULT_INJECTION_RPC_DELAY(type)
+  enum RpcExceptionType {
+    RPC_EXCEPTION_NONE = 0,
+    RPC_EXCEPTION_LOST_CONNECTION_SEND,
+    RPC_EXCEPTION_LOST_CONNECTION_RECV,
+    RPC_EXCEPTION_SSL_ERROR_SEND,
+    RPC_EXCEPTION_SSL_ERROR_RECV
+  };
+
+  /// Test util function that injects delays to specified RPC server handling 
function
+  /// so that RPC caller could hit the RPC recv timeout condition.
+  /// 'my_type' specifies which RPC type of the current function.
+  /// FLAGS_fault_injection_rpc_type specifies which RPC function the delay 
should
+  /// be enabled. FLAGS_fault_injection_rpc_delay_ms specifies the delay in ms.
+  static void InjectRpcDelay(RpcCallType my_type);
+
+  /// Test util function that injects exceptions to RPC client functions.
+  /// 'my_type' specifies which RPC type of the current function. Currently, 
only
+  /// TransmitData() is supported.
+  /// 'is_send' indicates whether injected fault is at the send RPC call or 
recv RPC.
+  ///  It's true for the send RPC call and false for the recv RPC call.
+  /// FLAGS_fault_injection_rpc_exception_type specifies the exception to be 
injected.
+  static void InjectRpcException(RpcCallType my_type, bool is_send);
+
+ private:
+  static int32_t GetTargetRPCType();
+
+};
+
+#define FAULT_INJECTION_RPC_DELAY(type)                         \
+    FaultInjectionUtil::InjectRpcDelay(FaultInjectionUtil::type)
+#define FAULT_INJECTION_RPC_EXCEPTION(type, is_send)            \
+    FaultInjectionUtil::InjectRpcException(FaultInjectionUtil::type, is_send)
+
+#else // NDEBUG
+
+#define FAULT_INJECTION_RPC_DELAY(type)
+#define FAULT_INJECTION_RPC_EXCEPTION(type, is_send)
+
 #endif
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7db2d306/tests/custom_cluster/test_rpc_exception.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_rpc_exception.py 
b/tests/custom_cluster/test_rpc_exception.py
new file mode 100644
index 0000000..6faea87
--- /dev/null
+++ b/tests/custom_cluster/test_rpc_exception.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIfBuildType
+
[email protected]_dev_build
+class TestRPCException(CustomClusterTestSuite):
+  """Tests Impala exception handling in TransmitData() RPC to make sure no
+     duplicated row batches are sent. """
+  # This query ends up calling TransmitData() more than 2048 times to ensure
+  # proper test coverage.
+  TEST_QUERY = "select count(*) from tpch_parquet.lineitem t1, 
tpch_parquet.lineitem t2 \
+      where t1.l_orderkey = t2.l_orderkey"
+  EXPECTED_RESULT = ['30012985']
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestRPCException, cls).setup_class()
+
+  # Execute TEST_QUERY. If 'exception_string' is None, it's expected to 
complete
+  # successfully with result matching EXPECTED_RESULT. Otherwise, it's expected
+  # to fail with 'exception_string'.
+  def execute_query(self, exception_string):
+    try:
+      result = self.client.execute(self.TEST_QUERY)
+      assert result.data == self.EXPECTED_RESULT
+      assert not exception_string
+    except ImpalaBeeswaxException as e:
+      assert exception_string in str(e)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=1"
+      " --fault_injection_rpc_type=5")
+  def test_transmitdata_send_fail(self, vector):
+    self.execute_query(None)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=2"
+      " --fault_injection_rpc_type=5")
+  def test_transmitdata_recv_fail(self, vector):
+    self.execute_query("Called read on non-open socket")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=3"
+      " --fault_injection_rpc_type=5")
+  def test_transmitdata_secure_send_error(self, vector):
+    self.execute_query("SSL_write: SSL resource temporarily unavailable")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4"
+      " --fault_injection_rpc_type=5")
+  def test_transmitdata_secure_recv_error(self, vector):
+    self.execute_query("SSL_read: SSL resource temporarily unavailable")
+

Reply via email to