Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 467114d39 -> ba51b7cf9


libhdfs++: Get rid of lock in RpcConnectionImpl destructor.  Contributed by 
James Clampffer


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba51b7cf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba51b7cf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba51b7cf

Branch: refs/heads/HDFS-8707
Commit: ba51b7cf9040231553bc6d60a21d8adf8c85640d
Parents: 467114d
Author: James <j...@apache.org>
Authored: Fri Oct 14 10:13:24 2016 -0400
Committer: James <j...@apache.org>
Committed: Fri Oct 14 10:13:24 2016 -0400

----------------------------------------------------------------------
 .../native/libhdfspp/lib/rpc/rpc_connection.h   | 75 ++++++++++----------
 .../native/libhdfspp/tests/rpc_engine_test.cc   |  6 +-
 2 files changed, 41 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba51b7cf/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
index 869be40..a6a07c4 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -34,9 +34,11 @@
 
 namespace hdfs {
 
-template <class NextLayer>
+template <class Socket>
 class RpcConnectionImpl : public RpcConnection {
 public:
+  MEMCHECKED_CLASS(RpcConnectionImpl);
+
   RpcConnectionImpl(RpcEngine *engine);
   virtual ~RpcConnectionImpl() override;
 
@@ -55,7 +57,7 @@ public:
   virtual void FlushPendingRequests() override;
 
 
-  NextLayer &next_layer() { return next_layer_; }
+  Socket &TEST_get_mutable_socket() { return socket_; }
 
   void TEST_set_connected(bool connected) { connected_ = connected ? 
kConnected : kNotYetConnected; }
 
@@ -63,35 +65,34 @@ public:
   const Options options_;
   ::asio::ip::tcp::endpoint current_endpoint_;
   std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
-  NextLayer next_layer_;
+  Socket socket_;
   ::asio::deadline_timer connect_timer_;
 
   void ConnectComplete(const ::asio::error_code &ec, const 
::asio::ip::tcp::endpoint &remote);
 };
 
-template <class NextLayer>
-RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
+template <class Socket>
+RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine)
     : RpcConnection(engine),
       options_(engine->options()),
-      next_layer_(engine->io_service()),
+      socket_(engine->io_service()),
       connect_timer_(engine->io_service())
 {
       LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << 
(void*)this);
 }
 
-template <class NextLayer>
-RpcConnectionImpl<NextLayer>::~RpcConnectionImpl() {
+template <class Socket>
+RpcConnectionImpl<Socket>::~RpcConnectionImpl() {
   LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << 
(void*)this);
 
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
   if (pending_requests_.size() > 0)
     LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items 
in the pending queue");
   if (requests_on_fly_.size() > 0)
     LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items 
in the requests_on_fly queue");
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::Connect(
+template <class Socket>
+void RpcConnectionImpl<Socket>::Connect(
     const std::vector<::asio::ip::tcp::endpoint> &server,
     const AuthInfo & auth_info,
     RpcCallback &handler) {
@@ -109,8 +110,8 @@ void RpcConnectionImpl<NextLayer>::Connect(
   this->ConnectAndFlush(server);  // need "this" so compiler can infer type of 
CAF
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
+template <class Socket>
+void RpcConnectionImpl<Socket>::ConnectAndFlush(
     const std::vector<::asio::ip::tcp::endpoint> &server) {
 
   LOG_INFO(kRPC, << "ConnectAndFlush called");
@@ -139,7 +140,7 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
   current_endpoint_ = first_endpoint;
 
   auto shared_this = shared_from_this();
-  next_layer_.async_connect(first_endpoint, [shared_this, this, 
first_endpoint](const ::asio::error_code &ec) {
+  socket_.async_connect(first_endpoint, [shared_this, this, 
first_endpoint](const ::asio::error_code &ec) {
     ConnectComplete(ec, first_endpoint);
   });
 
@@ -155,9 +156,9 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
   });
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code 
&ec, const ::asio::ip::tcp::endpoint & remote) {
-  auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this();
+template <class Socket>
+void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, 
const ::asio::ip::tcp::endpoint & remote) {
+  auto shared_this = RpcConnectionImpl<Socket>::shared_from_this();
   std::lock_guard<std::mutex> state_lock(connection_state_lock_);
   connect_timer_.cancel();
 
@@ -190,7 +191,7 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const 
::asio::error_code &ec,
     });
   } else {
     LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());;
-    std::string err = SafeDisconnect(get_asio_socket_ptr(&next_layer_));
+    std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_));
     if(!err.empty()) {
       LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error 
closing connection: " << err);
     }
@@ -202,7 +203,7 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const 
::asio::error_code &ec,
       additional_endpoints_.erase(additional_endpoints_.begin());
       current_endpoint_ = next_endpoint;
 
-      next_layer_.async_connect(next_endpoint, [shared_this, this, 
next_endpoint](const ::asio::error_code &ec) {
+      socket_.async_connect(next_endpoint, [shared_this, this, 
next_endpoint](const ::asio::error_code &ec) {
         ConnectComplete(ec, next_endpoint);
       });
       connect_timer_.expires_from_now(
@@ -219,8 +220,8 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const 
::asio::error_code &ec,
   }
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) {
+template <class Socket>
+void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
 
   LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
@@ -228,7 +229,7 @@ void 
RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) {
 
   auto shared_this = shared_from_this();
   auto handshake_packet = PrepareHandshakePacket();
-  ::asio::async_write(next_layer_, asio::buffer(*handshake_packet),
+  ::asio::async_write(socket_, asio::buffer(*handshake_packet),
                       [handshake_packet, handler, shared_this, this](
                           const ::asio::error_code &ec, size_t) {
                         Status status = ToStatus(ec);
@@ -236,15 +237,15 @@ void 
RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) {
                       });
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::SendContext(RpcCallback &handler) {
+template <class Socket>
+void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
 
   LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called");
 
   auto shared_this = shared_from_this();
   auto context_packet = PrepareContextPacket();
-  ::asio::async_write(next_layer_, asio::buffer(*context_packet),
+  ::asio::async_write(socket_, asio::buffer(*context_packet),
                       [context_packet, handler, shared_this, this](
                           const ::asio::error_code &ec, size_t) {
                         Status status = ToStatus(ec);
@@ -252,8 +253,8 @@ void RpcConnectionImpl<NextLayer>::SendContext(RpcCallback 
&handler) {
                       });
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code 
&ec,
+template <class Socket>
+void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec,
                                                    size_t) {
   using std::placeholders::_1;
   using std::placeholders::_2;
@@ -271,8 +272,8 @@ void RpcConnectionImpl<NextLayer>::OnSendCompleted(const 
::asio::error_code &ec,
   FlushPendingRequests();
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
+template <class Socket>
+void RpcConnectionImpl<Socket>::FlushPendingRequests() {
   using namespace ::std::placeholders;
 
   // Lock should be held
@@ -335,7 +336,7 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
           this->HandleRpcTimeout(timeout_req, ec);
     });
 
-    asio::async_write(next_layer_, asio::buffer(*payload),
+    asio::async_write(socket_, asio::buffer(*payload),
                       [shared_this, this, payload](const ::asio::error_code 
&ec,
                                                    size_t size) {
                         OnSendCompleted(ec, size);
@@ -352,8 +353,8 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
 }
 
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code 
&original_ec,
+template <class Socket>
+void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code 
&original_ec,
                                                    size_t) {
   using std::placeholders::_1;
   using std::placeholders::_2;
@@ -396,7 +397,7 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const 
::asio::error_code &ori
     auto buf = ::asio::buffer(reinterpret_cast<char 
*>(&current_response_state_->length_),
                               sizeof(current_response_state_->length_));
     asio::async_read(
-        next_layer_, buf,
+        socket_, buf,
         [shared_this, this](const ::asio::error_code &ec, size_t size) {
           OnRecvCompleted(ec, size);
         });
@@ -405,7 +406,7 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const 
::asio::error_code &ori
     current_response_state_->length_ = ntohl(current_response_state_->length_);
     current_response_state_->data_.resize(current_response_state_->length_);
     asio::async_read(
-        next_layer_, ::asio::buffer(current_response_state_->data_),
+        socket_, ::asio::buffer(current_response_state_->data_),
         [shared_this, this](const ::asio::error_code &ec, size_t size) {
           OnRecvCompleted(ec, size);
         });
@@ -425,8 +426,8 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const 
::asio::error_code &ori
   }
 }
 
-template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::Disconnect() {
+template <class Socket>
+void RpcConnectionImpl<Socket>::Disconnect() {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before 
calling
 
   LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
@@ -434,7 +435,7 @@ void RpcConnectionImpl<NextLayer>::Disconnect() {
   request_over_the_wire_.reset();
   if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == 
kAuthenticating || connected_ == kConnected) {
     // Don't print out errors, we were expecting a disconnect here
-    SafeDisconnect(get_asio_socket_ptr(&next_layer_));
+    SafeDisconnect(get_asio_socket_ptr(&socket_));
   }
   connected_ = kDisconnected;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba51b7cf/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index 3e8c93f..08218f6 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -118,7 +118,7 @@ TEST(RpcEngineTest, TestRoundTrip) {
   RpcResponseHeaderProto h;
   h.set_callid(1);
   h.set_status(RpcResponseHeaderProto::SUCCESS);
-  EXPECT_CALL(conn->next_layer(), Produce())
+  EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce())
       .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
 
   std::shared_ptr<RpcConnection> conn_ptr(conn);
@@ -153,7 +153,7 @@ TEST(RpcEngineTest, TestConnectionResetAndFail) {
   RpcResponseHeaderProto h;
   h.set_callid(1);
   h.set_status(RpcResponseHeaderProto::SUCCESS);
-  EXPECT_CALL(conn->next_layer(), Produce())
+  EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce())
       .WillOnce(Return(RpcResponse(
           h, "", make_error_code(::asio::error::connection_reset))));
 
@@ -455,7 +455,7 @@ TEST(RpcEngineTest, TestTimeout) {
   conn->TEST_set_connected(true);
   conn->StartReading();
 
-    EXPECT_CALL(conn->next_layer(), Produce())
+    EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce())
         .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
 
   std::shared_ptr<RpcConnection> conn_ptr(conn);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to