Repository: impala
Updated Branches:
  refs/heads/master 0d7787fe4 -> 8079cd9d2


KUDU-2334: Fix OutboundTransfer::TransferStarted() to work with SSL_write()

Previously, OutboundTransfer::TransferStarted() returned true iff
non-zero bytes had been successfully sent via Writev(). As it turns
out, this doesn't work well with SSL_write(). When SSL_write() returns -1
with errno EAGAIN or ETRYAGAIN, we need to retry the call with exactly
the same buffer pointer next time even if 0 bytes have been written.

The following sequence becomes problematic with the previous implementation
of OutboundTransfer::TransferStarted():

- WriteHandler() calls SendBuffer() on an OutboundTransfer.
- SendBuffer() calls TlsSocket::Writev() which hits the EAGAIN error above.
  Since 0 bytes were written, cur_slice_idx_ and cur_offset_in_slice_ remain 0
  and OutboundTransfer::TransferStarted() still returns false.
- OutboundTransfer is cancelled or timed out. car->call is set to NULL.
- WriteHandler() is called again and as it notices that the OutboundTransfer
  hasn't really started yet and "car->call" is NULL due to cancellation, it
  removes it from the outbound transfer queue and moves on to the next entry
  in the queue.
- WriteHandler() calls SendBuffer() with the next entry in the queue and
  eventually calls SSL_write() with a different buffer than expected by
  SSL_write(), leading to "SSL3_WRITE_PENDING:bad write retry" error.

This change fixes the problem above by adding a boolean flag 'started_'
which is set to true if OutboundTransfer::SendBuffer() has been called
at least once. Also added some tests to exercise cancellation paths with
multiple concurrent RPCs.

Confirmed the problem above is fixed by running stress test in a 130 node
cluster with Impala. The problem happened consistently without the fix.

Change-Id: Id7ebdcbc1ef2a3e0c5e7162f03214c232755b683
Reviewed-on: http://gerrit.cloudera.org:8080/9587
Reviewed-by: Sailesh Mukil <sail...@cloudera.com>
Reviewed-by: Todd Lipcon <t...@apache.org>
Tested-by: Todd Lipcon <t...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/9606
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8079cd9d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8079cd9d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8079cd9d

Branch: refs/heads/master
Commit: 8079cd9d2a87051f81a41910b74fab15e35f36ea
Parents: 0d7787f
Author: Michael Ho <k...@cloudera.com>
Authored: Mon Mar 12 12:27:34 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Tue Mar 13 22:20:39 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/rpc-test-base.h |  4 ++
 be/src/kudu/rpc/rpc-test.cc     | 72 ++++++++++++++++++++++++++++++++++++
 be/src/kudu/rpc/transfer.cc     |  4 +-
 be/src/kudu/rpc/transfer.h      |  5 +++
 4 files changed, 84 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8079cd9d/be/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test-base.h b/be/src/kudu/rpc/rpc-test-base.h
index a30e8dc..332a7a1 100644
--- a/be/src/kudu/rpc/rpc-test-base.h
+++ b/be/src/kudu/rpc/rpc-test-base.h
@@ -50,6 +50,8 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/trace.h"
 
+DECLARE_bool(rpc_encrypt_loopback_connections);
+
 namespace kudu {
 namespace rpc {
 
@@ -438,9 +440,11 @@ class RpcTestBase : public KuduTest {
     MessengerBuilder bld(name);
 
     if (enable_ssl) {
+      FLAGS_rpc_encrypt_loopback_connections = true;
       bld.set_epki_cert_key_files(rpc_certificate_file, rpc_private_key_file);
       bld.set_epki_certificate_authority_file(rpc_ca_certificate_file);
       bld.set_epki_private_password_key_cmd(rpc_private_key_password_cmd);
+      bld.set_rpc_encryption("required");
       bld.enable_inbound_tls();
     }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8079cd9d/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index dcbe5a7..f6d930f 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -699,6 +699,8 @@ TEST_P(TestRpc, TestRpcSidecarLimits) {
     ASSERT_STR_MATCHES(status.ToString(),
                        // Linux
                        "Connection reset by peer"
+                       // Linux, SSL enabled
+                       "|failed to read from TLS socket"
                        // macOS, while reading from socket.
                        "|got EOF from remote"
                        // macOS, while writing to socket.
@@ -1119,6 +1121,7 @@ static void SleepCallback(uint8_t* payload, CountDownLatch* latch) {
   latch->CountDown();
 }
 
+// Test to verify that sidecars aren't corrupted when cancelling an async RPC.
 TEST_P(TestRpc, TestCancellationAsync) {
   // Set up server.
   Sockaddr server_addr;
@@ -1168,5 +1171,74 @@ TEST_P(TestRpc, TestCancellationAsync) {
   client_messenger->Shutdown();
 }
 
+// This function loops for 40 iterations and for each iteration, sends an async RPC
+// and sleeps for some time between 1 to 100 microseconds before cancelling the RPC.
+// This serves as a helper function for TestCancellationMultiThreads() to exercise
+// cancellation when there are concurrent RPCs.
+static void SendAndCancelRpcs(Proxy* p, const Slice& slice) {
+  RpcController controller;
+
+  // Used to generate sleep time between invoking RPC and requesting 
cancellation.
+  Random rand(SeedRandom());
+
+  for (int i = 0; i < 40; ++i) {
+    controller.Reset();
+    PushTwoStringsRequestPB request;
+    PushTwoStringsResponsePB resp;
+    int idx;
+    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(slice), 
&idx));
+    request.set_sidecar1_idx(idx);
+    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(slice), 
&idx));
+    request.set_sidecar2_idx(idx);
+
+    CountDownLatch latch(1);
+    p->AsyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+                    request, &resp, &controller,
+                    boost::bind(&CountDownLatch::CountDown, 
boost::ref(latch)));
+
+    if ((i % 8) != 0) {
+      // Sleep for a while before cancelling the RPC.
+      SleepFor(MonoDelta::FromMicroseconds(rand.Uniform64(100)));
+      controller.Cancel();
+    }
+    latch.Wait();
+    CHECK(controller.status().IsAborted() || 
controller.status().IsServiceUnavailable() ||
+          controller.status().ok()) << controller.status().ToString();
+  }
+}
+
+// Test to exercise cancellation when there are multiple concurrent RPCs from the
+// same client to the same server.
+TEST_P(TestRpc, TestCancellationMultiThreads) {
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
+
+  // Buffer used for sidecars by SendAndCancelRpcs().
+  string buf(16 * 1024 * 1024, 'a');
+  Slice slice(buf);
+
+  // Start a bunch of threads which invoke async RPC and cancellation.
+  std::vector<scoped_refptr<Thread>> threads;
+  for (int i = 0; i < 30; ++i) {
+    scoped_refptr<Thread> rpc_thread;
+    ASSERT_OK(Thread::Create("test", "rpc", SendAndCancelRpcs, &p, slice, 
&rpc_thread));
+    threads.push_back(rpc_thread);
+  }
+  // Wait for all threads to complete.
+  for (scoped_refptr<Thread>& rpc_thread : threads) {
+    rpc_thread->Join();
+  }
+  client_messenger->Shutdown();
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/8079cd9d/be/src/kudu/rpc/transfer.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.cc b/be/src/kudu/rpc/transfer.cc
index f83c432..4203b03 100644
--- a/be/src/kudu/rpc/transfer.cc
+++ b/be/src/kudu/rpc/transfer.cc
@@ -159,6 +159,7 @@ OutboundTransfer::OutboundTransfer(int32_t call_id,
     cur_offset_in_slice_(0),
     callbacks_(callbacks),
     call_id_(call_id),
+    started_(false),
     aborted_(false) {
 
   n_payload_slices_ = n_payload_slices;
@@ -185,6 +186,7 @@ void OutboundTransfer::Abort(const Status &status) {
 Status OutboundTransfer::SendBuffer(Socket &socket) {
   CHECK_LT(cur_slice_idx_, n_payload_slices_);
 
+  started_ = true;
   int n_iovecs = n_payload_slices_ - cur_slice_idx_;
   struct iovec iovec[n_iovecs];
   {
@@ -232,7 +234,7 @@ Status OutboundTransfer::SendBuffer(Socket &socket) {
 }
 
 bool OutboundTransfer::TransferStarted() const {
-  return cur_offset_in_slice_ != 0 || cur_slice_idx_ != 0;
+  return started_;
 }
 
 bool OutboundTransfer::TransferFinished() const {

http://git-wip-us.apache.org/repos/asf/impala/blob/8079cd9d/be/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.h b/be/src/kudu/rpc/transfer.h
index 533fe83..2c81658 100644
--- a/be/src/kudu/rpc/transfer.h
+++ b/be/src/kudu/rpc/transfer.h
@@ -187,6 +187,11 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
   // In the case of call responses, kInvalidCallId
   int32_t call_id_;
 
+  // True if SendBuffer() has been called at least once. This can be true even if
+  // no bytes were sent successfully. This is needed as SSL_write() is stateful.
+  // Please see KUDU-2334 for details.
+  bool started_;
+
   bool aborted_;
 
   DISALLOW_COPY_AND_ASSIGN(OutboundTransfer);

Reply via email to