This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f28f98522e8 [improve][client-c++]Support include message header size 
when check maxMessageSize (#17289)
f28f98522e8 is described below

commit f28f98522e81368efd425176f3aa7818ad0d7e7b
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Sep 22 22:59:32 2022 +0800

    [improve][client-c++]Support include message header size when check 
maxMessageSize (#17289)
    
    ### Motivation
    
    See: #17188
    
    ### Modifications
    
    Support include message header size when check maxMessageSize for cpp client
---
 pulsar-client-cpp/lib/ProducerImpl.cc        | 70 ++++++++++++++++++----------
 pulsar-client-cpp/tests/BasicEndToEndTest.cc |  4 +-
 pulsar-client-cpp/tests/ProducerTest.cc      | 62 ++++++++++++++++++++++++
 3 files changed, 110 insertions(+), 26 deletions(-)

diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc 
b/pulsar-client-cpp/lib/ProducerImpl.cc
index 8c87086297f..20133c50fc9 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -417,37 +417,18 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const 
Message& msg, const SendCallba
         callback(result, {});
     };
 
+    auto& msgMetadata = msg.impl_->metadata;
     const bool compressed = !canAddToBatch(msg);
     const auto payload =
         compressed ? applyCompression(uncompressedPayload, 
conf_.getCompressionType()) : uncompressedPayload;
     const auto compressedSize = static_cast<uint32_t>(payload.readableBytes());
     const auto maxMessageSize = 
static_cast<uint32_t>(ClientConnection::getMaxMessageSize());
 
-    if (compressed && compressedSize > ClientConnection::getMaxMessageSize() 
&& !chunkingEnabled_) {
-        LOG_WARN(getName() << " - compressed Message payload size " << 
payload.readableBytes()
-                           << " cannot exceed " << 
ClientConnection::getMaxMessageSize()
-                           << " bytes unless chunking is enabled");
-        handleFailedResult(ResultMessageTooBig);
-        return;
-    }
-
-    auto& msgMetadata = msg.impl_->metadata;
     if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name()) 
{
         handleFailedResult(ResultInvalidMessage);
         return;
     }
 
-    const int totalChunks =
-        canAddToBatch(msg) ? 1 : getNumOfChunks(compressedSize, 
ClientConnection::getMaxMessageSize());
-    // Each chunk should be sent individually, so try to acquire extra permits 
for chunks.
-    for (int i = 0; i < (totalChunks - 1); i++) {
-        const auto result = canEnqueueRequest(0);  // size is 0 because the 
memory has already reserved
-        if (result != ResultOk) {
-            handleFailedResult(result);
-            return;
-        }
-    }
-
     Lock lock(mutex_);
     uint64_t sequenceId;
     if (!msgMetadata.has_sequence_id()) {
@@ -457,6 +438,31 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& 
msg, const SendCallba
     }
     setMessageMetadata(msg, sequenceId, uncompressedSize);
 
+    auto payloadChunkSize = maxMessageSize;
+    int totalChunks;
+    if (!compressed || !chunkingEnabled_) {
+        totalChunks = 1;
+    } else {
+        const auto metadataSize = 
static_cast<uint32_t>(msgMetadata.ByteSizeLong());
+        if (metadataSize >= maxMessageSize) {
+            LOG_WARN(getName() << " - metadata size " << metadataSize << " 
cannot exceed " << maxMessageSize
+                               << " bytes");
+            handleFailedResult(ResultMessageTooBig);
+            return;
+        }
+        payloadChunkSize = maxMessageSize - metadataSize;
+        totalChunks = getNumOfChunks(compressedSize, payloadChunkSize);
+    }
+
+    // Each chunk should be sent individually, so try to acquire extra permits 
for chunks.
+    for (int i = 0; i < (totalChunks - 1); i++) {
+        const auto result = canEnqueueRequest(0);  // size is 0 because the 
memory has already reserved
+        if (result != ResultOk) {
+            handleFailedResult(result);
+            return;
+        }
+    }
+
     if (canAddToBatch(msg)) {
         // Batching is enabled and the message is not delayed
         if (!batchMessageContainer_->hasEnoughSpace(msg)) {
@@ -508,7 +514,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& 
msg, const SendCallba
             if (sendChunks) {
                 msgMetadata.set_chunk_id(chunkId);
             }
-            const uint32_t endIndex = std::min(compressedSize, beginIndex + 
maxMessageSize);
+            const uint32_t endIndex = std::min(compressedSize, beginIndex + 
payloadChunkSize);
             auto chunkedPayload = payload.slice(beginIndex, endIndex - 
beginIndex);
             beginIndex = endIndex;
 
@@ -517,10 +523,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const 
Message& msg, const SendCallba
                 handleFailedResult(ResultCryptoError);
                 return;
             }
+            OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == 
totalChunks - 1) ? callback : nullptr,
+                         producerId_, sequenceId,       conf_.getSendTimeout(),
+                         1,           uncompressedSize};
+
+            if (!chunkingEnabled_) {
+                const uint32_t msgMetadataSize = op.metadata_.ByteSize();
+                const uint32_t payloadSize = op.payload_.readableBytes();
+                const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + 
payloadSize;
+                if (msgHeadersAndPayloadSize > maxMessageSize) {
+                    lock.unlock();
+                    releaseSemaphoreForSendOp(op);
+                    LOG_WARN(getName()
+                             << " - compressed Message size " << 
msgHeadersAndPayloadSize << " cannot exceed "
+                             << maxMessageSize << " bytes unless chunking is 
enabled");
+                    handleFailedResult(ResultMessageTooBig);
+                    return;
+                }
+            }
 
-            sendMessage(OpSendMsg{msgMetadata, encryptedPayload,
-                                  (chunkId == totalChunks - 1) ? callback : 
nullptr, producerId_, sequenceId,
-                                  conf_.getSendTimeout(), 1, 
uncompressedSize});
+            sendMessage(op);
         }
     }
 }
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 5c4f7d21623..3431978a7ec 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -593,8 +593,8 @@ TEST(BasicEndToEndTest, testMessageTooBig) {
     result = producer.send(msg);
     ASSERT_EQ(ResultMessageTooBig, result);
 
-    // Anything up to MaxMessageSize should be allowed
-    size = ClientConnection::getMaxMessageSize();
+    // Anything up to MaxMessageSize - MetadataSize should be allowed
+    size = ClientConnection::getMaxMessageSize() - 32; /*the default message 
metadata size for string schema*/
     msg = MessageBuilder().setAllocatedContent(content, size).build();
     result = producer.send(msg);
     ASSERT_EQ(ResultOk, result);
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc 
b/pulsar-client-cpp/tests/ProducerTest.cc
index 6315ac16f51..d351ee9cdbc 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -34,6 +34,9 @@ using namespace pulsar;
 static const std::string serviceUrl = "pulsar://localhost:6650";
 static const std::string adminUrl = "http://localhost:8080/";;
 
+// See the `maxMessageSize` config in test-conf/standalone-ssl.conf
+static constexpr size_t maxMessageSize = 1024000;
+
 TEST(ProducerTest, producerNotInitialized) {
     Producer producer;
 
@@ -211,6 +214,63 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
     client.close();
 }
 
+class ProducerTest : public ::testing::TestWithParam<bool> {};
+
+TEST_P(ProducerTest, testMaxMessageSize) {
+    Client client(serviceUrl);
+
+    const std::string topic = "ProducerTest-NoBatchMaxMessageSize-" + 
std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Producer producer;
+    ProducerConfiguration conf;
+    conf.setBatchingEnabled(GetParam());
+    ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
+
+    std::string msg = std::string(maxMessageSize / 2, 'a');
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent(msg).build()));
+    Message message;
+    ASSERT_EQ(ResultOk, consumer.receive(message));
+    ASSERT_EQ(msg, message.getDataAsString());
+
+    std::string orderKey = std::string(maxMessageSize, 'a');
+    ASSERT_EQ(ResultMessageTooBig, 
producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
+
+    ASSERT_EQ(ResultMessageTooBig,
+              
producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 
'b')).build()));
+
+    client.close();
+}
+
+TEST_P(ProducerTest, testChunkingMaxMessageSize) {
+    Client client(serviceUrl);
+
+    const std::string topic = "ProducerTest-ChunkingMaxMessageSize-" + 
std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Producer producer;
+    ProducerConfiguration conf;
+    conf.setBatchingEnabled(false);
+    conf.setChunkingEnabled(true);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
+
+    std::string orderKey = std::string(maxMessageSize, 'a');
+    ASSERT_EQ(ResultMessageTooBig, 
producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
+
+    std::string msg = std::string(2 * maxMessageSize + 10, 'b');
+    Message message;
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent(msg).build()));
+    ASSERT_EQ(ResultOk, consumer.receive(message));
+    ASSERT_EQ(msg, message.getDataAsString());
+    ASSERT_LE(1L, message.getMessageId().entryId());
+
+    client.close();
+}
+
 TEST(ProducerTest, testExclusiveProducer) {
     Client client(serviceUrl);
 
@@ -234,3 +294,5 @@ TEST(ProducerTest, testExclusiveProducer) {
     producerConfiguration3.setProducerName("p-name-3");
     ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, 
producerConfiguration3, producer3));
 }
+
+INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

Reply via email to