This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c77648  Fix segmentation fault when sending messages after receiving an error (#326)
5c77648 is described below

commit 5c77648d0029c4b11e0e062ab73474368ecc0e87
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Oct 9 11:07:48 2023 +0800

    Fix segmentation fault when sending messages after receiving an error (#326)
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/325
    
    ### Motivation
    
    https://github.com/apache/pulsar-client-cpp/pull/317 introduced a bug
    that might cause a segmentation fault when sending messages after
    receiving an error; see
    https://github.com/apache/pulsar-client-cpp/issues/325#issuecomment-1751914150
    for the detailed explanation.
    
    ### Modifications
    
    When calling `asyncWrite`, capture the `shared_ptr` instead of the
    `weak_ptr` to extend the lifetime of the `socket_` or `tlsSocket_` field
    in `ClientConnection`. Because the lifetime is extended, some callbacks
    now check `isClosed()` first and return early before running any other
    logic.
    
    Add a `ChunkDedupTest`, based on Pulsar 3.1.0, to reproduce this issue.
    Run the test 10 times to ensure it no longer crashes with this patch.
---
 lib/ClientConnection.cc             | 90 ++++++++++++++++---------------------
 run-unit-tests.sh                   |  9 +++-
 tests/CMakeLists.txt                |  3 ++
 tests/chunkdedup/ChunkDedupTest.cc  | 53 ++++++++++++++++++++++
 tests/chunkdedup/docker-compose.yml | 45 +++++++++++++++++++
 5 files changed, 148 insertions(+), 52 deletions(-)

diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 6d18780..b43143a 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -519,18 +519,18 @@ void ClientConnection::handleHandshake(const 
boost::system::error_code& err) {
         return;
     }
     // Send CONNECT command to broker
-    auto weakSelf = weak_from_this();
+    auto self = shared_from_this();
     asyncWrite(buffer.const_asio_buffer(),
-               customAllocWriteHandler([weakSelf, buffer](const 
boost::system::error_code& err, size_t) {
-                   auto self = weakSelf.lock();
-                   if (self) {
-                       self->handleSentPulsarConnect(err, buffer);
-                   }
+               customAllocWriteHandler([this, self, buffer](const 
boost::system::error_code& err, size_t) {
+                   handleSentPulsarConnect(err, buffer);
                }));
 }
 
 void ClientConnection::handleSentPulsarConnect(const 
boost::system::error_code& err,
                                                const SharedBuffer& buffer) {
+    if (isClosed()) {
+        return;
+    }
     if (err) {
         LOG_ERROR(cnxString_ << "Failed to establish connection: " << 
err.message());
         close();
@@ -543,6 +543,9 @@ void ClientConnection::handleSentPulsarConnect(const 
boost::system::error_code&
 
 void ClientConnection::handleSentAuthResponse(const boost::system::error_code& 
err,
                                               const SharedBuffer& buffer) {
+    if (isClosed()) {
+        return;
+    }
     if (err) {
         LOG_WARN(cnxString_ << "Failed to send auth response: " << 
err.message());
         close();
@@ -650,6 +653,9 @@ void ClientConnection::readNextCommand() {
 
 void ClientConnection::handleRead(const boost::system::error_code& err, size_t 
bytesTransferred,
                                   uint32_t minReadSize) {
+    if (isClosed()) {
+        return;
+    }
     // Update buffer write idx with new data
     incomingBuffer_.bytesWritten(bytesTransferred);
 
@@ -1085,15 +1091,10 @@ void ClientConnection::sendCommand(const SharedBuffer& 
cmd) {
 }
 
 void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
-    auto weakSelf = weak_from_this();
+    auto self = shared_from_this();
     asyncWrite(cmd.const_asio_buffer(),
-               customAllocWriteHandler(
-                   [weakSelf, cmd](const boost::system::error_code& err, 
size_t bytesTransferred) {
-                       auto self = weakSelf.lock();
-                       if (self) {
-                           self->handleSend(err, cmd);
-                       }
-                   }));
+               customAllocWriteHandler([this, self, cmd](const 
boost::system::error_code& err,
+                                                         size_t 
bytesTransferred) { handleSend(err, cmd); }));
 }
 
 void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) 
{
@@ -1102,23 +1103,15 @@ void ClientConnection::sendMessage(const 
std::shared_ptr<SendArguments>& args) {
         pendingWriteBuffers_.emplace_back(args);
         return;
     }
-    auto weakSelf = weak_from_this();
-    auto sendMessageInternal = [this, weakSelf, args] {
-        auto self = weakSelf.lock();
-        if (!self) {
-            return;
-        }
+    auto self = shared_from_this();
+    auto sendMessageInternal = [this, self, args] {
         BaseCommand outgoingCmd;
         auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, 
getChecksumType(), *args);
         // Capture the buffer because asio does not copy the buffer, if the 
buffer is destroyed before the
         // callback is called, an invalid buffer range might be passed to the 
underlying socket send.
-        asyncWrite(buffer, customAllocWriteHandler([weakSelf, buffer](const 
boost::system::error_code& err,
-                                                                      size_t 
bytesTransferred) {
-                       auto self = weakSelf.lock();
-                       if (self) {
-                           self->handleSendPair(err);
-                       }
-                   }));
+        asyncWrite(buffer, customAllocWriteHandler(
+                               [this, self, buffer](const 
boost::system::error_code& err,
+                                                    size_t bytesTransferred) { 
handleSendPair(err); }));
     };
     if (tlsSocket_) {
 #if BOOST_VERSION >= 106600
@@ -1132,6 +1125,9 @@ void ClientConnection::sendMessage(const 
std::shared_ptr<SendArguments>& args) {
 }
 
 void ClientConnection::handleSend(const boost::system::error_code& err, const 
SharedBuffer&) {
+    if (isClosed()) {
+        return;
+    }
     if (err) {
         LOG_WARN(cnxString_ << "Could not send message on connection: " << err 
<< " " << err.message());
         close(ResultDisconnected);
@@ -1141,6 +1137,9 @@ void ClientConnection::handleSend(const 
boost::system::error_code& err, const Sh
 }
 
 void ClientConnection::handleSendPair(const boost::system::error_code& err) {
+    if (isClosed()) {
+        return;
+    }
     if (err) {
         LOG_WARN(cnxString_ << "Could not send pair message on connection: " 
<< err << " " << err.message());
         close(ResultDisconnected);
@@ -1157,17 +1156,12 @@ void ClientConnection::sendPendingCommands() {
         boost::any any = pendingWriteBuffers_.front();
         pendingWriteBuffers_.pop_front();
 
-        auto weakSelf = weak_from_this();
+        auto self = shared_from_this();
         if (any.type() == typeid(SharedBuffer)) {
             SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
-            asyncWrite(
-                buffer.const_asio_buffer(),
-                customAllocWriteHandler([weakSelf, buffer](const 
boost::system::error_code& err, size_t) {
-                    auto self = weakSelf.lock();
-                    if (self) {
-                        self->handleSend(err, buffer);
-                    }
-                }));
+            asyncWrite(buffer.const_asio_buffer(),
+                       customAllocWriteHandler([this, self, buffer](const 
boost::system::error_code& err,
+                                                                    size_t) { 
handleSend(err, buffer); }));
         } else {
             assert(any.type() == typeid(std::shared_ptr<SendArguments>));
 
@@ -1178,13 +1172,9 @@ void ClientConnection::sendPendingCommands() {
 
             // Capture the buffer because asio does not copy the buffer, if 
the buffer is destroyed before the
             // callback is called, an invalid buffer range might be passed to 
the underlying socket send.
-            asyncWrite(buffer, customAllocWriteHandler(
-                                   [weakSelf, buffer](const 
boost::system::error_code& err, size_t) {
-                                       auto self = weakSelf.lock();
-                                       if (self) {
-                                           self->handleSendPair(err);
-                                       }
-                                   }));
+            asyncWrite(buffer,
+                       customAllocWriteHandler([this, self, buffer](const 
boost::system::error_code& err,
+                                                                    size_t) { 
handleSendPair(err); }));
         }
     } else {
         // No more pending writes
@@ -1334,10 +1324,11 @@ void ClientConnection::close(Result result, bool 
detach) {
     }
 
     lock.unlock();
+    int refCount = weak_from_this().use_count();
     if (!isResultRetryable(result)) {
-        LOG_ERROR(cnxString_ << "Connection closed with " << result);
+        LOG_ERROR(cnxString_ << "Connection closed with " << result << " 
(refCnt: " << refCount << ")");
     } else {
-        LOG_INFO(cnxString_ << "Connection disconnected");
+        LOG_INFO(cnxString_ << "Connection disconnected (refCnt: " << refCount 
<< ")");
     }
     // Remove the connection from the pool before completing any promise
     if (detach) {
@@ -1824,13 +1815,10 @@ void ClientConnection::handleAuthChallenge() {
         close(result);
         return;
     }
-    auto weakSelf = weak_from_this();
+    auto self = shared_from_this();
     asyncWrite(buffer.const_asio_buffer(),
-               customAllocWriteHandler([weakSelf, buffer](const 
boost::system::error_code& err, size_t) {
-                   auto self = weakSelf.lock();
-                   if (self) {
-                       self->handleSentAuthResponse(err, buffer);
-                   }
+               customAllocWriteHandler([this, self, buffer](const 
boost::system::error_code& err, size_t) {
+                   handleSentAuthResponse(err, buffer);
                }));
 }
 
diff --git a/run-unit-tests.sh b/run-unit-tests.sh
index 8ea834c..693267f 100755
--- a/run-unit-tests.sh
+++ b/run-unit-tests.sh
@@ -40,10 +40,17 @@ docker compose -f tests/oauth2/docker-compose.yml down
 
 # Run BrokerMetadata tests
 docker compose -f tests/brokermetadata/docker-compose.yml up -d
-sleep 15
+until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
+sleep 5
 $CMAKE_BUILD_DIRECTORY/tests/BrokerMetadataTest
 docker compose -f tests/brokermetadata/docker-compose.yml down
 
+docker compose -f tests/chunkdedup/docker-compose.yml up -d
+until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
+sleep 5
+$CMAKE_BUILD_DIRECTORY/tests/ChunkDedupTest --gtest_repeat=10
+docker compose -f tests/chunkdedup/docker-compose.yml down
+
 ./pulsar-test-service-start.sh
 
 pushd $CMAKE_BUILD_DIRECTORY/tests
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 6b46425..b373196 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -71,3 +71,6 @@ target_link_libraries(BrokerMetadataTest ${CLIENT_LIBS} 
pulsarStatic ${GTEST_LIB
 add_executable(Oauth2Test oauth2/Oauth2Test.cc)
 target_compile_options(Oauth2Test PRIVATE 
"-DTEST_ROOT_PATH=\"${CMAKE_CURRENT_SOURCE_DIR}\"")
 target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic 
${GTEST_LIBRARY_PATH})
+
+add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc)
+target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic 
${GTEST_LIBRARY_PATH})
diff --git a/tests/chunkdedup/ChunkDedupTest.cc 
b/tests/chunkdedup/ChunkDedupTest.cc
new file mode 100644
index 0000000..609511f
--- /dev/null
+++ b/tests/chunkdedup/ChunkDedupTest.cc
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include "lib/Latch.h"
+#include "lib/LogUtils.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+// Before https://github.com/apache/pulsar/pull/20948, when message 
deduplication is enabled, sending chunks
+// to the broker will receive send error response.
+TEST(ChunkDedupTest, testSendChunks) {
+    Client client{"pulsar://localhost:6650"};
+    ProducerConfiguration conf;
+    conf.setBatchingEnabled(false);
+    conf.setChunkingEnabled(true);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer("test-send-chunks", conf, 
producer));
+
+    Latch latch{1};
+    std::string value(1024000 /* max message size */ * 100, 'a');
+    producer.sendAsync(MessageBuilder().setContent(value).build(),
+                       [&latch](Result result, const MessageId& msgId) {
+                           LOG_INFO("Send to " << msgId << ": " << result);
+                           latch.countdown();
+                       });
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+    client.close();
+}
+
+int main(int argc, char* argv[]) {
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
diff --git a/tests/chunkdedup/docker-compose.yml 
b/tests/chunkdedup/docker-compose.yml
new file mode 100644
index 0000000..6aed8c4
--- /dev/null
+++ b/tests/chunkdedup/docker-compose.yml
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+version: '3'
+networks:
+  pulsar:
+    driver: bridge
+services:
+  standalone:
+    # Don't change the version here to ensure 
https://github.com/apache/pulsar/pull/20948 is not included
+    image: apachepulsar/pulsar:3.1.0
+    container_name: standalone
+    hostname: local
+    restart: "no"
+    networks:
+      - pulsar
+    environment:
+      - metadataStoreUrl=zk:localhost:2181
+      - clusterName=standalone
+      - advertisedAddress=localhost
+      - advertisedListeners=external:pulsar://localhost:6650
+      - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
+      - PULSAR_PREFIX_maxMessageSize=1024000
+      - PULSAR_PREFIX_brokerDeduplicationEnabled=true
+    ports:
+      - "6650:6650"
+      - "8080:8080"
+    command: bash -c "bin/apply-config-from-env.py conf/standalone.conf && 
exec bin/pulsar standalone -nss -nfw"
+

Reply via email to