This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 5c77648 Fix segmentation fault when sending messages after receiving
an error (#326)
5c77648 is described below
commit 5c77648d0029c4b11e0e062ab73474368ecc0e87
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Oct 9 11:07:48 2023 +0800
Fix segmentation fault when sending messages after receiving an error (#326)
Fixes https://github.com/apache/pulsar-client-cpp/issues/325
### Motivation
https://github.com/apache/pulsar-client-cpp/pull/317 introduced a bug
that might cause a segmentation fault when sending messages after
receiving an error. See
https://github.com/apache/pulsar-client-cpp/issues/325#issuecomment-1751914150
for a detailed explanation.
### Modifications
When calling `asyncWrite`, capture the `shared_ptr` instead of the
`weak_ptr` to extend the lifetime of the `socket_` or `tlsSocket_` field
in `ClientConnection`. Because the lifetime is extended, a callback can now
run on a connection that is already closed, so the affected callbacks check
`isClosed()` first and return early before any other logic.
Add a `ChunkDedupTest` that reproduces this issue against Pulsar 3.1.0.
Run the test 10 times to ensure it no longer crashes with this patch.
---
lib/ClientConnection.cc | 90 ++++++++++++++++---------------------
run-unit-tests.sh | 9 +++-
tests/CMakeLists.txt | 3 ++
tests/chunkdedup/ChunkDedupTest.cc | 53 ++++++++++++++++++++++
tests/chunkdedup/docker-compose.yml | 45 +++++++++++++++++++
5 files changed, 148 insertions(+), 52 deletions(-)
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 6d18780..b43143a 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -519,18 +519,18 @@ void ClientConnection::handleHandshake(const
boost::system::error_code& err) {
return;
}
// Send CONNECT command to broker
- auto weakSelf = weak_from_this();
+ auto self = shared_from_this();
asyncWrite(buffer.const_asio_buffer(),
- customAllocWriteHandler([weakSelf, buffer](const
boost::system::error_code& err, size_t) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleSentPulsarConnect(err, buffer);
- }
+ customAllocWriteHandler([this, self, buffer](const
boost::system::error_code& err, size_t) {
+ handleSentPulsarConnect(err, buffer);
}));
}
void ClientConnection::handleSentPulsarConnect(const
boost::system::error_code& err,
const SharedBuffer& buffer) {
+ if (isClosed()) {
+ return;
+ }
if (err) {
LOG_ERROR(cnxString_ << "Failed to establish connection: " <<
err.message());
close();
@@ -543,6 +543,9 @@ void ClientConnection::handleSentPulsarConnect(const
boost::system::error_code&
void ClientConnection::handleSentAuthResponse(const boost::system::error_code&
err,
const SharedBuffer& buffer) {
+ if (isClosed()) {
+ return;
+ }
if (err) {
LOG_WARN(cnxString_ << "Failed to send auth response: " <<
err.message());
close();
@@ -650,6 +653,9 @@ void ClientConnection::readNextCommand() {
void ClientConnection::handleRead(const boost::system::error_code& err, size_t
bytesTransferred,
uint32_t minReadSize) {
+ if (isClosed()) {
+ return;
+ }
// Update buffer write idx with new data
incomingBuffer_.bytesWritten(bytesTransferred);
@@ -1085,15 +1091,10 @@ void ClientConnection::sendCommand(const SharedBuffer&
cmd) {
}
void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
- auto weakSelf = weak_from_this();
+ auto self = shared_from_this();
asyncWrite(cmd.const_asio_buffer(),
- customAllocWriteHandler(
- [weakSelf, cmd](const boost::system::error_code& err,
size_t bytesTransferred) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleSend(err, cmd);
- }
- }));
+ customAllocWriteHandler([this, self, cmd](const
boost::system::error_code& err,
+ size_t
bytesTransferred) { handleSend(err, cmd); }));
}
void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args)
{
@@ -1102,23 +1103,15 @@ void ClientConnection::sendMessage(const
std::shared_ptr<SendArguments>& args) {
pendingWriteBuffers_.emplace_back(args);
return;
}
- auto weakSelf = weak_from_this();
- auto sendMessageInternal = [this, weakSelf, args] {
- auto self = weakSelf.lock();
- if (!self) {
- return;
- }
+ auto self = shared_from_this();
+ auto sendMessageInternal = [this, self, args] {
BaseCommand outgoingCmd;
auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd,
getChecksumType(), *args);
// Capture the buffer because asio does not copy the buffer, if the
buffer is destroyed before the
// callback is called, an invalid buffer range might be passed to the
underlying socket send.
- asyncWrite(buffer, customAllocWriteHandler([weakSelf, buffer](const
boost::system::error_code& err,
- size_t
bytesTransferred) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleSendPair(err);
- }
- }));
+ asyncWrite(buffer, customAllocWriteHandler(
+ [this, self, buffer](const
boost::system::error_code& err,
+ size_t bytesTransferred) {
handleSendPair(err); }));
};
if (tlsSocket_) {
#if BOOST_VERSION >= 106600
@@ -1132,6 +1125,9 @@ void ClientConnection::sendMessage(const
std::shared_ptr<SendArguments>& args) {
}
void ClientConnection::handleSend(const boost::system::error_code& err, const
SharedBuffer&) {
+ if (isClosed()) {
+ return;
+ }
if (err) {
LOG_WARN(cnxString_ << "Could not send message on connection: " << err
<< " " << err.message());
close(ResultDisconnected);
@@ -1141,6 +1137,9 @@ void ClientConnection::handleSend(const
boost::system::error_code& err, const Sh
}
void ClientConnection::handleSendPair(const boost::system::error_code& err) {
+ if (isClosed()) {
+ return;
+ }
if (err) {
LOG_WARN(cnxString_ << "Could not send pair message on connection: "
<< err << " " << err.message());
close(ResultDisconnected);
@@ -1157,17 +1156,12 @@ void ClientConnection::sendPendingCommands() {
boost::any any = pendingWriteBuffers_.front();
pendingWriteBuffers_.pop_front();
- auto weakSelf = weak_from_this();
+ auto self = shared_from_this();
if (any.type() == typeid(SharedBuffer)) {
SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
- asyncWrite(
- buffer.const_asio_buffer(),
- customAllocWriteHandler([weakSelf, buffer](const
boost::system::error_code& err, size_t) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleSend(err, buffer);
- }
- }));
+ asyncWrite(buffer.const_asio_buffer(),
+ customAllocWriteHandler([this, self, buffer](const
boost::system::error_code& err,
+ size_t) {
handleSend(err, buffer); }));
} else {
assert(any.type() == typeid(std::shared_ptr<SendArguments>));
@@ -1178,13 +1172,9 @@ void ClientConnection::sendPendingCommands() {
// Capture the buffer because asio does not copy the buffer, if
the buffer is destroyed before the
// callback is called, an invalid buffer range might be passed to
the underlying socket send.
- asyncWrite(buffer, customAllocWriteHandler(
- [weakSelf, buffer](const
boost::system::error_code& err, size_t) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleSendPair(err);
- }
- }));
+ asyncWrite(buffer,
+ customAllocWriteHandler([this, self, buffer](const
boost::system::error_code& err,
+ size_t) {
handleSendPair(err); }));
}
} else {
// No more pending writes
@@ -1334,10 +1324,11 @@ void ClientConnection::close(Result result, bool
detach) {
}
lock.unlock();
+ int refCount = weak_from_this().use_count();
if (!isResultRetryable(result)) {
- LOG_ERROR(cnxString_ << "Connection closed with " << result);
+ LOG_ERROR(cnxString_ << "Connection closed with " << result << "
(refCnt: " << refCount << ")");
} else {
- LOG_INFO(cnxString_ << "Connection disconnected");
+ LOG_INFO(cnxString_ << "Connection disconnected (refCnt: " << refCount
<< ")");
}
// Remove the connection from the pool before completing any promise
if (detach) {
@@ -1824,13 +1815,10 @@ void ClientConnection::handleAuthChallenge() {
close(result);
return;
}
- auto weakSelf = weak_from_this();
+ auto self = shared_from_this();
asyncWrite(buffer.const_asio_buffer(),
- customAllocWriteHandler([weakSelf, buffer](const
boost::system::error_code& err, size_t) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleSentAuthResponse(err, buffer);
- }
+ customAllocWriteHandler([this, self, buffer](const
boost::system::error_code& err, size_t) {
+ handleSentAuthResponse(err, buffer);
}));
}
diff --git a/run-unit-tests.sh b/run-unit-tests.sh
index 8ea834c..693267f 100755
--- a/run-unit-tests.sh
+++ b/run-unit-tests.sh
@@ -40,10 +40,17 @@ docker compose -f tests/oauth2/docker-compose.yml down
# Run BrokerMetadata tests
docker compose -f tests/brokermetadata/docker-compose.yml up -d
-sleep 15
+until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
+sleep 5
$CMAKE_BUILD_DIRECTORY/tests/BrokerMetadataTest
docker compose -f tests/brokermetadata/docker-compose.yml down
+docker compose -f tests/chunkdedup/docker-compose.yml up -d
+until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
+sleep 5
+$CMAKE_BUILD_DIRECTORY/tests/ChunkDedupTest --gtest_repeat=10
+docker compose -f tests/chunkdedup/docker-compose.yml down
+
./pulsar-test-service-start.sh
pushd $CMAKE_BUILD_DIRECTORY/tests
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 6b46425..b373196 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -71,3 +71,6 @@ target_link_libraries(BrokerMetadataTest ${CLIENT_LIBS}
pulsarStatic ${GTEST_LIB
add_executable(Oauth2Test oauth2/Oauth2Test.cc)
target_compile_options(Oauth2Test PRIVATE
"-DTEST_ROOT_PATH=\"${CMAKE_CURRENT_SOURCE_DIR}\"")
target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic
${GTEST_LIBRARY_PATH})
+
+add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc)
+target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic
${GTEST_LIBRARY_PATH})
diff --git a/tests/chunkdedup/ChunkDedupTest.cc
b/tests/chunkdedup/ChunkDedupTest.cc
new file mode 100644
index 0000000..609511f
--- /dev/null
+++ b/tests/chunkdedup/ChunkDedupTest.cc
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include "lib/Latch.h"
+#include "lib/LogUtils.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+// Before https://github.com/apache/pulsar/pull/20948, when message
deduplication is enabled, sending chunks
+// to the broker will receive send error response.
+TEST(ChunkDedupTest, testSendChunks) {
+ Client client{"pulsar://localhost:6650"};
+ ProducerConfiguration conf;
+ conf.setBatchingEnabled(false);
+ conf.setChunkingEnabled(true);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer("test-send-chunks", conf,
producer));
+
+ Latch latch{1};
+ std::string value(1024000 /* max message size */ * 100, 'a');
+ producer.sendAsync(MessageBuilder().setContent(value).build(),
+ [&latch](Result result, const MessageId& msgId) {
+ LOG_INFO("Send to " << msgId << ": " << result);
+ latch.countdown();
+ });
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+ client.close();
+}
+
+int main(int argc, char* argv[]) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/tests/chunkdedup/docker-compose.yml
b/tests/chunkdedup/docker-compose.yml
new file mode 100644
index 0000000..6aed8c4
--- /dev/null
+++ b/tests/chunkdedup/docker-compose.yml
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+version: '3'
+networks:
+ pulsar:
+ driver: bridge
+services:
+ standalone:
+ # Don't change the version here to ensure
https://github.com/apache/pulsar/pull/20948 is not included
+ image: apachepulsar/pulsar:3.1.0
+ container_name: standalone
+ hostname: local
+ restart: "no"
+ networks:
+ - pulsar
+ environment:
+ - metadataStoreUrl=zk:localhost:2181
+ - clusterName=standalone
+ - advertisedAddress=localhost
+ - advertisedListeners=external:pulsar://localhost:6650
+ - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
+ - PULSAR_PREFIX_maxMessageSize=1024000
+ - PULSAR_PREFIX_brokerDeduplicationEnabled=true
+ ports:
+ - "6650:6650"
+ - "8080:8080"
+ command: bash -c "bin/apply-config-from-env.py conf/standalone.conf &&
exec bin/pulsar standalone -nss -nfw"
+