This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8d10c9601 [ISSUE #5281]Unify the command line input parameters in all
pressure testing programs (#5283)
8d10c9601 is described below
commit 8d10c9601d137be397d17003634cceb89c949a69
Author: zhangjidi2016 <[email protected]>
AuthorDate: Wed Oct 12 14:36:44 2022 +0800
[ISSUE #5281]Unify the command line input parameters in all pressure
testing programs (#5283)
Co-authored-by: zhangjidi <[email protected]>
---
.../rocketmq/example/benchmark/BatchProducer.java | 38 +++++++++++-----------
1 file changed, 19 insertions(+), 19 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index a09443a3a..51be4dabe 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -27,15 +27,12 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
@@ -43,8 +40,8 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -74,8 +71,6 @@ public class BatchProducer {
final int tagCount = getOptionValue(commandLine, 'l', 0);
final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false);
final boolean aclEnable = getOptionValue(commandLine, 'a', false);
- final String ak = getOptionValue(commandLine, 'c', "rocketmq2");
- final String sk = getOptionValue(commandLine, 'e', "12346789");
System.out.printf("topic: %s threadCount: %d messageSize: %d
batchSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s
aclEnable: %s%n",
topic, threadCount, messageSize, batchSize, keyEnable,
propertySize, tagCount, msgTraceEnable, aclEnable);
@@ -89,7 +84,14 @@ public class BatchProducer {
final StatsBenchmarkBatchProducer statsBenchmark = new
StatsBenchmarkBatchProducer();
statsBenchmark.start();
- final DefaultMQProducer producer = initInstance(namesrv,
msgTraceEnable, aclEnable, ak, sk);
+ RPCHook rpcHook = null;
+ if (aclEnable) {
+ String ak = commandLine.hasOption("ak") ?
String.valueOf(commandLine.getOptionValue("ak")) : AclClient.ACL_ACCESS_KEY;
+ String sk = commandLine.hasOption("sk") ?
String.valueOf(commandLine.getOptionValue("sk")) : AclClient.ACL_SECRET_KEY;
+ rpcHook = AclClient.getAclRPCHook(ak, sk);
+ }
+
+ final DefaultMQProducer producer = initInstance(namesrv,
msgTraceEnable, rpcHook);
producer.start();
final InternalLogger log = ClientLogger.getLog();
@@ -202,11 +204,11 @@ public class BatchProducer {
opt.setRequired(false);
options.addOption(opt);
- opt = new Option("c", "accessKey", true, "Acl Access Key, Default:
rocketmq2");
+ opt = new Option("ak", "accessKey", true, "Acl Access Key, Default:
rocketmq2");
opt.setRequired(false);
options.addOption(opt);
- opt = new Option("e", "secretKey", true, "Acl Secret Key, Default:
123456789");
+ opt = new Option("sk", "secretKey", true, "Acl Secret Key, Default:
123456789");
opt.setRequired(false);
options.addOption(opt);
@@ -295,9 +297,7 @@ public class BatchProducer {
}
}
- private static DefaultMQProducer initInstance(String namesrv, boolean
traceEnable, boolean aclEnable, String ak,
- String sk) {
- RPCHook rpcHook = aclEnable ? new AclClientRPCHook(new
SessionCredentials(ak, sk)) : null;
+ private static DefaultMQProducer initInstance(String namesrv, boolean
traceEnable, RPCHook rpcHook) {
final DefaultMQProducer producer = new
DefaultMQProducer("benchmark_batch_producer", rpcHook, traceEnable, null);
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
@@ -322,18 +322,18 @@ class StatsBenchmarkBatchProducer {
private final LongAdder sendMessageFailedCount = new LongAdder();
private final ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "BenchmarkTimerThread", Boolean.TRUE));
+ "BenchmarkTimerThread", Boolean.TRUE));
private final LinkedList<Long[]> snapshotList = new LinkedList<>();
public Long[] createSnapshot() {
Long[] snap = new Long[] {
- System.currentTimeMillis(),
- this.sendRequestSuccessCount.longValue(),
- this.sendRequestFailedCount.longValue(),
- this.sendMessageSuccessCount.longValue(),
- this.sendMessageFailedCount.longValue(),
- this.sendMessageSuccessTimeTotal.longValue(),
+ System.currentTimeMillis(),
+ this.sendRequestSuccessCount.longValue(),
+ this.sendRequestFailedCount.longValue(),
+ this.sendMessageSuccessCount.longValue(),
+ this.sendMessageFailedCount.longValue(),
+ this.sendMessageSuccessTimeTotal.longValue(),
};
return snap;