This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f28f98522e8 [improve][client-c++]Support include message header size
when check maxMessageSize (#17289)
f28f98522e8 is described below
commit f28f98522e81368efd425176f3aa7818ad0d7e7b
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Sep 22 22:59:32 2022 +0800
[improve][client-c++]Support include message header size when check
maxMessageSize (#17289)
### Motivation
See: #17188
### Modifications
Include the message header (metadata) size when checking a message against maxMessageSize in the C++ client.
---
pulsar-client-cpp/lib/ProducerImpl.cc | 70 ++++++++++++++++++----------
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 4 +-
pulsar-client-cpp/tests/ProducerTest.cc | 62 ++++++++++++++++++++++++
3 files changed, 110 insertions(+), 26 deletions(-)
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc
b/pulsar-client-cpp/lib/ProducerImpl.cc
index 8c87086297f..20133c50fc9 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -417,37 +417,18 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const
Message& msg, const SendCallba
callback(result, {});
};
+ auto& msgMetadata = msg.impl_->metadata;
const bool compressed = !canAddToBatch(msg);
const auto payload =
compressed ? applyCompression(uncompressedPayload,
conf_.getCompressionType()) : uncompressedPayload;
const auto compressedSize = static_cast<uint32_t>(payload.readableBytes());
const auto maxMessageSize =
static_cast<uint32_t>(ClientConnection::getMaxMessageSize());
- if (compressed && compressedSize > ClientConnection::getMaxMessageSize()
&& !chunkingEnabled_) {
- LOG_WARN(getName() << " - compressed Message payload size " <<
payload.readableBytes()
- << " cannot exceed " <<
ClientConnection::getMaxMessageSize()
- << " bytes unless chunking is enabled");
- handleFailedResult(ResultMessageTooBig);
- return;
- }
-
- auto& msgMetadata = msg.impl_->metadata;
if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name())
{
handleFailedResult(ResultInvalidMessage);
return;
}
- const int totalChunks =
- canAddToBatch(msg) ? 1 : getNumOfChunks(compressedSize,
ClientConnection::getMaxMessageSize());
- // Each chunk should be sent individually, so try to acquire extra permits
for chunks.
- for (int i = 0; i < (totalChunks - 1); i++) {
- const auto result = canEnqueueRequest(0); // size is 0 because the
memory has already reserved
- if (result != ResultOk) {
- handleFailedResult(result);
- return;
- }
- }
-
Lock lock(mutex_);
uint64_t sequenceId;
if (!msgMetadata.has_sequence_id()) {
@@ -457,6 +438,31 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message&
msg, const SendCallba
}
setMessageMetadata(msg, sequenceId, uncompressedSize);
+ auto payloadChunkSize = maxMessageSize;
+ int totalChunks;
+ if (!compressed || !chunkingEnabled_) {
+ totalChunks = 1;
+ } else {
+ const auto metadataSize =
static_cast<uint32_t>(msgMetadata.ByteSizeLong());
+ if (metadataSize >= maxMessageSize) {
+ LOG_WARN(getName() << " - metadata size " << metadataSize << "
cannot exceed " << maxMessageSize
+ << " bytes");
+ handleFailedResult(ResultMessageTooBig);
+ return;
+ }
+ payloadChunkSize = maxMessageSize - metadataSize;
+ totalChunks = getNumOfChunks(compressedSize, payloadChunkSize);
+ }
+
+ // Each chunk should be sent individually, so try to acquire extra permits
for chunks.
+ for (int i = 0; i < (totalChunks - 1); i++) {
+ const auto result = canEnqueueRequest(0); // size is 0 because the
memory has already reserved
+ if (result != ResultOk) {
+ handleFailedResult(result);
+ return;
+ }
+ }
+
if (canAddToBatch(msg)) {
// Batching is enabled and the message is not delayed
if (!batchMessageContainer_->hasEnoughSpace(msg)) {
@@ -508,7 +514,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message&
msg, const SendCallba
if (sendChunks) {
msgMetadata.set_chunk_id(chunkId);
}
- const uint32_t endIndex = std::min(compressedSize, beginIndex +
maxMessageSize);
+ const uint32_t endIndex = std::min(compressedSize, beginIndex +
payloadChunkSize);
auto chunkedPayload = payload.slice(beginIndex, endIndex -
beginIndex);
beginIndex = endIndex;
@@ -517,10 +523,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const
Message& msg, const SendCallba
handleFailedResult(ResultCryptoError);
return;
}
+ OpSendMsg op{msgMetadata, encryptedPayload, (chunkId ==
totalChunks - 1) ? callback : nullptr,
+ producerId_, sequenceId, conf_.getSendTimeout(),
+ 1, uncompressedSize};
+
+ if (!chunkingEnabled_) {
+ const uint32_t msgMetadataSize = op.metadata_.ByteSize();
+ const uint32_t payloadSize = op.payload_.readableBytes();
+ const uint32_t msgHeadersAndPayloadSize = msgMetadataSize +
payloadSize;
+ if (msgHeadersAndPayloadSize > maxMessageSize) {
+ lock.unlock();
+ releaseSemaphoreForSendOp(op);
+ LOG_WARN(getName()
+ << " - compressed Message size " <<
msgHeadersAndPayloadSize << " cannot exceed "
+ << maxMessageSize << " bytes unless chunking is
enabled");
+ handleFailedResult(ResultMessageTooBig);
+ return;
+ }
+ }
- sendMessage(OpSendMsg{msgMetadata, encryptedPayload,
- (chunkId == totalChunks - 1) ? callback :
nullptr, producerId_, sequenceId,
- conf_.getSendTimeout(), 1,
uncompressedSize});
+ sendMessage(op);
}
}
}
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 5c4f7d21623..3431978a7ec 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -593,8 +593,8 @@ TEST(BasicEndToEndTest, testMessageTooBig) {
result = producer.send(msg);
ASSERT_EQ(ResultMessageTooBig, result);
- // Anything up to MaxMessageSize should be allowed
- size = ClientConnection::getMaxMessageSize();
+ // Anything up to MaxMessageSize - MetadataSize should be allowed
+ size = ClientConnection::getMaxMessageSize() - 32; /*the default message
metadata size for string schema*/
msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer.send(msg);
ASSERT_EQ(ResultOk, result);
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc
b/pulsar-client-cpp/tests/ProducerTest.cc
index 6315ac16f51..d351ee9cdbc 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -34,6 +34,9 @@ using namespace pulsar;
static const std::string serviceUrl = "pulsar://localhost:6650";
static const std::string adminUrl = "http://localhost:8080/";
+// See the `maxMessageSize` config in test-conf/standalone-ssl.conf
+static constexpr size_t maxMessageSize = 1024000;
+
TEST(ProducerTest, producerNotInitialized) {
Producer producer;
@@ -211,6 +214,63 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
client.close();
}
+class ProducerTest : public ::testing::TestWithParam<bool> {};
+
+TEST_P(ProducerTest, testMaxMessageSize) {
+ Client client(serviceUrl);
+
+ const std::string topic = "ProducerTest-NoBatchMaxMessageSize-" +
std::to_string(time(nullptr));
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+ Producer producer;
+ ProducerConfiguration conf;
+ conf.setBatchingEnabled(GetParam());
+ ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
+
+ std::string msg = std::string(maxMessageSize / 2, 'a');
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent(msg).build()));
+ Message message;
+ ASSERT_EQ(ResultOk, consumer.receive(message));
+ ASSERT_EQ(msg, message.getDataAsString());
+
+ std::string orderKey = std::string(maxMessageSize, 'a');
+ ASSERT_EQ(ResultMessageTooBig,
producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
+
+ ASSERT_EQ(ResultMessageTooBig,
+
producer.send(MessageBuilder().setContent(std::string(maxMessageSize,
'b')).build()));
+
+ client.close();
+}
+
+TEST_P(ProducerTest, testChunkingMaxMessageSize) {
+ Client client(serviceUrl);
+
+ const std::string topic = "ProducerTest-ChunkingMaxMessageSize-" +
std::to_string(time(nullptr));
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+ Producer producer;
+ ProducerConfiguration conf;
+ conf.setBatchingEnabled(false);
+ conf.setChunkingEnabled(true);
+ ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
+
+ std::string orderKey = std::string(maxMessageSize, 'a');
+ ASSERT_EQ(ResultMessageTooBig,
producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
+
+ std::string msg = std::string(2 * maxMessageSize + 10, 'b');
+ Message message;
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent(msg).build()));
+ ASSERT_EQ(ResultOk, consumer.receive(message));
+ ASSERT_EQ(msg, message.getDataAsString());
+ ASSERT_LE(1L, message.getMessageId().entryId());
+
+ client.close();
+}
+
TEST(ProducerTest, testExclusiveProducer) {
Client client(serviceUrl);
@@ -234,3 +294,5 @@ TEST(ProducerTest, testExclusiveProducer) {
producerConfiguration3.setProducerName("p-name-3");
ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName,
producerConfiguration3, producer3));
}
+
+INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));