This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-3.6
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/branch-3.6 by this push:
new aba8d6d Fix buffer overflow for non-batched send when the message
metadata size exceeds 64KB (#443)
aba8d6d is described below
commit aba8d6d618e56df18966ce4d9e8084241a958e0d
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Aug 29 22:10:22 2024 +0800
Fix buffer overflow for non-batched send when the message metadata size
exceeds 64KB (#443)
See https://github.com/apache/pulsar-client-python/issues/223
### Motivation
Currently a shared buffer is used to store serialized message metadata
for each send request. However, its capacity is only 64KB, when the metadata
size exceeds 64KB, buffer overflow could happen.
### Modifications
When the metadata size is too large, allocate a new buffer instead of
using the shared buffer. Add `testLargeProperties` to cover it.
(cherry picked from commit 8f269e837cbc20350e9a19505faca8e420a97d24)
---
lib/Commands.cc | 15 +++++++++++----
tests/ProducerTest.cc | 31 +++++++++++++++++++++++++++++++
2 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 4b10b73..84f272b 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -191,7 +191,7 @@ SharedBuffer Commands::newConsumerStats(uint64_t
consumerId, uint64_t requestId)
return buffer;
}
-PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd,
ChecksumType checksumType,
+PairSharedBuffer Commands::newSend(SharedBuffer& originalHeaders, BaseCommand&
cmd, ChecksumType checksumType,
const SendArguments& args) {
cmd.set_type(BaseCommand::SEND);
CommandSend* send = cmd.mutable_send();
@@ -221,9 +221,16 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers,
BaseCommand& cmd, Chec
int totalSize = headerContentSize + payloadSize;
int checksumReaderIndex = -1;
- headers.reset();
- assert(headers.writableBytes() >= (4 + headerContentSize)); // totalSize
+ headerLength
- headers.writeUnsignedInt(totalSize); // External
frame
+ // By default, headers refers a static buffer whose capacity is 64KB,
which can be reused for headers to
+ // avoid frequent memory allocation. However, if users configure many
properties, the size could be great
+ // that results a buffer overflow. In this case, we can only allocate a
new larger buffer.
+ originalHeaders.reset();
+ auto headers = originalHeaders;
+ if (headers.writableBytes() < (4 /* header length */ + headerContentSize))
{
+ headers = SharedBuffer::allocate(4 + headerContentSize);
+ }
+
+ headers.writeUnsignedInt(totalSize); // External frame
// Write cmd
headers.writeUnsignedInt(cmdSize);
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 21e491d..d0d9eb4 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -683,4 +683,35 @@ TEST(ProducerTest, testFailedToCreateNewPartitionProducer)
{
client.close();
}
+TEST(ProducerTest, testLargeProperties) {
+ const std::string topic = "producer-test-large-properties-" +
std::to_string(time(nullptr));
+ Client client(serviceUrl);
+ Producer producer;
+ ProducerConfiguration conf;
+ conf.setBatchingEnabled(false);
+ ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+ MessageBuilder::StringMap properties;
+ constexpr int propertyCount = 20000;
+ auto builder = MessageBuilder().setContent("msg");
+ for (int i = 0; i < propertyCount; i++) {
+ builder.setProperty("key" + std::to_string(i), "value-" +
std::to_string(i));
+ }
+
+ // ASSERT_EQ(ResultOk,
+ //
producer.send(MessageBuilder().setContent("msg").setProperties(properties).build()));
+ ASSERT_EQ(ResultOk, producer.send(builder.build()));
+
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+ ASSERT_EQ(msg.getProperties().size(), propertyCount);
+ for (int i = 0; i < propertyCount; i++) {
+ auto it = msg.getProperties().find("key" + std::to_string(i));
+ ASSERT_NE(it, msg.getProperties().cend());
+ }
+ client.close();
+}
+
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));