This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new dcc88c65f1 [ISSUE #8166] optimize: make compression type configurable
in producer client level
dcc88c65f1 is described below
commit dcc88c65f1f29b392fbb300001b386dbf1901afc
Author: Humkum <[email protected]>
AuthorDate: Thu May 23 13:56:30 2024 +0800
[ISSUE #8166] optimize: make compression type configurable in producer
client level
---
.../impl/producer/DefaultMQProducerImpl.java | 28 ++--------------
.../client/producer/DefaultMQProducer.java | 39 ++++++++++++++++++++++
.../rocketmq/example/benchmark/BatchProducer.java | 4 +--
.../rocketmq/example/benchmark/Producer.java | 4 +--
4 files changed, 45 insertions(+), 30 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 6268bcc0a1..7ef3402513 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -70,9 +70,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.compression.CompressionType;
-import org.apache.rocketmq.common.compression.Compressor;
-import org.apache.rocketmq.common.compression.CompressorFactory;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -118,11 +115,6 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
private MQFaultStrategy mqFaultStrategy;
private ExecutorService asyncSenderExecutor;
- // compression related
- private int compressLevel =
Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
- private CompressionType compressType =
CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));
- private final Compressor compressor =
CompressorFactory.getCompressor(compressType);
-
// backpressure related
private Semaphore semaphoreAsyncSendNum;
private Semaphore semaphoreAsyncSendSize;
@@ -900,7 +892,7 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
- sysFlag |= compressType.getCompressionFlag();
+ sysFlag |=
this.defaultMQProducer.getCompressType().getCompressionFlag();
msgBodyCompressed = true;
}
@@ -1070,7 +1062,7 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
if (body != null) {
if (body.length >=
this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
- byte[] data = compressor.compress(body, compressLevel);
+ byte[] data =
this.defaultMQProducer.getCompressor().compress(body,
this.defaultMQProducer.getCompressLevel());
if (data != null) {
msg.setBody(data);
return true;
@@ -1763,22 +1755,6 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
return topicPublishInfoTable;
}
- public int getCompressLevel() {
- return compressLevel;
- }
-
- public void setCompressLevel(int compressLevel) {
- this.compressLevel = compressLevel;
- }
-
- public CompressionType getCompressType() {
- return compressType;
- }
-
- public void setCompressType(CompressionType compressType) {
- this.compressType = compressType;
- }
-
public ServiceState getServiceState() {
return serviceState;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index b350ba074d..5304887e38 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -36,6 +36,9 @@ import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.compression.CompressionType;
+import org.apache.rocketmq.common.compression.Compressor;
+import org.apache.rocketmq.common.compression.CompressorFactory;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -170,6 +173,21 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
private RPCHook rpcHook = null;
+ /**
+ * Compress level of compress algorithm.
+ */
+ private int compressLevel =
Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
+
+ /**
+ * Compress type of compress algorithm, default using ZLIB.
+ */
+ private CompressionType compressType =
CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));
+
+ /**
+ * Compressor of compress algorithm.
+ */
+ private Compressor compressor =
CompressorFactory.getCompressor(compressType);
+
/**
* Default constructor.
*/
@@ -1344,4 +1362,25 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
super.setStartDetectorEnable(startDetectorEnable);
this.defaultMQProducerImpl.getMqFaultStrategy().setStartDetectorEnable(startDetectorEnable);
}
+
+ public int getCompressLevel() {
+ return compressLevel;
+ }
+
+ public void setCompressLevel(int compressLevel) {
+ this.compressLevel = compressLevel;
+ }
+
+ public CompressionType getCompressType() {
+ return compressType;
+ }
+
+ public void setCompressType(CompressionType compressType) {
+ this.compressType = compressType;
+ this.compressor = CompressorFactory.getCompressor(compressType);
+ }
+
+ public Compressor getCompressor() {
+ return compressor;
+ }
}
diff --git
a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index c4a6162a5f..21a4b3b7e7 100644
---
a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -102,8 +102,8 @@ public class BatchProducer {
String compressType = commandLine.hasOption("ct") ?
commandLine.getOptionValue("ct").trim() : "ZLIB";
int compressLevel = commandLine.hasOption("cl") ?
Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
int compressOverHowMuch = commandLine.hasOption("ch") ?
Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
-
producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
-
producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
+ producer.setCompressType(CompressionType.of(compressType));
+ producer.setCompressLevel(compressLevel);
producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
System.out.printf("compressType: %s compressLevel: %s%n",
compressType, compressLevel);
} else {
diff --git
a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index 480d16b758..a945283f57 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -160,8 +160,8 @@ public class Producer {
String compressType = commandLine.hasOption("ct") ?
commandLine.getOptionValue("ct").trim() : "ZLIB";
int compressLevel = commandLine.hasOption("cl") ?
Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
int compressOverHowMuch = commandLine.hasOption("ch") ?
Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
-
producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
-
producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
+ producer.setCompressType(CompressionType.of(compressType));
+ producer.setCompressLevel(compressLevel);
producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
System.out.printf("compressType: %s compressLevel: %s%n",
compressType, compressLevel);
} else {