This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch pip/minCompressSize in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6bf52a3adbacfc0e266d64d3545f92efffddf56f Author: xiangying <[email protected]> AuthorDate: Tue Oct 29 16:33:31 2024 +0800 [improve][client] Add a producer config to improve compaction performance --- .../client/impl/RawBatchMessageContainerImpl.java | 2 +- .../pulsar/client/impl/BatchMessageContainerImpl.java | 19 ++++++++++++++----- .../org/apache/pulsar/client/impl/ProducerImpl.java | 7 ++++--- .../client/impl/conf/ProducerConfigurationData.java | 2 ++ 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java index 374f1e30c0a..8cb8d74b3f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java @@ -167,7 +167,7 @@ public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl { } } - ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload()); + ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload(true)); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, encryptedPayload); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index a3c9d1bc9ab..ed272b59c98 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -141,7 +141,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { return isBatchFull(); } - protected ByteBuf 
getCompressedBatchMetadataAndPayload() { + protected ByteBuf getCompressedBatchMetadataAndPayload(boolean isBrokerTwoPhaseCompactor) { int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex(); int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex(); @@ -169,8 +169,17 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { } int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); - ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload); - batchedMessageMetadataAndPayload.release(); + ByteBuf compressedPayload; + if (!isBrokerTwoPhaseCompactor && producer != null){ + if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) { + compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload); + } else { + compressedPayload = batchedMessageMetadataAndPayload; + } + } else { + compressedPayload = compressor.encode(batchedMessageMetadataAndPayload); + batchedMessageMetadataAndPayload.release(); + } if (compressionType != CompressionType.NONE) { messageMetadata.setCompression(compressionType); messageMetadata.setUncompressedSize(uncompressedSize); @@ -252,7 +261,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { if (messages.size() == 1) { messageMetadata.clear(); messageMetadata.copyFrom(messages.get(0).getMessageBuilder()); - ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); + ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload(false)); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(), 1, null, messageMetadata, encryptedPayload); @@ -283,7 +292,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { lowestSequenceId = -1L; return op; } - ByteBuf encryptedPayload = 
producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); + ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload(false)); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); if (encryptedPayload.readableBytes() > getMaxMessageSize()) { producer.semaphoreRelease(messages.size()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 6d5a8145463..1cdee641d6a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -478,7 +478,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne * @param payload * @return a new payload */ - private ByteBuf applyCompression(ByteBuf payload) { + protected ByteBuf applyCompression(ByteBuf payload) { ByteBuf compressedPayload = compressor.encode(payload); payload.release(); return compressedPayload; @@ -505,7 +505,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne boolean compressed = false; // Batch will be compressed when closed // If a message has a delayed delivery time, we'll always send it individually - if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) { + if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) + && payload.readableBytes() > conf.getCompressMinMsgBodySize())) { compressedPayload = applyCompression(payload); compressed = true; @@ -737,7 +738,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } else { // in this case compression has not been applied by the caller // but we have to compress the payload if compression is configured - if (!compressed) { + if (!compressed && chunkPayload.readableBytes() > conf.getCompressMinMsgBodySize()) { chunkPayload = 
applyCompression(chunkPayload); } ByteBuf encryptedPayload = encryptMessage(msgMetadata, chunkPayload); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java index 6ec738bbf4c..0c770c7c9bd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java @@ -189,6 +189,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable { ) private CompressionType compressionType = CompressionType.NONE; + private int compressMinMsgBodySize = 4 * 1024; // 4kb + // Cannot use Optional<Long> since it's not serializable private Long initialSequenceId = null;
