Repository: kudu Updated Branches: refs/heads/master 287dc5408 -> de57e3c92
KUDU-2334: Fix OutboundTransfer::TransferStarted() to work with SSL_write() Previously, OutboundTransfer::TransferStarted() returned true iff non-zero bytes had been successfully sent via Writev(). As it turns out, this doesn't work well with SSL_write(). When SSL_write() returns -1 with errno EAGAIN or ETRYAGAIN, we need to retry the call with exactly the same buffer pointer next time even if 0 bytes have been written. The following sequence becomes problematic with the previous implementation of OutboundTransfer::TransferStarted(): - WriteHandler() calls SendBuffer() on an OutboundTransfer. - SendBuffer() calls TlsSocket::Writev() which hits the EAGAIN error above. Since 0 bytes were written, cur_slice_idx_ and cur_offset_in_slice_ remain 0 and OutboundTransfer::TransferStarted() still returns false. - OutboundTransfer is cancelled or timed out. car->call is set to NULL. - WriteHandler() is called again and, as it notices that the OutboundTransfer hasn't really started yet and "car->call" is NULL due to cancellation, it removes it from the outbound transfer queue and moves on to the next entry in the queue. - WriteHandler() calls SendBuffer() with the next entry in the queue and eventually calls SSL_write() with a different buffer than expected by SSL_write(), leading to a "SSL3_WRITE_PENDING:bad write retry" error. This change fixes the problem above by adding a boolean flag 'started_' which is set to true if OutboundTransfer::SendBuffer() has been called at least once. Also added some tests to exercise cancellation paths with multiple concurrent RPCs. Confirmed the problem above is fixed by running a stress test in a 130-node cluster with Impala. The problem happened consistently without the fix. 
Change-Id: Id7ebdcbc1ef2a3e0c5e7162f03214c232755b683 Reviewed-on: http://gerrit.cloudera.org:8080/9587 Reviewed-by: Sailesh Mukil <[email protected]> Reviewed-by: Todd Lipcon <[email protected]> Tested-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/de57e3c9 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/de57e3c9 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/de57e3c9 Branch: refs/heads/master Commit: de57e3c924fbb2521743ef9762a556b871691a03 Parents: 287dc54 Author: Michael Ho <[email protected]> Authored: Mon Mar 12 12:27:34 2018 -0700 Committer: Todd Lipcon <[email protected]> Committed: Tue Mar 13 15:11:11 2018 +0000 ---------------------------------------------------------------------- src/kudu/rpc/rpc-test-base.h | 4 +++ src/kudu/rpc/rpc-test.cc | 72 +++++++++++++++++++++++++++++++++++++++ src/kudu/rpc/transfer.cc | 4 ++- src/kudu/rpc/transfer.h | 5 +++ 4 files changed, 84 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/de57e3c9/src/kudu/rpc/rpc-test-base.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h index b49d6a5..3c1f571 100644 --- a/src/kudu/rpc/rpc-test-base.h +++ b/src/kudu/rpc/rpc-test-base.h @@ -51,6 +51,8 @@ #include "kudu/util/test_util.h" #include "kudu/util/trace.h" +DECLARE_bool(rpc_encrypt_loopback_connections); + namespace kudu { namespace rpc { @@ -436,9 +438,11 @@ class RpcTestBase : public KuduTest { MessengerBuilder bld(name); if (enable_ssl) { + FLAGS_rpc_encrypt_loopback_connections = true; bld.set_epki_cert_key_files(rpc_certificate_file, rpc_private_key_file); bld.set_epki_certificate_authority_file(rpc_ca_certificate_file); bld.set_epki_private_password_key_cmd(rpc_private_key_password_cmd); + 
bld.set_rpc_encryption("required"); bld.enable_inbound_tls(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/de57e3c9/src/kudu/rpc/rpc-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc index f6496cf..0960dd2 100644 --- a/src/kudu/rpc/rpc-test.cc +++ b/src/kudu/rpc/rpc-test.cc @@ -737,6 +737,8 @@ TEST_P(TestRpc, TestRpcSidecarLimits) { ASSERT_STR_MATCHES(status.ToString(), // Linux "Connection reset by peer" + // Linux, SSL enabled + "|failed to read from TLS socket" // macOS, while reading from socket. "|got EOF from remote" // macOS, while writing to socket. @@ -1167,6 +1169,7 @@ static void SleepCallback(uint8_t* payload, CountDownLatch* latch) { latch->CountDown(); } +// Test to verify that sidecars aren't corrupted when cancelling an async RPC. TEST_P(TestRpc, TestCancellationAsync) { // Set up server. Sockaddr server_addr; @@ -1217,5 +1220,74 @@ TEST_P(TestRpc, TestCancellationAsync) { client_messenger->Shutdown(); } +// This function loops for 40 iterations and for each iteration, sends an async RPC +// and sleeps for some time between 1 to 100 microseconds before cancelling the RPC. +// This serves as a helper function for TestCancellationMultiThreads() to exercise +// cancellation when there are concurrent RPCs. +static void SendAndCancelRpcs(Proxy* p, const Slice& slice) { + RpcController controller; + + // Used to generate sleep time between invoking RPC and requesting cancellation. 
+ Random rand(SeedRandom()); + + for (int i = 0; i < 40; ++i) { + controller.Reset(); + PushTwoStringsRequestPB request; + PushTwoStringsResponsePB resp; + int idx; + CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx)); + request.set_sidecar1_idx(idx); + CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx)); + request.set_sidecar2_idx(idx); + + CountDownLatch latch(1); + p->AsyncRequest(GenericCalculatorService::kPushTwoStringsMethodName, + request, &resp, &controller, + boost::bind(&CountDownLatch::CountDown, boost::ref(latch))); + + if ((i % 8) != 0) { + // Sleep for a while before cancelling the RPC. + SleepFor(MonoDelta::FromMicroseconds(rand.Uniform64(100))); + controller.Cancel(); + } + latch.Wait(); + CHECK(controller.status().IsAborted() || controller.status().IsServiceUnavailable() || + controller.status().ok()) << controller.status().ToString(); + } +} + +// Test to exercise cancellation when there are multiple concurrent RPCs from the +// same client to the same server. +TEST_P(TestRpc, TestCancellationMultiThreads) { + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + ASSERT_OK(StartTestServer(&server_addr, enable_ssl)); + + // Set up client. + LOG(INFO) << "Connecting to " << server_addr.ToString(); + shared_ptr<Messenger> client_messenger; + ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl)); + Proxy p(client_messenger, server_addr, server_addr.host(), + GenericCalculatorService::static_service_name()); + + // Buffer used for sidecars by SendAndCancelRpcs(). + string buf(16 * 1024 * 1024, 'a'); + Slice slice(buf); + + // Start a bunch of threads which invoke async RPC and cancellation. + std::vector<scoped_refptr<Thread>> threads; + for (int i = 0; i < 30; ++i) { + scoped_refptr<Thread> rpc_thread; + ASSERT_OK(Thread::Create("test", "rpc", SendAndCancelRpcs, &p, slice, &rpc_thread)); + threads.push_back(rpc_thread); + } + // Wait for all threads to complete. 
+ for (scoped_refptr<Thread>& rpc_thread : threads) { + rpc_thread->Join(); + } + client_messenger->Shutdown(); +} + } // namespace rpc } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/de57e3c9/src/kudu/rpc/transfer.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc index ee83d55..d8786c4 100644 --- a/src/kudu/rpc/transfer.cc +++ b/src/kudu/rpc/transfer.cc @@ -160,6 +160,7 @@ OutboundTransfer::OutboundTransfer(int32_t call_id, cur_offset_in_slice_(0), callbacks_(callbacks), call_id_(call_id), + started_(false), aborted_(false) { n_payload_slices_ = n_payload_slices; @@ -186,6 +187,7 @@ void OutboundTransfer::Abort(const Status &status) { Status OutboundTransfer::SendBuffer(Socket &socket) { CHECK_LT(cur_slice_idx_, n_payload_slices_); + started_ = true; int n_iovecs = n_payload_slices_ - cur_slice_idx_; struct iovec iovec[n_iovecs]; { @@ -233,7 +235,7 @@ Status OutboundTransfer::SendBuffer(Socket &socket) { } bool OutboundTransfer::TransferStarted() const { - return cur_offset_in_slice_ != 0 || cur_slice_idx_ != 0; + return started_; } bool OutboundTransfer::TransferFinished() const { http://git-wip-us.apache.org/repos/asf/kudu/blob/de57e3c9/src/kudu/rpc/transfer.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h index 53f4c90..baa07c5 100644 --- a/src/kudu/rpc/transfer.h +++ b/src/kudu/rpc/transfer.h @@ -183,6 +183,11 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> { // In the case of call responses, kInvalidCallId int32_t call_id_; + // True if SendBuffer() has been called at least once. This can be true even if + // no bytes were sent successfully. This is needed as SSL_write() is stateful. + // Please see KUDU-2334 for details. + bool started_; + bool aborted_; DISALLOW_COPY_AND_ASSIGN(OutboundTransfer);
