KUDU-2041: Fix negotiation deadlock. With N threads in the negotiation threadpool, N or more concurrent client negotiation attempts could starve any incoming server negotiation tasks that use the same threadpool.
If the set of negotiation attempts forms a graph containing cycles, negotiation could deadlock (at least until the negotiation timeout expires): every node in the system waits for a server-side negotiation to complete, but every node has dedicated all of its negotiation threads to client requests. Fix: split the server and client tasks into two separate pools. Testing: add a unit test that reproduces the issue and passes with the fix applied. Change-Id: I38379eeaf7516d432708c2a2a285839f96c86d4f Reviewed-on: http://gerrit.cloudera.org:8080/7177 Reviewed-by: Todd Lipcon <[email protected]> Tested-by: Kudu Jenkins Reviewed-on: http://gerrit.cloudera.org:8080/7742 Reviewed-by: Henry Robinson <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cc4816b3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cc4816b3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cc4816b3 Branch: refs/heads/master Commit: cc4816b3d6ae2ca00ce61b1bc442161bfbe6de3f Parents: d5670d6 Author: Henry Robinson <[email protected]> Authored: Fri Apr 14 15:54:11 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Aug 30 04:04:08 2017 +0000 ---------------------------------------------------------------------- be/src/kudu/rpc/messenger.cc | 24 +++++++++++++++++++----- be/src/kudu/rpc/messenger.h | 8 ++++++-- be/src/kudu/rpc/reactor.cc | 4 +++- be/src/kudu/rpc/rpc-test-base.h | 15 +++++++++++++-- be/src/kudu/rpc/rpc-test.cc | 24 ++++++++++++++++++++++++ 5 files changed, 65 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cc4816b3/be/src/kudu/rpc/messenger.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/messenger.cc b/be/src/kudu/rpc/messenger.cc index 
1cec5bf..2884fba 100644 --- a/be/src/kudu/rpc/messenger.cc +++ b/be/src/kudu/rpc/messenger.cc @@ -333,7 +333,8 @@ void Messenger::Shutdown() { // Need to shut down negotiation pool before the reactors, since the // reactors close the Connection sockets, and may race against the negotiation // threads' blocking reads & writes. - negotiation_pool_->Shutdown(); + client_negotiation_pool_->Shutdown(); + server_negotiation_pool_->Shutdown(); for (Reactor* reactor : reactors_) { reactor->Shutdown(); @@ -435,10 +436,14 @@ Messenger::Messenger(const MessengerBuilder &bld) for (int i = 0; i < bld.num_reactors_; i++) { reactors_.push_back(new Reactor(retain_self_, i, bld)); } - CHECK_OK(ThreadPoolBuilder("negotiator") - .set_min_threads(bld.min_negotiation_threads_) - .set_max_threads(bld.max_negotiation_threads_) - .Build(&negotiation_pool_)); + CHECK_OK(ThreadPoolBuilder("client-negotiator") + .set_min_threads(bld.min_negotiation_threads_) + .set_max_threads(bld.max_negotiation_threads_) + .Build(&client_negotiation_pool_)); + CHECK_OK(ThreadPoolBuilder("server-negotiator") + .set_min_threads(bld.min_negotiation_threads_) + .set_max_threads(bld.max_negotiation_threads_) + .Build(&server_negotiation_pool_)); } Messenger::~Messenger() { @@ -503,5 +508,14 @@ const scoped_refptr<RpcService> Messenger::rpc_service(const string& service_nam } } +ThreadPool* Messenger::negotiation_pool(Connection::Direction dir) { + switch (dir) { + case Connection::CLIENT: return client_negotiation_pool_.get(); + case Connection::SERVER: return server_negotiation_pool_.get(); + } + DCHECK(false) << "Unknown Connection::Direction value: " << dir; + return nullptr; +} + } // namespace rpc } // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cc4816b3/be/src/kudu/rpc/messenger.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h index 1ba76a7..9a8ebab 100644 --- 
a/be/src/kudu/rpc/messenger.h +++ b/be/src/kudu/rpc/messenger.h @@ -30,6 +30,7 @@ #include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/ref_counted.h" +#include "kudu/rpc/connection.h" #include "kudu/rpc/response_callback.h" #include "kudu/security/token.pb.h" #include "kudu/util/locks.h" @@ -227,7 +228,7 @@ class Messenger { RpcAuthentication authentication() const { return authentication_; } RpcEncryption encryption() const { return encryption_; } - ThreadPool* negotiation_pool() const { return negotiation_pool_.get(); } + ThreadPool* negotiation_pool(Connection::Direction dir); RpczStore* rpcz_store() { return rpcz_store_.get(); } @@ -287,7 +288,10 @@ class Messenger { std::vector<Reactor*> reactors_; - gscoped_ptr<ThreadPool> negotiation_pool_; + // Separate client and server negotiation pools to avoid possibility of distributed + // deadlock. See KUDU-2041. + gscoped_ptr<ThreadPool> client_negotiation_pool_; + gscoped_ptr<ThreadPool> server_negotiation_pool_; std::unique_ptr<security::TlsContext> tls_context_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cc4816b3/be/src/kudu/rpc/reactor.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc index e235dd4..df4a661 100644 --- a/be/src/kudu/rpc/reactor.cc +++ b/be/src/kudu/rpc/reactor.cc @@ -446,7 +446,9 @@ Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection> TRACE("Submitting negotiation task for $0", conn->ToString()); auto authentication = reactor()->messenger()->authentication(); auto encryption = reactor()->messenger()->encryption(); - RETURN_NOT_OK(reactor()->messenger()->negotiation_pool()->SubmitClosure( + ThreadPool* negotiation_pool = + reactor()->messenger()->negotiation_pool(conn->direction()); + RETURN_NOT_OK(negotiation_pool->SubmitClosure( Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline))); return Status::OK(); } 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cc4816b3/be/src/kudu/rpc/rpc-test-base.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc-test-base.h b/be/src/kudu/rpc/rpc-test-base.h index c40f546..c28218b 100644 --- a/be/src/kudu/rpc/rpc-test-base.h +++ b/be/src/kudu/rpc/rpc-test-base.h @@ -525,6 +525,11 @@ class RpcTestBase : public KuduTest { DoStartTestServer<CalculatorService>(server_addr, enable_ssl); } + void StartTestServerWithCustomMessenger(Sockaddr *server_addr, + const std::shared_ptr<Messenger>& messenger, bool enable_ssl = false) { + DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl, messenger); + } + // Start a simple socket listening on a local port, returning the address. // This isn't an RPC server -- just a plain socket which can be helpful for testing. Status StartFakeServer(Socket *listen_sock, Sockaddr *listen_addr) { @@ -548,8 +553,14 @@ class RpcTestBase : public KuduTest { } template<class ServiceClass> - void DoStartTestServer(Sockaddr *server_addr, bool enable_ssl = false) { - server_messenger_ = CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl); + void DoStartTestServer(Sockaddr *server_addr, bool enable_ssl = false, + const std::shared_ptr<Messenger>& messenger = nullptr) { + if (!messenger) { + server_messenger_ = + CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl); + } else { + server_messenger_ = messenger; + } std::shared_ptr<AcceptorPool> pool; ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr(), &pool)); ASSERT_OK(pool->Start(2)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cc4816b3/be/src/kudu/rpc/rpc-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc index 47ca528..2378892 100644 --- a/be/src/kudu/rpc/rpc-test.cc +++ b/be/src/kudu/rpc/rpc-test.cc @@ -108,6 +108,30 @@ TEST_F(TestRpc, 
TestConnHeaderValidation) { ASSERT_OK(serialization::ValidateConnHeader(Slice(buf, conn_hdr_len))); } +// Regression test for KUDU-2041 +TEST_P(TestRpc, TestNegotiationDeadlock) { + bool enable_ssl = GetParam(); + + // The deadlock would manifest in cases where the number of concurrent connection + // requests >= the number of threads. 1 thread and 1 cnxn to ourself is just the easiest + // way to reproduce the issue, because the server negotiation task must get queued after + // the client negotiation task if they share the same thread pool. + MessengerBuilder mb("TestRpc.TestNegotiationDeadlock"); + mb.set_min_negotiation_threads(1) + .set_max_negotiation_threads(1) + .set_metric_entity(metric_entity_); + if (enable_ssl) mb.enable_inbound_tls(); + + shared_ptr<Messenger> messenger; + CHECK_OK(mb.Build(&messenger)); + + Sockaddr server_addr; + StartTestServerWithCustomMessenger(&server_addr, messenger, enable_ssl); + + Proxy p(messenger, server_addr, GenericCalculatorService::static_service_name()); + ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); +} + // Test making successful RPC calls. TEST_P(TestRpc, TestCall) { // Set up server.
