This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f58dbc3 [ISSUE #2300] Enhancement: Benchmark support acl and msg
trace (#2301)
f58dbc3 is described below
commit f58dbc3edaf7d4136a0e3e0cd90d6f3a58198957
Author: 张旭 <[email protected]>
AuthorDate: Fri Sep 25 20:44:14 2020 +0800
[ISSUE #2300] Enhancement: Benchmark support acl and msg trace (#2301)
* [Benchmark]Support acl, add msgTraceEnable option for producer and
consumer, fix Algorithm HmacSHA1 not available using openjdk
* add apache header
* add aclEnable option
---
distribution/benchmark/runclass.sh | 2 +-
.../rocketmq/example/benchmark/AclClient.java | 33 ++++++++++++++++++++++
.../rocketmq/example/benchmark/Consumer.java | 19 +++++++++++--
.../rocketmq/example/benchmark/Producer.java | 17 +++++++++--
.../example/benchmark/TransactionProducer.java | 9 +++++-
5 files changed, 74 insertions(+), 6 deletions(-)
diff --git a/distribution/benchmark/runclass.sh
b/distribution/benchmark/runclass.sh
index 339e11a..12802dd 100644
--- a/distribution/benchmark/runclass.sh
+++ b/distribution/benchmark/runclass.sh
@@ -56,9 +56,9 @@ JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m
-XX:PermSize=128m -XX:MaxPe
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC
-XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70
-XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0
-XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc
-Xloggc:${GC_LOG_DIR}/rmq_run_class_gc_%p_%t.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
-JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -XX:+PerfDisableSharedMem"
+JAVA_OPT="${JAVA_OPT}
-Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
#JAVA_OPT="${JAVA_OPT} -Xdebug
-Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
diff --git
a/example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java
b/example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java
new file mode 100644
index 0000000..04ef5d5
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.benchmark;
+
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.remoting.RPCHook;
+
+public class AclClient {
+
+ private static final String ACL_ACCESS_KEY = "rocketmq2";
+
+ private static final String ACL_SECRET_KEY = "12345678";
+
+ static RPCHook getAclRPCHook() {
+ return new AclClientRPCHook(new
SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
+ }
+}
diff --git
a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 08897fa..c0a2a8b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -34,10 +34,12 @@ import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
public class Consumer {
@@ -55,12 +57,16 @@ public class Consumer {
final String filterType = commandLine.hasOption('f') ?
commandLine.getOptionValue('f').trim() : null;
final String expression = commandLine.hasOption('e') ?
commandLine.getOptionValue('e').trim() : null;
final double failRate = commandLine.hasOption('r') ?
Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0;
+ final boolean msgTraceEnable = commandLine.hasOption('m') &&
Boolean.parseBoolean(commandLine.getOptionValue('m'));
+ final boolean aclEnable = commandLine.hasOption('a') &&
Boolean.parseBoolean(commandLine.getOptionValue('a'));
+
String group = groupPrefix;
if (Boolean.parseBoolean(isSuffixEnable)) {
group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
}
- System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s,
expression: %s%n", topic, group, isSuffixEnable, filterType, expression);
+ System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s,
expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
+ topic, group, isSuffixEnable, filterType, expression,
msgTraceEnable, aclEnable);
final StatsBenchmarkConsumer statsBenchmarkConsumer = new
StatsBenchmarkConsumer();
@@ -111,7 +117,8 @@ public class Consumer {
}
}, 10000, 10000);
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
+ RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group,
rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null);
if (commandLine.hasOption('n')) {
String ns = commandLine.getOptionValue('n');
consumer.setNamesrvAddr(ns);
@@ -192,6 +199,14 @@ public class Consumer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable,
Default: false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
diff --git
a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index ce2b83f..dbad169 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
@@ -53,8 +54,11 @@ public class Producer {
final int messageSize = commandLine.hasOption('s') ?
Integer.parseInt(commandLine.getOptionValue('s')) : 128;
final boolean keyEnable = commandLine.hasOption('k') &&
Boolean.parseBoolean(commandLine.getOptionValue('k'));
final int propertySize = commandLine.hasOption('p') ?
Integer.parseInt(commandLine.getOptionValue('p')) : 0;
+ final boolean msgTraceEnable = commandLine.hasOption('m') &&
Boolean.parseBoolean(commandLine.getOptionValue('m'));
+ final boolean aclEnable = commandLine.hasOption('a') &&
Boolean.parseBoolean(commandLine.getOptionValue('a'));
- System.out.printf("topic %s threadCount %d messageSize %d keyEnable
%s%n", topic, threadCount, messageSize, keyEnable);
+ System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s
traceEnable %s aclEnable %s%n",
+ topic, threadCount, messageSize, keyEnable, msgTraceEnable,
aclEnable);
final InternalLogger log = ClientLogger.getLog();
@@ -100,7 +104,8 @@ public class Producer {
}
}, 10000, 10000);
- final DefaultMQProducer producer = new
DefaultMQProducer("benchmark_producer");
+ RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
+ final DefaultMQProducer producer = new
DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null);
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
if (commandLine.hasOption('n')) {
@@ -210,6 +215,14 @@ public class Producer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable,
Default: false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
diff --git
a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 3531eb5..951b718 100644
---
a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -68,6 +68,7 @@ public class TransactionProducer {
config.checkUnknownRate = commandLine.hasOption("cu") ?
Double.parseDouble(commandLine.getOptionValue("cu")) : 0.0;
config.batchId = commandLine.hasOption("b") ?
Long.parseLong(commandLine.getOptionValue("b")) : System.currentTimeMillis();
config.sendInterval = commandLine.hasOption("i") ?
Integer.parseInt(commandLine.getOptionValue("i")) : 0;
+ config.aclEnable = commandLine.hasOption('a') &&
Boolean.parseBoolean(commandLine.getOptionValue('a'));
final ExecutorService sendThreadPool =
Executors.newFixedThreadPool(config.threadCount);
@@ -122,7 +123,8 @@ public class TransactionProducer {
}, 10000, 10000);
final TransactionListener transactionCheckListener = new
TransactionListenerImpl(statsBenchmark, config);
- final TransactionMQProducer producer = new
TransactionMQProducer("benchmark_transaction_producer");
+ final TransactionMQProducer producer =
+ new TransactionMQProducer("benchmark_transaction_producer",
config.aclEnable ? AclClient.getAclRPCHook() : null);
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000);
@@ -250,6 +252,10 @@ public class TransactionProducer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
}
@@ -432,6 +438,7 @@ class TxSendConfig {
double checkUnknownRate;
long batchId;
int sendInterval;
+ boolean aclEnable;
}
class LRUMap<K, V> extends LinkedHashMap<K, V> {