This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a70aec0350a [improve][broker] Avoid share same random in multi-threads
due to performance issue (#18660)
a70aec0350a is described below
commit a70aec0350a003987e42c333607a3f3626e2e014
Author: 萧易客 <[email protected]>
AuthorDate: Tue Nov 29 12:27:35 2022 +0800
[improve][broker] Avoid share same random in multi-threads due to
performance issue (#18660)
---
.../broker/service/AbstractDispatcherMultipleConsumers.java | 6 ++----
.../java/org/apache/pulsar/testclient/PerformanceProducer.java | 10 +++++-----
2 files changed, 7 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
index 1d01f8c3b00..90e20790fae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.broker.service;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
-import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
@@ -45,8 +45,6 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi
.newUpdater(AbstractDispatcherMultipleConsumers.class,
"isClosed");
private volatile int isClosed = FALSE;
- private Random random = new Random(42);
-
protected AbstractDispatcherMultipleConsumers(Subscription subscription,
ServiceConfiguration serviceConfig) {
super(subscription, serviceConfig);
}
@@ -157,7 +155,7 @@ public abstract class AbstractDispatcherMultipleConsumers
extends AbstractBaseDi
return null;
}
- return consumerList.get(random.nextInt(consumerList.size()));
+ return consumerList.get(ThreadLocalRandom.current().nextInt(consumerList.size()));
}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 589ef2cc46a..3ff78909a68 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -392,7 +393,6 @@ public class PerformanceProducer {
msgRatePerThread,
payloadByteList,
payloadBytes,
- random,
doneLatch
);
});
@@ -529,7 +529,6 @@ public class PerformanceProducer {
int msgRate,
List<byte[]> payloadByteList,
byte[] payloadBytes,
- Random random,
CountDownLatch doneLatch) {
PulsarClient client = null;
try {
@@ -626,9 +625,10 @@ public class PerformanceProducer {
if (arguments.payloadFilename != null) {
if (messageFormatter != null) {
payloadData = messageFormatter.formatMessage(arguments.producerName, totalSent,
- payloadByteList.get(random.nextInt(payloadByteList.size())));
+ payloadByteList.get(ThreadLocalRandom.current().nextInt(payloadByteList.size())));
} else {
- payloadData = payloadByteList.get(random.nextInt(payloadByteList.size()));
+ payloadData = payloadByteList.get(
+ ThreadLocalRandom.current().nextInt(payloadByteList.size()));
}
} else {
payloadData = payloadBytes;
@@ -656,7 +656,7 @@ public class PerformanceProducer {
}
//generate msg key
if (msgKeyMode == MessageKeyGenerationMode.random) {
- messageBuilder.key(String.valueOf(random.nextInt()));
+ messageBuilder.key(String.valueOf(ThreadLocalRandom.current().nextInt()));
} else if (msgKeyMode ==
MessageKeyGenerationMode.autoIncrement) {
messageBuilder.key(String.valueOf(totalSent));
}