This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1abb2b4 make pulsar-perf ioThread number configurable (#8090)
1abb2b4 is described below
commit 1abb2b4f7d93792d45b41026488567f037eb78e7
Author: hangc0276 <[email protected]>
AuthorDate: Tue Sep 22 07:46:27 2020 +0800
make pulsar-perf ioThread number configurable (#8090)
### Motivation
In pulsar-perf, the default Pulsar client ioThread number is
`Runtime.getRuntime().availableProcessors()` and can't be configured on the
command line. When running the pulsar-perf producer, this may cause message
enqueue contention and lead to high latency.
### Changes
1. make the ioThread number configurable on the command line
2. change the default ioThread number from
`Runtime.getRuntime().availableProcessors()` to `1`
---
.../java/org/apache/pulsar/testclient/PerformanceConsumer.java | 6 +++++-
.../java/org/apache/pulsar/testclient/PerformanceProducer.java | 8 ++++++--
.../main/java/org/apache/pulsar/testclient/PerformanceReader.java | 6 +++++-
3 files changed, 16 insertions(+), 4 deletions(-)
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index f118c12..552b146 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -158,6 +158,10 @@ public class PerformanceConsumer {
@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If
0, it will keep consuming")
public long testTime = 0;
+
+ @Parameter(names = {"-ioThreads", "--num-io-threads"}, description =
"Set the number of threads to be " +
+ "used for handling connections to brokers, default is 1
thread")
+ public int ioThreads = 1;
}
public static void main(String[] args) throws Exception {
@@ -260,7 +264,7 @@ public class PerformanceConsumer {
.serviceUrl(arguments.serviceURL) //
.connectionsPerBroker(arguments.maxConnections) //
.statsInterval(arguments.statsIntervalSeconds,
TimeUnit.SECONDS) //
- .ioThreads(Runtime.getRuntime().availableProcessors()) //
+ .ioThreads(arguments.ioThreads) //
.tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
if (isNotBlank(arguments.authPluginClassName)) {
clientBuilder.authentication(arguments.authPluginClassName,
arguments.authParams);
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 812cf0d..9153812 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -211,6 +211,10 @@ public class PerformanceProducer {
@Parameter(names = {"-mk", "--message-key-generation-mode"},
description = "The generation mode of message key" +
", valid options are: [autoIncrement, random]")
public String messageKeyGenerationMode = null;
+
+ @Parameter(names = {"-ioThreads", "--num-io-threads"}, description =
"Set the number of threads to be " +
+ "used for handling connections to brokers, default is 1
thread")
+ public int ioThreads = 1;
}
static class EncKeyReader implements CryptoKeyReader {
@@ -426,7 +430,7 @@ public class PerformanceProducer {
ClientBuilder clientBuilder = PulsarClient.builder() //
.serviceUrl(arguments.serviceURL) //
.connectionsPerBroker(arguments.maxConnections) //
- .ioThreads(Runtime.getRuntime().availableProcessors()) //
+ .ioThreads(arguments.ioThreads) //
.statsInterval(arguments.statsIntervalSeconds,
TimeUnit.SECONDS) //
.tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
@@ -628,6 +632,6 @@ public class PerformanceProducer {
private static final Logger log =
LoggerFactory.getLogger(PerformanceProducer.class);
public enum MessageKeyGenerationMode {
- autoIncrement,random;
+ autoIncrement,random
}
}
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 694b667..0d21196 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -117,6 +117,10 @@ public class PerformanceReader {
@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If
0, it will keep consuming")
public long testTime = 0;
+
+ @Parameter(names = {"-ioThreads", "--num-io-threads"}, description =
"Set the number of threads to be " +
+ "used for handling connections to brokers, default is 1
thread")
+ public int ioThreads = 1;
}
public static void main(String[] args) throws Exception {
@@ -211,7 +215,7 @@ public class PerformanceReader {
.serviceUrl(arguments.serviceURL) //
.connectionsPerBroker(arguments.maxConnections) //
.statsInterval(arguments.statsIntervalSeconds,
TimeUnit.SECONDS) //
- .ioThreads(Runtime.getRuntime().availableProcessors()) //
+ .ioThreads(arguments.ioThreads) //
.enableTls(arguments.useTls) //
.tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);