This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6be0922 optimise benchmark consumer, add consume fail rate option
new ad76fd3 Merge pull request #1683 from areyouok/pr_benchmark_consumer_2
6be0922 is described below
commit 6be09228f7f96e44fd1d2ce980d89f57a2f6e793
Author: huangli <[email protected]>
AuthorDate: Fri Dec 20 12:24:01 2019 +0800
optimise benchmark consumer, add consume fail rate option
---
.../rocketmq/example/benchmark/Consumer.java | 41 +++++++++++++++++-----
1 file changed, 32 insertions(+), 9 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 4724a1d..08897fa 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -22,7 +22,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -49,15 +51,16 @@ public class Consumer {
final String topic = commandLine.hasOption('t') ?
commandLine.getOptionValue('t').trim() : "BenchmarkTest";
final String groupPrefix = commandLine.hasOption('g') ?
commandLine.getOptionValue('g').trim() : "benchmark_consumer";
- final String isPrefixEnable = commandLine.hasOption('p') ?
commandLine.getOptionValue('p').trim() : "true";
+ final String isSuffixEnable = commandLine.hasOption('p') ?
commandLine.getOptionValue('p').trim() : "true";
final String filterType = commandLine.hasOption('f') ?
commandLine.getOptionValue('f').trim() : null;
final String expression = commandLine.hasOption('e') ?
commandLine.getOptionValue('e').trim() : null;
+ final double failRate = commandLine.hasOption('r') ?
Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0;
String group = groupPrefix;
- if (Boolean.parseBoolean(isPrefixEnable)) {
- group = groupPrefix + "_" +
Long.toString(System.currentTimeMillis() % 100);
+ if (Boolean.parseBoolean(isSuffixEnable)) {
+ group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
}
- System.out.printf("topic: %s, group: %s, prefix: %s, filterType: %s,
expression: %s%n", topic, group, isPrefixEnable, filterType, expression);
+ System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s,
expression: %s%n", topic, group, isSuffixEnable, filterType, expression);
final StatsBenchmarkConsumer statsBenchmarkConsumer = new
StatsBenchmarkConsumer();
@@ -85,9 +88,15 @@ public class Consumer {
(long) (((end[1] - begin[1]) / (double) (end[0] -
begin[0])) * 1000L);
final double averageB2CRT = (end[2] - begin[2]) / (double)
(end[1] - begin[1]);
final double averageS2CRT = (end[3] - begin[3]) / (double)
(end[1] - begin[1]);
+ final long failCount = end[4] - begin[4];
+ final long b2cMax =
statsBenchmarkConsumer.getBorn2ConsumerMaxRT().get();
+ final long s2cMax =
statsBenchmarkConsumer.getStore2ConsumerMaxRT().get();
+
+ statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0);
+ statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0);
- System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f
Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
- consumeTps, averageB2CRT, averageS2CRT, end[4], end[5]
+ System.out.printf("TPS: %d FAIL: %d AVG(B2C) RT: %7.3f
AVG(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
+ consumeTps, failCount, averageB2CRT, averageS2CRT,
b2cMax, s2cMax
);
}
}
@@ -144,7 +153,12 @@ public class Consumer {
compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(),
store2ConsumerRT);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ if (ThreadLocalRandom.current().nextDouble() < failRate) {
+ statsBenchmarkConsumer.getFailCount().incrementAndGet();
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ } else {
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
}
});
@@ -174,6 +188,10 @@ public class Consumer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("r", "fail rate", true, "consumer fail rate, default
0");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -200,14 +218,15 @@ class StatsBenchmarkConsumer {
private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L);
+ private final AtomicLong failCount = new AtomicLong(0L);
+
public Long[] createSnapshot() {
Long[] snap = new Long[] {
System.currentTimeMillis(),
this.receiveMessageTotalCount.get(),
this.born2ConsumerTotalRT.get(),
this.store2ConsumerTotalRT.get(),
- this.born2ConsumerMaxRT.get(),
- this.store2ConsumerMaxRT.get(),
+ this.failCount.get()
};
return snap;
@@ -232,4 +251,8 @@ class StatsBenchmarkConsumer {
public AtomicLong getStore2ConsumerMaxRT() {
return store2ConsumerMaxRT;
}
+
+ public AtomicLong getFailCount() {
+ return failCount;
+ }
}