This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f58dbc3  [ISSUE #2300] Enhancement: Benchmark support acl and msg 
trace (#2301)
f58dbc3 is described below

commit f58dbc3edaf7d4136a0e3e0cd90d6f3a58198957
Author: 张旭 <[email protected]>
AuthorDate: Fri Sep 25 20:44:14 2020 +0800

    [ISSUE #2300] Enhancement: Benchmark support acl and msg trace (#2301)
    
    * [Benchmark]Support acl, add msgTraceEnable option for producer and 
consumer, fix Algorithm HmacSHA1 not available using openjdk
    
    * add apache header
    
    * add aclEnable option
---
 distribution/benchmark/runclass.sh                 |  2 +-
 .../rocketmq/example/benchmark/AclClient.java      | 33 ++++++++++++++++++++++
 .../rocketmq/example/benchmark/Consumer.java       | 19 +++++++++++--
 .../rocketmq/example/benchmark/Producer.java       | 17 +++++++++--
 .../example/benchmark/TransactionProducer.java     |  9 +++++-
 5 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/distribution/benchmark/runclass.sh 
b/distribution/benchmark/runclass.sh
index 339e11a..12802dd 100644
--- a/distribution/benchmark/runclass.sh
+++ b/distribution/benchmark/runclass.sh
@@ -56,9 +56,9 @@ JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m 
-XX:PermSize=128m -XX:MaxPe
 JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC 
-XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 
-XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 
-XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC"
 JAVA_OPT="${JAVA_OPT} -verbose:gc 
-Xloggc:${GC_LOG_DIR}/rmq_run_class_gc_%p_%t.log -XX:+PrintGCDetails"
 JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
-JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
 JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
 JAVA_OPT="${JAVA_OPT} -XX:+PerfDisableSharedMem"
+JAVA_OPT="${JAVA_OPT} 
-Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
 #JAVA_OPT="${JAVA_OPT} -Xdebug 
-Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
 JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
 
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java 
b/example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java
new file mode 100644
index 0000000..04ef5d5
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.benchmark;
+
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.remoting.RPCHook;
+
+public class AclClient {
+
+    private static final String ACL_ACCESS_KEY = "rocketmq2";
+
+    private static final String ACL_SECRET_KEY = "12345678";
+
+    static RPCHook getAclRPCHook() {
+        return new AclClientRPCHook(new 
SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
+    }
+}
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java 
b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 08897fa..c0a2a8b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -34,10 +34,12 @@ import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
 public class Consumer {
@@ -55,12 +57,16 @@ public class Consumer {
         final String filterType = commandLine.hasOption('f') ? 
commandLine.getOptionValue('f').trim() : null;
         final String expression = commandLine.hasOption('e') ? 
commandLine.getOptionValue('e').trim() : null;
         final double failRate = commandLine.hasOption('r') ? 
Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0;
+        final boolean msgTraceEnable = commandLine.hasOption('m') && 
Boolean.parseBoolean(commandLine.getOptionValue('m'));
+        final boolean aclEnable = commandLine.hasOption('a') && 
Boolean.parseBoolean(commandLine.getOptionValue('a'));
+
         String group = groupPrefix;
         if (Boolean.parseBoolean(isSuffixEnable)) {
             group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
         }
 
-        System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, 
expression: %s%n", topic, group, isSuffixEnable, filterType, expression);
+        System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, 
expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
+            topic, group, isSuffixEnable, filterType, expression, 
msgTraceEnable, aclEnable);
 
         final StatsBenchmarkConsumer statsBenchmarkConsumer = new 
StatsBenchmarkConsumer();
 
@@ -111,7 +117,8 @@ public class Consumer {
             }
         }, 10000, 10000);
 
-        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
+        RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, 
rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null);
         if (commandLine.hasOption('n')) {
             String ns = commandLine.getOptionValue('n');
             consumer.setNamesrvAddr(ns);
@@ -192,6 +199,14 @@ public class Consumer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, 
Default: false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java 
b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index ce2b83f..dbad169 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.srvutil.ServerUtil;
@@ -53,8 +54,11 @@ public class Producer {
         final int messageSize = commandLine.hasOption('s') ? 
Integer.parseInt(commandLine.getOptionValue('s')) : 128;
         final boolean keyEnable = commandLine.hasOption('k') && 
Boolean.parseBoolean(commandLine.getOptionValue('k'));
         final int propertySize = commandLine.hasOption('p') ? 
Integer.parseInt(commandLine.getOptionValue('p')) : 0;
+        final boolean msgTraceEnable = commandLine.hasOption('m') && 
Boolean.parseBoolean(commandLine.getOptionValue('m'));
+        final boolean aclEnable = commandLine.hasOption('a') && 
Boolean.parseBoolean(commandLine.getOptionValue('a'));
 
-        System.out.printf("topic %s threadCount %d messageSize %d keyEnable 
%s%n", topic, threadCount, messageSize, keyEnable);
+        System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s 
traceEnable %s aclEnable %s%n",
+            topic, threadCount, messageSize, keyEnable, msgTraceEnable, 
aclEnable);
 
         final InternalLogger log = ClientLogger.getLog();
 
@@ -100,7 +104,8 @@ public class Producer {
             }
         }, 10000, 10000);
 
-        final DefaultMQProducer producer = new 
DefaultMQProducer("benchmark_producer");
+        RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
+        final DefaultMQProducer producer = new 
DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null);
         producer.setInstanceName(Long.toString(System.currentTimeMillis()));
 
         if (commandLine.hasOption('n')) {
@@ -210,6 +215,14 @@ public class Producer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, 
Default: false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
 
b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 3531eb5..951b718 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -68,6 +68,7 @@ public class TransactionProducer {
         config.checkUnknownRate = commandLine.hasOption("cu") ? 
Double.parseDouble(commandLine.getOptionValue("cu")) : 0.0;
         config.batchId = commandLine.hasOption("b") ? 
Long.parseLong(commandLine.getOptionValue("b")) : System.currentTimeMillis();
         config.sendInterval = commandLine.hasOption("i") ? 
Integer.parseInt(commandLine.getOptionValue("i")) : 0;
+        config.aclEnable = commandLine.hasOption('a') && 
Boolean.parseBoolean(commandLine.getOptionValue('a'));
 
         final ExecutorService sendThreadPool = 
Executors.newFixedThreadPool(config.threadCount);
 
@@ -122,7 +123,8 @@ public class TransactionProducer {
         }, 10000, 10000);
 
         final TransactionListener transactionCheckListener = new 
TransactionListenerImpl(statsBenchmark, config);
-        final TransactionMQProducer producer = new 
TransactionMQProducer("benchmark_transaction_producer");
+        final TransactionMQProducer producer =
+            new TransactionMQProducer("benchmark_transaction_producer", 
config.aclEnable ? AclClient.getAclRPCHook() : null);
         producer.setInstanceName(Long.toString(System.currentTimeMillis()));
         producer.setTransactionListener(transactionCheckListener);
         producer.setDefaultTopicQueueNums(1000);
@@ -250,6 +252,10 @@ public class TransactionProducer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 }
@@ -432,6 +438,7 @@ class TxSendConfig {
     double checkUnknownRate;
     long batchId;
     int sendInterval;
+    boolean aclEnable;
 }
 
 class LRUMap<K, V> extends LinkedHashMap<K, V> {

Reply via email to