Repository: hadoop Updated Branches: refs/heads/HDFS-8707 467114d39 -> ba51b7cf9
libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba51b7cf Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba51b7cf Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba51b7cf Branch: refs/heads/HDFS-8707 Commit: ba51b7cf9040231553bc6d60a21d8adf8c85640d Parents: 467114d Author: James <j...@apache.org> Authored: Fri Oct 14 10:13:24 2016 -0400 Committer: James <j...@apache.org> Committed: Fri Oct 14 10:13:24 2016 -0400 ---------------------------------------------------------------------- .../native/libhdfspp/lib/rpc/rpc_connection.h | 75 ++++++++++---------- .../native/libhdfspp/tests/rpc_engine_test.cc | 6 +- 2 files changed, 41 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba51b7cf/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 869be40..a6a07c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -34,9 +34,11 @@ namespace hdfs { -template <class NextLayer> +template <class Socket> class RpcConnectionImpl : public RpcConnection { public: + MEMCHECKED_CLASS(RpcConnectionImpl); + RpcConnectionImpl(RpcEngine *engine); virtual ~RpcConnectionImpl() override; @@ -55,7 +57,7 @@ public: virtual void FlushPendingRequests() override; - NextLayer &next_layer() { return next_layer_; } + Socket &TEST_get_mutable_socket() { return socket_; } void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; } @@ -63,35 +65,34 @@ public: const Options options_; ::asio::ip::tcp::endpoint current_endpoint_; std::vector<::asio::ip::tcp::endpoint> additional_endpoints_; - NextLayer next_layer_; + Socket socket_; ::asio::deadline_timer connect_timer_; void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote); }; -template <class NextLayer> -RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine) +template <class Socket> +RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine) : RpcConnection(engine), options_(engine->options()), - next_layer_(engine->io_service()), + socket_(engine->io_service()), connect_timer_(engine->io_service()) { LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); } -template <class NextLayer> -RpcConnectionImpl<NextLayer>::~RpcConnectionImpl() { +template <class Socket> +RpcConnectionImpl<Socket>::~RpcConnectionImpl() { LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); - std::lock_guard<std::mutex> state_lock(connection_state_lock_); if (pending_requests_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); if (requests_on_fly_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); } -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::Connect( +template <class Socket> +void RpcConnectionImpl<Socket>::Connect( const std::vector<::asio::ip::tcp::endpoint> &server, const AuthInfo & auth_info, RpcCallback &handler) { @@ -109,8 +110,8 @@ void RpcConnectionImpl<NextLayer>::Connect( this->ConnectAndFlush(server); // need "this" so compiler can infer type of CAF } -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::ConnectAndFlush( +template <class Socket> +void RpcConnectionImpl<Socket>::ConnectAndFlush( const std::vector<::asio::ip::tcp::endpoint> &server) { LOG_INFO(kRPC, << "ConnectAndFlush called"); @@ -139,7 +140,7 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush( current_endpoint_ = first_endpoint; auto shared_this = shared_from_this(); - next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) { + socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) { ConnectComplete(ec, first_endpoint); }); @@ -155,9 +156,9 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush( }); } -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) { - auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this(); +template <class Socket> +void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) { + auto shared_this = RpcConnectionImpl<Socket>::shared_from_this(); std::lock_guard<std::mutex> state_lock(connection_state_lock_); connect_timer_.cancel(); @@ -190,7 +191,7 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec, }); } else { LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());; - std::string err = SafeDisconnect(get_asio_socket_ptr(&next_layer_)); + std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_)); if(!err.empty()) { LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err); } @@ -202,7 +203,7 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec, additional_endpoints_.erase(additional_endpoints_.begin()); current_endpoint_ = next_endpoint; - next_layer_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) { + socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) { ConnectComplete(ec, next_endpoint); }); connect_timer_.expires_from_now( @@ -219,8 +220,8 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec, } } -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) { +template <class Socket> +void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) { assert(lock_held(connection_state_lock_)); // Must be holding lock before calling LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called"); @@ -228,7 +229,7 @@ void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) { auto shared_this = shared_from_this(); auto handshake_packet = PrepareHandshakePacket(); - ::asio::async_write(next_layer_, asio::buffer(*handshake_packet), + ::asio::async_write(socket_, asio::buffer(*handshake_packet), [handshake_packet, handler, shared_this, this]( const ::asio::error_code &ec, size_t) { Status status = ToStatus(ec); @@ -236,15 +237,15 @@ void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) { }); } -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::SendContext(RpcCallback &handler) { +template <class Socket> +void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) { assert(lock_held(connection_state_lock_)); // Must be holding lock before calling LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called"); auto shared_this = shared_from_this(); auto context_packet = PrepareContextPacket(); - ::asio::async_write(next_layer_, asio::buffer(*context_packet), + ::asio::async_write(socket_, asio::buffer(*context_packet), [context_packet, handler, shared_this, this]( const ::asio::error_code &ec, size_t) { Status status = ToStatus(ec); @@ -252,8 +253,8 @@ void RpcConnectionImpl<NextLayer>::SendContext(RpcCallback &handler) { }); } -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec, +template <class Socket> +void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec, size_t) { using std::placeholders::_1; using std::placeholders::_2; @@ -271,8 +272,8 @@ void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec, FlushPendingRequests(); } -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::FlushPendingRequests() { +template <class Socket> +void RpcConnectionImpl<Socket>::FlushPendingRequests() { using namespace ::std::placeholders; // Lock should be held @@ -335,7 +336,7 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() { this->HandleRpcTimeout(timeout_req, ec); }); - asio::async_write(next_layer_, asio::buffer(*payload), + asio::async_write(socket_, asio::buffer(*payload), [shared_this, this, payload](const ::asio::error_code &ec, size_t size) { OnSendCompleted(ec, size); @@ -352,8 +353,8 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() { } -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &original_ec, +template <class Socket> +void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &original_ec, size_t) { using std::placeholders::_1; using std::placeholders::_2; @@ -396,7 +397,7 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ori auto buf = ::asio::buffer(reinterpret_cast<char *>(¤t_response_state_->length_), sizeof(current_response_state_->length_)); asio::async_read( - next_layer_, buf, + socket_, buf, [shared_this, this](const ::asio::error_code &ec, size_t size) { OnRecvCompleted(ec, size); }); @@ -405,7 +406,7 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ori current_response_state_->length_ = ntohl(current_response_state_->length_); current_response_state_->data_.resize(current_response_state_->length_); asio::async_read( - next_layer_, ::asio::buffer(current_response_state_->data_), + socket_, ::asio::buffer(current_response_state_->data_), [shared_this, this](const ::asio::error_code &ec, size_t size) { OnRecvCompleted(ec, size); }); @@ -425,8 +426,8 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ori } } -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::Disconnect() { +template <class Socket> +void RpcConnectionImpl<Socket>::Disconnect() { assert(lock_held(connection_state_lock_)); // Must be holding lock before calling LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called"); @@ -434,7 +435,7 @@ void RpcConnectionImpl<NextLayer>::Disconnect() { request_over_the_wire_.reset(); if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) { // Don't print out errors, we were expecting a disconnect here - SafeDisconnect(get_asio_socket_ptr(&next_layer_)); + SafeDisconnect(get_asio_socket_ptr(&socket_)); } connected_ = kDisconnected; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba51b7cf/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc index 3e8c93f..08218f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -118,7 +118,7 @@ TEST(RpcEngineTest, TestRoundTrip) { RpcResponseHeaderProto h; h.set_callid(1); h.set_status(RpcResponseHeaderProto::SUCCESS); - EXPECT_CALL(conn->next_layer(), Produce()) + EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce()) .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString()))); std::shared_ptr<RpcConnection> conn_ptr(conn); @@ -153,7 +153,7 @@ TEST(RpcEngineTest, TestConnectionResetAndFail) { RpcResponseHeaderProto h; h.set_callid(1); h.set_status(RpcResponseHeaderProto::SUCCESS); - EXPECT_CALL(conn->next_layer(), Produce()) + EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce()) .WillOnce(Return(RpcResponse( h, "", make_error_code(::asio::error::connection_reset)))); @@ -455,7 +455,7 @@ TEST(RpcEngineTest, TestTimeout) { conn->TEST_set_connected(true); conn->StartReading(); - EXPECT_CALL(conn->next_layer(), Produce()) + EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce()) .WillOnce(Return(std::make_pair(::asio::error::would_block, ""))); std::shared_ptr<RpcConnection> conn_ptr(conn); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org