This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.6
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/branch-3.6 by this push:
     new aba8d6d  Fix buffer overflow for non-batched send when the message 
metadata size exceeds 64KB (#443)
aba8d6d is described below

commit aba8d6d618e56df18966ce4d9e8084241a958e0d
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Aug 29 22:10:22 2024 +0800

    Fix buffer overflow for non-batched send when the message metadata size 
exceeds 64KB (#443)
    
    See https://github.com/apache/pulsar-client-python/issues/223
    
    ### Motivation
    
    Currently a shared buffer is used to store serialized message metadata
    for each send request. However, its capacity is only 64KB, so when the
    metadata size exceeds 64KB, a buffer overflow can happen.
    
    ### Modifications
    
    When the metadata size is too large, allocate a new buffer instead of
    using the shared buffer. Add `testLargeProperties` to cover it.
    
    (cherry picked from commit 8f269e837cbc20350e9a19505faca8e420a97d24)
---
 lib/Commands.cc       | 15 +++++++++++----
 tests/ProducerTest.cc | 31 +++++++++++++++++++++++++++++++
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/lib/Commands.cc b/lib/Commands.cc
index 4b10b73..84f272b 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -191,7 +191,7 @@ SharedBuffer Commands::newConsumerStats(uint64_t 
consumerId, uint64_t requestId)
     return buffer;
 }
 
-PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, 
ChecksumType checksumType,
+PairSharedBuffer Commands::newSend(SharedBuffer& originalHeaders, BaseCommand& 
cmd, ChecksumType checksumType,
                                    const SendArguments& args) {
     cmd.set_type(BaseCommand::SEND);
     CommandSend* send = cmd.mutable_send();
@@ -221,9 +221,16 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, 
BaseCommand& cmd, Chec
     int totalSize = headerContentSize + payloadSize;
     int checksumReaderIndex = -1;
 
-    headers.reset();
-    assert(headers.writableBytes() >= (4 + headerContentSize));  // totalSize 
+ headerLength
-    headers.writeUnsignedInt(totalSize);                         // External 
frame
+    // By default, headers refers to a static buffer whose capacity is 64KB, 
which can be reused for headers to
+    // avoid frequent memory allocation. However, if users configure many 
properties, the size could be so great
+    // that it results in a buffer overflow. In this case, we can only allocate a 
new larger buffer.
+    originalHeaders.reset();
+    auto headers = originalHeaders;
+    if (headers.writableBytes() < (4 /* header length */ + headerContentSize)) 
{
+        headers = SharedBuffer::allocate(4 + headerContentSize);
+    }
+
+    headers.writeUnsignedInt(totalSize);  // External frame
 
     // Write cmd
     headers.writeUnsignedInt(cmdSize);
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 21e491d..d0d9eb4 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -683,4 +683,35 @@ TEST(ProducerTest, testFailedToCreateNewPartitionProducer) 
{
     client.close();
 }
 
+TEST(ProducerTest, testLargeProperties) {
+    const std::string topic = "producer-test-large-properties-" + 
std::to_string(time(nullptr));
+    Client client(serviceUrl);
+    Producer producer;
+    ProducerConfiguration conf;
+    conf.setBatchingEnabled(false);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    MessageBuilder::StringMap properties;
+    constexpr int propertyCount = 20000;
+    auto builder = MessageBuilder().setContent("msg");
+    for (int i = 0; i < propertyCount; i++) {
+        builder.setProperty("key" + std::to_string(i), "value-" + 
std::to_string(i));
+    }
+
+    // ASSERT_EQ(ResultOk,
+    // 
producer.send(MessageBuilder().setContent("msg").setProperties(properties).build()));
+    ASSERT_EQ(ResultOk, producer.send(builder.build()));
+
+    Message msg;
+    ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+    ASSERT_EQ(msg.getProperties().size(), propertyCount);
+    for (int i = 0; i < propertyCount; i++) {
+        auto it = msg.getProperties().find("key" + std::to_string(i));
+        ASSERT_NE(it, msg.getProperties().cend());
+    }
+    client.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

Reply via email to