This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new a0f2d32  Fix segmentation fault caused by async_receive (#330)
a0f2d32 is described below

commit a0f2d322681ce6f57a295f204a8b67813b98c5c3
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Oct 20 10:30:27 2023 +0800

    Fix segmentation fault caused by async_receive (#330)
    
    ### Motivation
    
    https://github.com/apache/pulsar-client-cpp/pull/326 fixed the possible
    segmentation fault caused by async_write, but the client could still
    crash when the async_receive callback is triggered after the socket has
    been destroyed. See
    https://github.com/apache/pulsar-client-cpp/issues/324#issuecomment-1767325649
    
    ### Modifications
    
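    Capture a `shared_ptr` obtained from `shared_from_this()` (instead of a
    `weak_ptr` from `weak_from_this()`) in every handler passed to
    `asyncReceive`, so the handler itself holds a reference to the
    connection while the read is pending.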
---
 lib/ClientConnection.cc | 44 ++++++++++++++++----------------------------
 1 file changed, 16 insertions(+), 28 deletions(-)

diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index b43143a..8f42535 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -640,14 +640,11 @@ void ClientConnection::handleResolve(const 
boost::system::error_code& err,
 
 void ClientConnection::readNextCommand() {
     const static uint32_t minReadSize = sizeof(uint32_t);
-    auto weakSelf = weak_from_this();
+    auto self = shared_from_this();
     asyncReceive(
         incomingBuffer_.asio_buffer(),
-        customAllocReadHandler([weakSelf](const boost::system::error_code& 
err, size_t bytesTransferred) {
-            auto self = weakSelf.lock();
-            if (self) {
-                self->handleRead(err, bytesTransferred, minReadSize);
-            }
+        customAllocReadHandler([this, self](const boost::system::error_code& 
err, size_t bytesTransferred) {
+            handleRead(err, bytesTransferred, minReadSize);
         }));
 }
 
@@ -672,15 +669,12 @@ void ClientConnection::handleRead(const 
boost::system::error_code& err, size_t b
         // Read the remaining part, use a slice of buffer to write on the next
         // region
         SharedBuffer buffer = incomingBuffer_.slice(bytesTransferred);
-        auto weakSelf = weak_from_this();
+        auto self = shared_from_this();
         auto nextMinReadSize = minReadSize - bytesTransferred;
-        asyncReceive(buffer.asio_buffer(),
-                     customAllocReadHandler([weakSelf, nextMinReadSize](const 
boost::system::error_code& err,
-                                                                        size_t 
bytesTransferred) {
-                         auto self = weakSelf.lock();
-                         if (self) {
-                             self->handleRead(err, bytesTransferred, 
nextMinReadSize);
-                         }
+        asyncReceive(buffer.asio_buffer(), customAllocReadHandler([this, self, 
nextMinReadSize](
+                                                                      const 
boost::system::error_code& err,
+                                                                      size_t 
bytesTransferred) {
+                         handleRead(err, bytesTransferred, nextMinReadSize);
                      }));
     } else {
         processIncomingBuffer();
@@ -707,15 +701,12 @@ void ClientConnection::processIncomingBuffer() {
                 uint32_t newBufferSize = std::max<uint32_t>(DefaultBufferSize, 
frameSize + sizeof(uint32_t));
                 incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, 
newBufferSize);
             }
-            auto weakSelf = weak_from_this();
+            auto self = shared_from_this();
             asyncReceive(
                 incomingBuffer_.asio_buffer(),
-                customAllocReadHandler([weakSelf, bytesToReceive](const 
boost::system::error_code& err,
-                                                                  size_t 
bytesTransferred) {
-                    auto self = weakSelf.lock();
-                    if (self) {
-                        self->handleRead(err, bytesTransferred, 
bytesToReceive);
-                    }
+                customAllocReadHandler([this, self, bytesToReceive](const 
boost::system::error_code& err,
+                                                                    size_t 
bytesTransferred) {
+                    handleRead(err, bytesTransferred, bytesToReceive);
                 }));
             return;
         }
@@ -793,14 +784,11 @@ void ClientConnection::processIncomingBuffer() {
         // At least we need to read 4 bytes to have the complete frame size
         uint32_t minReadSize = sizeof(uint32_t) - 
incomingBuffer_.readableBytes();
 
-        auto weakSelf = weak_from_this();
+        auto self = shared_from_this();
         asyncReceive(incomingBuffer_.asio_buffer(),
-                     customAllocReadHandler([weakSelf, minReadSize](const 
boost::system::error_code& err,
-                                                                    size_t 
bytesTransferred) {
-                         auto self = weakSelf.lock();
-                         if (self) {
-                             self->handleRead(err, bytesTransferred, 
minReadSize);
-                         }
+                     customAllocReadHandler([this, self, minReadSize](const 
boost::system::error_code& err,
+                                                                      size_t 
bytesTransferred) {
+                         handleRead(err, bytesTransferred, minReadSize);
                      }));
         return;
     }

Reply via email to