BewareMyPower commented on code in PR #17289:
URL: https://github.com/apache/pulsar/pull/17289#discussion_r964912014


##########
pulsar-client-cpp/lib/ProducerImpl.cc:
##########
@@ -500,9 +506,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& 
msg, const SendCallba
                 return;
             }
 
-            sendMessage(OpSendMsg{msgMetadata, encryptedPayload,
-                                  (chunkId == totalChunks - 1) ? callback : 
nullptr, producerId_, sequenceId,
-                                  conf_.getSendTimeout(), 1, 
uncompressedSize});
+            OpSendMsg op =
+                OpSendMsg{msgMetadata, encryptedPayload, (chunkId == 
totalChunks - 1) ? callback : nullptr,
+                          producerId_, sequenceId,       
conf_.getSendTimeout(),
+                          1,           uncompressedSize};
+
+            if (!chunkingEnabled_) {
+                const uint32_t msgMetadataSize = op.metadata_.ByteSize();
+                const uint32_t payloadSize = op.payload_.readableBytes();
+                const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + 
payloadSize;
+                if (msgHeadersAndPayloadSize > maxMessageSize) {

Review Comment:
   ```suggestion
                   if (msgHeadersAndPayloadSize > maxMessageSize) {
                       lock.unlock();
   ```
   
   We can unlock it since the lock is acquired only for `sendMessage`.



##########
pulsar-client-cpp/tests/ProducerTest.cc:
##########
@@ -210,3 +213,84 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
 
     client.close();
 }
+
+TEST(ProducerTest, testMaxMessageSize) {
+    Client client(serviceUrl);
+
+    const std::string topic = "ProducerTest-MaxMessageSize-" + 
std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    std::string msg = std::string(maxMessageSize / 2, 'a');
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent(msg).build()));
+    Message message;
+    ASSERT_EQ(ResultOk, consumer.receive(message));
+    ASSERT_EQ(msg, message.getDataAsString());
+
+    std::string orderKey = std::string(maxMessageSize, 'a');
+    ASSERT_EQ(ResultMessageTooBig, 
producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
+
+    ASSERT_EQ(ResultMessageTooBig,
+              
producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 
'b')).build()));
+
+    client.close();
+}
+
+TEST(ProducerTest, testNoBatchMaxMessageSize) {
+    Client client(serviceUrl);
+
+    const std::string topic = "ProducerTest-NoBatchMaxMessageSize-" + 
std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Producer producer;
+    ProducerConfiguration conf;
+    conf.setBatchingEnabled(false);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
+
+    std::string msg = std::string(maxMessageSize / 2, 'a');
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent(msg).build()));
+    Message message;
+    ASSERT_EQ(ResultOk, consumer.receive(message));
+    ASSERT_EQ(msg, message.getDataAsString());
+
+    std::string orderKey = std::string(maxMessageSize, 'a');
+    ASSERT_EQ(ResultMessageTooBig, 
producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
+
+    ASSERT_EQ(ResultMessageTooBig,
+              
producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 
'b')).build()));
+
+    client.close();
+}
+
+TEST(ProducerTest, testChunkingMaxMessageSize) {

Review Comment:
   This test seems to be redundant because it's covered by the previous tests.



##########
pulsar-client-cpp/tests/ProducerTest.cc:
##########
@@ -210,3 +213,84 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
 
     client.close();
 }
+
+TEST(ProducerTest, testMaxMessageSize) {
+    Client client(serviceUrl);
+
+    const std::string topic = "ProducerTest-MaxMessageSize-" + 
std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    std::string msg = std::string(maxMessageSize / 2, 'a');
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent(msg).build()));
+    Message message;
+    ASSERT_EQ(ResultOk, consumer.receive(message));
+    ASSERT_EQ(msg, message.getDataAsString());
+
+    std::string orderKey = std::string(maxMessageSize, 'a');
+    ASSERT_EQ(ResultMessageTooBig, 
producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
+
+    ASSERT_EQ(ResultMessageTooBig,
+              
producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 
'b')).build()));
+
+    client.close();
+}
+
+TEST(ProducerTest, testNoBatchMaxMessageSize) {

Review Comment:
   It's nearly the same with `testMaxMessageSize` except the difference of the 
producer configuration. You can simplify the test via `TEST_P`. See 
`MessageChunkingTest` for example. BTW, I think these tests could also be moved 
to `MessageChunkingTest`.



##########
pulsar-client-cpp/lib/ProducerImpl.cc:
##########
@@ -500,9 +506,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& 
msg, const SendCallba
                 return;
             }
 
-            sendMessage(OpSendMsg{msgMetadata, encryptedPayload,
-                                  (chunkId == totalChunks - 1) ? callback : 
nullptr, producerId_, sequenceId,
-                                  conf_.getSendTimeout(), 1, 
uncompressedSize});
+            OpSendMsg op =
+                OpSendMsg{msgMetadata, encryptedPayload, (chunkId == 
totalChunks - 1) ? callback : nullptr,
+                          producerId_, sequenceId,       
conf_.getSendTimeout(),
+                          1,           uncompressedSize};

Review Comment:
   ```c++
               OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == 
totalChunks - 1) ? callback : nullptr,
                            producerId_, sequenceId,       
conf_.getSendTimeout(),
                            1,           uncompressedSize};
   ```
   
   Simplify the code. BTW, it can also reduce an invocation of the copy 
constructor, though the copy might be optimized due to the compiler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to