BewareMyPower commented on code in PR #17289: URL: https://github.com/apache/pulsar/pull/17289#discussion_r964912014
########## pulsar-client-cpp/lib/ProducerImpl.cc: ########## @@ -500,9 +506,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba return; } - sendMessage(OpSendMsg{msgMetadata, encryptedPayload, - (chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId, - conf_.getSendTimeout(), 1, uncompressedSize}); + OpSendMsg op = + OpSendMsg{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr, + producerId_, sequenceId, conf_.getSendTimeout(), + 1, uncompressedSize}; + + if (!chunkingEnabled_) { + const uint32_t msgMetadataSize = op.metadata_.ByteSize(); + const uint32_t payloadSize = op.payload_.readableBytes(); + const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize; + if (msgHeadersAndPayloadSize > maxMessageSize) { Review Comment: ```suggestion if (msgHeadersAndPayloadSize > maxMessageSize) { lock.unlock(); ``` We can unlock it since the lock is acquired only for `sendMessage`. ########## pulsar-client-cpp/tests/ProducerTest.cc: ########## @@ -210,3 +213,84 @@ TEST(ProducerTest, testBacklogQuotasExceeded) { client.close(); } + +TEST(ProducerTest, testMaxMessageSize) { + Client client(serviceUrl); + + const std::string topic = "ProducerTest-MaxMessageSize-" + std::to_string(time(nullptr)); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + + std::string msg = std::string(maxMessageSize / 2, 'a'); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build())); + Message message; + ASSERT_EQ(ResultOk, consumer.receive(message)); + ASSERT_EQ(msg, message.getDataAsString()); + + std::string orderKey = std::string(maxMessageSize, 'a'); + ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build())); + + ASSERT_EQ(ResultMessageTooBig, + producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 
'b')).build())); + + client.close(); +} + +TEST(ProducerTest, testNoBatchMaxMessageSize) { + Client client(serviceUrl); + + const std::string topic = "ProducerTest-NoBatchMaxMessageSize-" + std::to_string(time(nullptr)); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); + + Producer producer; + ProducerConfiguration conf; + conf.setBatchingEnabled(false); + ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer)); + + std::string msg = std::string(maxMessageSize / 2, 'a'); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build())); + Message message; + ASSERT_EQ(ResultOk, consumer.receive(message)); + ASSERT_EQ(msg, message.getDataAsString()); + + std::string orderKey = std::string(maxMessageSize, 'a'); + ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build())); + + ASSERT_EQ(ResultMessageTooBig, + producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build())); + + client.close(); +} + +TEST(ProducerTest, testChunkingMaxMessageSize) { Review Comment: This test seems to be redundant because it's covered by the previous tests. 
########## pulsar-client-cpp/tests/ProducerTest.cc: ########## @@ -210,3 +213,84 @@ TEST(ProducerTest, testBacklogQuotasExceeded) { client.close(); } + +TEST(ProducerTest, testMaxMessageSize) { + Client client(serviceUrl); + + const std::string topic = "ProducerTest-MaxMessageSize-" + std::to_string(time(nullptr)); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + + std::string msg = std::string(maxMessageSize / 2, 'a'); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build())); + Message message; + ASSERT_EQ(ResultOk, consumer.receive(message)); + ASSERT_EQ(msg, message.getDataAsString()); + + std::string orderKey = std::string(maxMessageSize, 'a'); + ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build())); + + ASSERT_EQ(ResultMessageTooBig, + producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build())); + + client.close(); +} + +TEST(ProducerTest, testNoBatchMaxMessageSize) { Review Comment: It's nearly the same as `testMaxMessageSize` except for the producer configuration. You can simplify the test via `TEST_P`; see `MessageChunkingTest` for an example. BTW, I think these tests could also be moved to `MessageChunkingTest`. ########## pulsar-client-cpp/lib/ProducerImpl.cc: ########## @@ -500,9 +506,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba return; } - sendMessage(OpSendMsg{msgMetadata, encryptedPayload, - (chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId, - conf_.getSendTimeout(), 1, uncompressedSize}); + OpSendMsg op = + OpSendMsg{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? 
callback : nullptr, + producerId_, sequenceId, conf_.getSendTimeout(), + 1, uncompressedSize}; Review Comment: ```c++ OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId, conf_.getSendTimeout(), 1, uncompressedSize}; ``` Simplify the code. BTW, this also avoids an invocation of the copy constructor, though the copy might be optimized away by the compiler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org