lhotari commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1862065209


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -505,22 +506,27 @@ public void sendAsync(Message<?> message, SendCallback 
callback) {
         boolean compressed = false;
         // Batch will be compressed when closed
         // If a message has a delayed delivery time, we'll always send it 
individually
-        if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
-            compressedPayload = applyCompression(payload);
-            compressed = true;
+        if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()))) {
+            if (payload.readableBytes() < conf.getCompressMinMsgBodySize()) {
 
-            // validate msg-size (For batching this will be check at the batch 
completion size)
-            int compressedSize = compressedPayload.readableBytes();
-            if (compressedSize > getMaxMessageSize() && 
!this.conf.isChunkingEnabled()) {
-                compressedPayload.release();
-                String compressedStr = conf.getCompressionType() != 
CompressionType.NONE ? "Compressed" : "";
-                PulsarClientException.InvalidMessageException 
invalidMessageException =
-                        new PulsarClientException.InvalidMessageException(
-                                format("The producer %s of the topic %s sends 
a %s message with %d bytes that exceeds"
-                                                + " %d bytes",
-                        producerName, topic, compressedStr, compressedSize, 
getMaxMessageSize()));
-                completeCallbackAndReleaseSemaphore(uncompressedSize, 
callback, invalidMessageException);
-                return;
+            } else {
+                compressedPayload = applyCompression(payload);
+                compressed = true;
+
+                // validate msg-size (For batching this will be check at the 
batch completion size)
+                int compressedSize = compressedPayload.readableBytes();
+                if (compressedSize > getMaxMessageSize() && 
!this.conf.isChunkingEnabled()) {
+                    compressedPayload.release();
+                    String compressedStr = conf.getCompressionType() != 
CompressionType.NONE ? "Compressed" : "";

Review Comment:
   would it be useful to also show the compression type?
   ```suggestion
                       String compressedStr = conf.getCompressionType() != 
CompressionType.NONE ? ("compressed (" + conf.getCompressionType() + ")") : 
"uncompressed";
   ```



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##########
@@ -169,9 +169,20 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() {
         }
 
         int uncompressedSize = 
batchedMessageMetadataAndPayload.readableBytes();
-        ByteBuf compressedPayload = 
compressor.encode(batchedMessageMetadataAndPayload);
-        batchedMessageMetadataAndPayload.release();
-        if (compressionType != CompressionType.NONE) {
+        ByteBuf compressedPayload;
+        boolean isCompressed = false;
+        if (!isBrokerTwoPhaseCompactor && producer != null){
+            if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) {
+                compressedPayload = 
producer.applyCompression(batchedMessageMetadataAndPayload);
+                isCompressed = true;
+            } else {
+                compressedPayload = batchedMessageMetadataAndPayload;
+            }
+        } else {
+            compressedPayload = 
compressor.encode(batchedMessageMetadataAndPayload);
+            batchedMessageMetadataAndPayload.release();
+        }
+        if (compressionType != CompressionType.NONE && isCompressed) {
             messageMetadata.setCompression(compressionType);
             messageMetadata.setUncompressedSize(uncompressedSize);

Review Comment:
   move these lines after the code where compression is applied 
(`producer.applyCompression`).



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##########
@@ -141,7 +141,7 @@ public boolean add(MessageImpl<?> msg, SendCallback 
callback) {
         return isBatchFull();
     }
 
-    protected ByteBuf getCompressedBatchMetadataAndPayload() {
+    protected ByteBuf getCompressedBatchMetadataAndPayload(boolean 
isBrokerTwoPhaseCompactor) {

Review Comment:
   Leaking implementation details that compaction doesn't compress messages is 
not very great. it would be better to rename `isBrokerTwoPhaseCompactor` to 
`allowCompression` and inverse the value. Add an override that passes `true` as 
the default value for `allowCompression`.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##########
@@ -252,7 +263,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
         if (messages.size() == 1) {
             messageMetadata.clear();
             messageMetadata.copyFrom(messages.get(0).getMessageBuilder());
-            ByteBuf encryptedPayload = 
producer.encryptMessage(messageMetadata, 
getCompressedBatchMetadataAndPayload());
+            ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
+                    getCompressedBatchMetadataAndPayload(false));

Review Comment:
   this change can be reverted once there's a 
`getCompressedBatchMetadataAndPayload()` method as explained in one of the 
previous comments.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##########
@@ -169,9 +169,20 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() {
         }
 
         int uncompressedSize = 
batchedMessageMetadataAndPayload.readableBytes();
-        ByteBuf compressedPayload = 
compressor.encode(batchedMessageMetadataAndPayload);
-        batchedMessageMetadataAndPayload.release();
-        if (compressionType != CompressionType.NONE) {
+        ByteBuf compressedPayload;
+        boolean isCompressed = false;
+        if (!isBrokerTwoPhaseCompactor && producer != null){
+            if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) {

Review Comment:
   This seems to miss the `compressionType != CompressionType.NONE` condition. 
It would be logical to have the condition here since the logic has been changed.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##########
@@ -283,7 +295,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
             lowestSequenceId = -1L;
             return op;
         }
-        ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, 
getCompressedBatchMetadataAndPayload());
+        ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
+                getCompressedBatchMetadataAndPayload(false));

Review Comment:
   this change can be reverted once there's a 
`getCompressedBatchMetadataAndPayload()` method as explained in one of the 
previous comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to