This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d8f6155 Introduce strand to C++ client for exclusive control (#4750)
d8f6155 is described below
commit d8f61553dab65bb8a76f130ff48a7efc3710df2d
Author: massakam <[email protected]>
AuthorDate: Sat Jul 20 09:25:24 2019 +0900
Introduce strand to C++ client for exclusive control (#4750)
---
pulsar-client-cpp/lib/ClientConnection.cc | 58 ++++++++++++++++++++++++++-----
pulsar-client-cpp/lib/ClientConnection.h | 21 +++++++++--
2 files changed, 68 insertions(+), 11 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc
b/pulsar-client-cpp/lib/ClientConnection.cc
index d6a962a..f79cd42 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -138,6 +138,13 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
executor_(executor),
resolver_(executor->createTcpResolver()),
socket_(executor->createSocket()),
+#if BOOST_VERSION >= 107000
+ strand_(boost::asio::make_strand(executor_->io_service_.get_executor())),
+#elif BOOST_VERSION >= 106600
+ strand_(executor_->io_service_.get_executor()),
+#else
+ strand_(executor_->io_service_),
+#endif
logicalAddress_(logicalAddress),
physicalAddress_(physicalAddress),
cnxString_("[<none> -> " + physicalAddress + "] "),
@@ -344,9 +351,16 @@ void ClientConnection::handleTcpConnected(const
boost::system::error_code& err,
return;
}
}
+#if BOOST_VERSION >= 106600
tlsSocket_->async_handshake(
boost::asio::ssl::stream<tcp::socket>::client,
- std::bind(&ClientConnection::handleHandshake,
shared_from_this(), std::placeholders::_1));
+ boost::asio::bind_executor(strand_,
std::bind(&ClientConnection::handleHandshake,
+
shared_from_this(), std::placeholders::_1)));
+#else
+
tlsSocket_->async_handshake(boost::asio::ssl::stream<tcp::socket>::client,
+
strand_.wrap(std::bind(&ClientConnection::handleHandshake,
+
shared_from_this(), std::placeholders::_1)));
+#endif
} else {
handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
}
@@ -1146,31 +1160,57 @@ void ClientConnection::sendCommand(const SharedBuffer&
cmd) {
if (pendingWriteOperations_++ == 0) {
// Write immediately to socket
- asyncWrite(cmd.const_asio_buffer(),
-
customAllocWriteHandler(std::bind(&ClientConnection::handleSend,
shared_from_this(),
- std::placeholders::_1,
cmd)));
+ if (tlsSocket_) {
+#if BOOST_VERSION >= 106600
+ boost::asio::post(strand_,
+
std::bind(&ClientConnection::sendCommandInternal, shared_from_this(), cmd));
+#else
+ strand_.post(std::bind(&ClientConnection::sendCommandInternal,
shared_from_this(), cmd));
+#endif
+ } else {
+ sendCommandInternal(cmd);
+ }
} else {
// Queue to send later
pendingWriteBuffers_.push_back(cmd);
}
}
+void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
+ asyncWrite(cmd.const_asio_buffer(),
+ customAllocWriteHandler(
+ std::bind(&ClientConnection::handleSend,
shared_from_this(), std::placeholders::_1, cmd)));
+}
+
void ClientConnection::sendMessage(const OpSendMsg& opSend) {
Lock lock(mutex_);
if (pendingWriteOperations_++ == 0) {
- PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_,
outgoingCmd_, opSend.producerId_,
- opSend.sequenceId_,
getChecksumType(), opSend.msg_);
-
// Write immediately to socket
- asyncWrite(buffer,
customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
-
shared_from_this(), std::placeholders::_1)));
+ if (tlsSocket_) {
+#if BOOST_VERSION >= 106600
+ boost::asio::post(strand_,
+
std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend));
+#else
+ strand_.post(std::bind(&ClientConnection::sendMessageInternal,
shared_from_this(), opSend));
+#endif
+ } else {
+ sendMessageInternal(opSend);
+ }
} else {
// Queue to send later
pendingWriteBuffers_.push_back(opSend);
}
}
+void ClientConnection::sendMessageInternal(const OpSendMsg& opSend) {
+ PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd_,
opSend.producerId_,
+ opSend.sequenceId_,
getChecksumType(), opSend.msg_);
+
+ asyncWrite(buffer,
customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
+ shared_from_this(),
std::placeholders::_1)));
+}
+
void ClientConnection::handleSend(const boost::system::error_code& err, const
SharedBuffer&) {
if (err) {
LOG_WARN(cnxString_ << "Could not send message on connection: " << err
<< " " << err.message());
diff --git a/pulsar-client-cpp/lib/ClientConnection.h
b/pulsar-client-cpp/lib/ClientConnection.h
index b16f8c6..e350eaf 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -24,6 +24,7 @@
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
+#include <boost/asio/strand.hpp>
#include <boost/any.hpp>
#include <mutex>
#include <functional>
@@ -125,7 +126,9 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
LookupDataResultPromisePtr promise);
void sendCommand(const SharedBuffer& cmd);
+ void sendCommandInternal(const SharedBuffer& cmd);
void sendMessage(const OpSendMsg& opSend);
+ void sendMessageInternal(const OpSendMsg& opSend);
void registerProducer(int producerId, ProducerImplPtr producer);
void registerConsumer(int consumerId, ConsumerImplPtr consumer);
@@ -220,7 +223,11 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
template <typename ConstBufferSequence, typename WriteHandler>
inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler
handler) {
if (tlsSocket_) {
- boost::asio::async_write(*tlsSocket_, buffers, handler);
+#if BOOST_VERSION >= 106600
+ boost::asio::async_write(*tlsSocket_, buffers,
boost::asio::bind_executor(strand_, handler));
+#else
+ boost::asio::async_write(*tlsSocket_, buffers,
strand_.wrap(handler));
+#endif
} else {
boost::asio::async_write(*socket_, buffers, handler);
}
@@ -229,7 +236,11 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
template <typename MutableBufferSequence, typename ReadHandler>
inline void asyncReceive(const MutableBufferSequence& buffers, ReadHandler
handler) {
if (tlsSocket_) {
- tlsSocket_->async_read_some(buffers, handler);
+#if BOOST_VERSION >= 106600
+ tlsSocket_->async_read_some(buffers,
boost::asio::bind_executor(strand_, handler));
+#else
+ tlsSocket_->async_read_some(buffers, strand_.wrap(handler));
+#endif
} else {
socket_->async_receive(buffers, handler);
}
@@ -319,6 +330,12 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
friend class PulsarFriend;
bool isTlsAllowInsecureConnection_;
+
+#if BOOST_VERSION >= 106600
+ boost::asio::strand<boost::asio::io_service::executor_type> strand_;
+#else
+ boost::asio::io_service::strand strand_;
+#endif
};
} // namespace pulsar