This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a0f2d32 Fix segmentation fault caused by async_receive (#330)
a0f2d32 is described below
commit a0f2d322681ce6f57a295f204a8b67813b98c5c3
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Oct 20 10:30:27 2023 +0800
Fix segmentation fault caused by async_receive (#330)
### Motivation
https://github.com/apache/pulsar-client-cpp/pull/326 fixed the possible
segmentation fault caused by async_write, but the client could still crash
when the callback of async_receive is triggered after the socket has been
destroyed. See
https://github.com/apache/pulsar-client-cpp/issues/324#issuecomment-1767325649
### Modifications
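Capture a `shared_ptr` to the connection (from `shared_from_this()`) instead
of a `weak_ptr` in the callbacks passed to `asyncReceive`, so that the
`ClientConnection` stays alive until each read handler has run.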
---
lib/ClientConnection.cc | 44 ++++++++++++++++----------------------------
1 file changed, 16 insertions(+), 28 deletions(-)
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index b43143a..8f42535 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -640,14 +640,11 @@ void ClientConnection::handleResolve(const
boost::system::error_code& err,
void ClientConnection::readNextCommand() {
const static uint32_t minReadSize = sizeof(uint32_t);
- auto weakSelf = weak_from_this();
+ auto self = shared_from_this();
asyncReceive(
incomingBuffer_.asio_buffer(),
- customAllocReadHandler([weakSelf](const boost::system::error_code&
err, size_t bytesTransferred) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleRead(err, bytesTransferred, minReadSize);
- }
+ customAllocReadHandler([this, self](const boost::system::error_code&
err, size_t bytesTransferred) {
+ handleRead(err, bytesTransferred, minReadSize);
}));
}
@@ -672,15 +669,12 @@ void ClientConnection::handleRead(const
boost::system::error_code& err, size_t b
// Read the remaining part, use a slice of buffer to write on the next
// region
SharedBuffer buffer = incomingBuffer_.slice(bytesTransferred);
- auto weakSelf = weak_from_this();
+ auto self = shared_from_this();
auto nextMinReadSize = minReadSize - bytesTransferred;
- asyncReceive(buffer.asio_buffer(),
- customAllocReadHandler([weakSelf, nextMinReadSize](const
boost::system::error_code& err,
- size_t
bytesTransferred) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleRead(err, bytesTransferred,
nextMinReadSize);
- }
+ asyncReceive(buffer.asio_buffer(), customAllocReadHandler([this, self,
nextMinReadSize](
+ const
boost::system::error_code& err,
+ size_t
bytesTransferred) {
+ handleRead(err, bytesTransferred, nextMinReadSize);
}));
} else {
processIncomingBuffer();
@@ -707,15 +701,12 @@ void ClientConnection::processIncomingBuffer() {
uint32_t newBufferSize = std::max<uint32_t>(DefaultBufferSize,
frameSize + sizeof(uint32_t));
incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_,
newBufferSize);
}
- auto weakSelf = weak_from_this();
+ auto self = shared_from_this();
asyncReceive(
incomingBuffer_.asio_buffer(),
- customAllocReadHandler([weakSelf, bytesToReceive](const
boost::system::error_code& err,
- size_t
bytesTransferred) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleRead(err, bytesTransferred,
bytesToReceive);
- }
+ customAllocReadHandler([this, self, bytesToReceive](const
boost::system::error_code& err,
+ size_t
bytesTransferred) {
+ handleRead(err, bytesTransferred, bytesToReceive);
}));
return;
}
@@ -793,14 +784,11 @@ void ClientConnection::processIncomingBuffer() {
// At least we need to read 4 bytes to have the complete frame size
uint32_t minReadSize = sizeof(uint32_t) -
incomingBuffer_.readableBytes();
- auto weakSelf = weak_from_this();
+ auto self = shared_from_this();
asyncReceive(incomingBuffer_.asio_buffer(),
- customAllocReadHandler([weakSelf, minReadSize](const
boost::system::error_code& err,
- size_t
bytesTransferred) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleRead(err, bytesTransferred,
minReadSize);
- }
+ customAllocReadHandler([this, self, minReadSize](const
boost::system::error_code& err,
+ size_t
bytesTransferred) {
+ handleRead(err, bytesTransferred, minReadSize);
}));
return;
}