This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d8f6155  Introduce strand to C++ client for exclusive control (#4750)
d8f6155 is described below

commit d8f61553dab65bb8a76f130ff48a7efc3710df2d
Author: massakam <[email protected]>
AuthorDate: Sat Jul 20 09:25:24 2019 +0900

    Introduce strand to C++ client for exclusive control (#4750)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 58 ++++++++++++++++++++++++++-----
 pulsar-client-cpp/lib/ClientConnection.h  | 21 +++++++++--
 2 files changed, 68 insertions(+), 11 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index d6a962a..f79cd42 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -138,6 +138,13 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
       executor_(executor),
       resolver_(executor->createTcpResolver()),
       socket_(executor->createSocket()),
+#if BOOST_VERSION >= 107000
+      strand_(boost::asio::make_strand(executor_->io_service_.get_executor())),
+#elif BOOST_VERSION >= 106600
+      strand_(executor_->io_service_.get_executor()),
+#else
+      strand_(executor_->io_service_),
+#endif
       logicalAddress_(logicalAddress),
       physicalAddress_(physicalAddress),
       cnxString_("[<none> -> " + physicalAddress + "] "),
@@ -344,9 +351,16 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
                     return;
                 }
             }
+#if BOOST_VERSION >= 106600
             tlsSocket_->async_handshake(
                 boost::asio::ssl::stream<tcp::socket>::client,
-                std::bind(&ClientConnection::handleHandshake, shared_from_this(), std::placeholders::_1));
+                boost::asio::bind_executor(strand_, std::bind(&ClientConnection::handleHandshake,
+                                                              shared_from_this(), std::placeholders::_1)));
+#else
+            tlsSocket_->async_handshake(boost::asio::ssl::stream<tcp::socket>::client,
+                                        strand_.wrap(std::bind(&ClientConnection::handleHandshake,
+                                                               shared_from_this(), std::placeholders::_1)));
+#endif
         } else {
             handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
         }
@@ -1146,31 +1160,57 @@ void ClientConnection::sendCommand(const SharedBuffer& cmd) {
 
     if (pendingWriteOperations_++ == 0) {
         // Write immediately to socket
-        asyncWrite(cmd.const_asio_buffer(),
-                   customAllocWriteHandler(std::bind(&ClientConnection::handleSend, shared_from_this(),
-                                                     std::placeholders::_1, cmd)));
+        if (tlsSocket_) {
+#if BOOST_VERSION >= 106600
+            boost::asio::post(strand_,
+                              std::bind(&ClientConnection::sendCommandInternal, shared_from_this(), cmd));
+#else
+            strand_.post(std::bind(&ClientConnection::sendCommandInternal, shared_from_this(), cmd));
+#endif
+        } else {
+            sendCommandInternal(cmd);
+        }
     } else {
         // Queue to send later
         pendingWriteBuffers_.push_back(cmd);
     }
 }
 
+void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
+    asyncWrite(cmd.const_asio_buffer(),
+               customAllocWriteHandler(
+                   std::bind(&ClientConnection::handleSend, shared_from_this(), std::placeholders::_1, cmd)));
+}
+
 void ClientConnection::sendMessage(const OpSendMsg& opSend) {
     Lock lock(mutex_);
 
     if (pendingWriteOperations_++ == 0) {
-        PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd_, opSend.producerId_,
-                                                    opSend.sequenceId_, getChecksumType(), opSend.msg_);
-
         // Write immediately to socket
-        asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
-                                                             shared_from_this(), std::placeholders::_1)));
+        if (tlsSocket_) {
+#if BOOST_VERSION >= 106600
+            boost::asio::post(strand_,
+                              std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend));
+#else
+            strand_.post(std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend));
+#endif
+        } else {
+            sendMessageInternal(opSend);
+        }
     } else {
         // Queue to send later
         pendingWriteBuffers_.push_back(opSend);
     }
 }
 
+void ClientConnection::sendMessageInternal(const OpSendMsg& opSend) {
+    PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd_, opSend.producerId_,
+                                                opSend.sequenceId_, getChecksumType(), opSend.msg_);
+
+    asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
+                                                         shared_from_this(), std::placeholders::_1)));
+}
+
 void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) {
     if (err) {
         LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message());
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index b16f8c6..e350eaf 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -24,6 +24,7 @@
 
 #include <boost/asio.hpp>
 #include <boost/asio/ssl.hpp>
+#include <boost/asio/strand.hpp>
 #include <boost/any.hpp>
 #include <mutex>
 #include <functional>
@@ -125,7 +126,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
                                       LookupDataResultPromisePtr promise);
 
     void sendCommand(const SharedBuffer& cmd);
+    void sendCommandInternal(const SharedBuffer& cmd);
     void sendMessage(const OpSendMsg& opSend);
+    void sendMessageInternal(const OpSendMsg& opSend);
 
     void registerProducer(int producerId, ProducerImplPtr producer);
     void registerConsumer(int consumerId, ConsumerImplPtr consumer);
@@ -220,7 +223,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     template <typename ConstBufferSequence, typename WriteHandler>
     inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler handler) {
         if (tlsSocket_) {
-            boost::asio::async_write(*tlsSocket_, buffers, handler);
+#if BOOST_VERSION >= 106600
+            boost::asio::async_write(*tlsSocket_, buffers, boost::asio::bind_executor(strand_, handler));
+#else
+            boost::asio::async_write(*tlsSocket_, buffers, strand_.wrap(handler));
+#endif
         } else {
             boost::asio::async_write(*socket_, buffers, handler);
         }
@@ -229,7 +236,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     template <typename MutableBufferSequence, typename ReadHandler>
     inline void asyncReceive(const MutableBufferSequence& buffers, ReadHandler handler) {
         if (tlsSocket_) {
-            tlsSocket_->async_read_some(buffers, handler);
+#if BOOST_VERSION >= 106600
+            tlsSocket_->async_read_some(buffers, boost::asio::bind_executor(strand_, handler));
+#else
+            tlsSocket_->async_read_some(buffers, strand_.wrap(handler));
+#endif
         } else {
             socket_->async_receive(buffers, handler);
         }
@@ -319,6 +330,12 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     friend class PulsarFriend;
 
     bool isTlsAllowInsecureConnection_;
+
+#if BOOST_VERSION >= 106600
+    boost::asio::strand<boost::asio::io_service::executor_type> strand_;
+#else
+    boost::asio::io_service::strand strand_;
+#endif
 };
 }  // namespace pulsar
 

Reply via email to