This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch pip/minCompressSize
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6bf52a3adbacfc0e266d64d3545f92efffddf56f
Author: xiangying <[email protected]>
AuthorDate: Tue Oct 29 16:33:31 2024 +0800

    [improve][client] Add a producer config to improve compaction performance
---
 .../client/impl/RawBatchMessageContainerImpl.java     |  2 +-
 .../pulsar/client/impl/BatchMessageContainerImpl.java | 19 ++++++++++++++-----
 .../org/apache/pulsar/client/impl/ProducerImpl.java   |  7 ++++---
 .../client/impl/conf/ProducerConfigurationData.java   |  2 ++
 4 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
index 374f1e30c0a..8cb8d74b3f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
@@ -167,7 +167,7 @@ public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl {
             }
         }
 
-        ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload());
+        ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload(true));
         updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
         ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
                 messageMetadata, encryptedPayload);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index a3c9d1bc9ab..ed272b59c98 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -141,7 +141,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
         return isBatchFull();
     }
 
-    protected ByteBuf getCompressedBatchMetadataAndPayload() {
+    protected ByteBuf getCompressedBatchMetadataAndPayload(boolean isBrokerTwoPhaseCompactor) {
         int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex();
         int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex();
 
@@ -169,8 +169,17 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
         }
 
         int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
-        ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
-        batchedMessageMetadataAndPayload.release();
+        ByteBuf compressedPayload;
+        if (!isBrokerTwoPhaseCompactor && producer != null){
+            if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) {
+                compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload);
+            } else {
+                compressedPayload = batchedMessageMetadataAndPayload;
+            }
+        } else {
+            compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
+            batchedMessageMetadataAndPayload.release();
+        }
         if (compressionType != CompressionType.NONE) {
             messageMetadata.setCompression(compressionType);
             messageMetadata.setUncompressedSize(uncompressedSize);
@@ -252,7 +261,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
         if (messages.size() == 1) {
             messageMetadata.clear();
             messageMetadata.copyFrom(messages.get(0).getMessageBuilder());
-            ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
+            ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload(false));
             updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
             ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
                 1, null, messageMetadata, encryptedPayload);
@@ -283,7 +292,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
             lowestSequenceId = -1L;
             return op;
         }
-        ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
+        ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload(false));
         updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
         if (encryptedPayload.readableBytes() > getMaxMessageSize()) {
             producer.semaphoreRelease(messages.size());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 6d5a8145463..1cdee641d6a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -478,7 +478,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
      * @param payload
      * @return a new payload
      */
-    private ByteBuf applyCompression(ByteBuf payload) {
+    protected ByteBuf applyCompression(ByteBuf payload) {
         ByteBuf compressedPayload = compressor.encode(payload);
         payload.release();
         return compressedPayload;
@@ -505,7 +505,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         boolean compressed = false;
         // Batch will be compressed when closed
         // If a message has a delayed delivery time, we'll always send it individually
-        if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
+        if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime())
+                && payload.readableBytes() > conf.getCompressMinMsgBodySize())) {
             compressedPayload = applyCompression(payload);
             compressed = true;
 
@@ -737,7 +738,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         } else {
             // in this case compression has not been applied by the caller
             // but we have to compress the payload if compression is configured
-            if (!compressed) {
+            if (!compressed && chunkPayload.readableBytes() > conf.getCompressMinMsgBodySize()) {
                 chunkPayload = applyCompression(chunkPayload);
             }
             ByteBuf encryptedPayload = encryptMessage(msgMetadata, chunkPayload);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 6ec738bbf4c..0c770c7c9bd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -189,6 +189,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
     )
     private CompressionType compressionType = CompressionType.NONE;
 
+    private int compressMinMsgBodySize = 4 * 1024; // 4kb
+
     // Cannot use Optional<Long> since it's not serializable
     private Long initialSequenceId = null;
 

Reply via email to