This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a70aec0350a [improve][broker] Avoid share same random in multi-threads 
due to performance issue (#18660)
a70aec0350a is described below

commit a70aec0350a003987e42c333607a3f3626e2e014
Author: 萧易客 <[email protected]>
AuthorDate: Tue Nov 29 12:27:35 2022 +0800

    [improve][broker] Avoid share same random in multi-threads due to 
performance issue (#18660)
---
 .../broker/service/AbstractDispatcherMultipleConsumers.java    |  6 ++----
 .../java/org/apache/pulsar/testclient/PerformanceProducer.java | 10 +++++-----
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
index 1d01f8c3b00..90e20790fae 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.broker.service;
 
 import com.carrotsearch.hppc.ObjectHashSet;
 import com.carrotsearch.hppc.ObjectSet;
-import java.util.Random;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
@@ -45,8 +45,6 @@ public abstract class AbstractDispatcherMultipleConsumers 
extends AbstractBaseDi
                     .newUpdater(AbstractDispatcherMultipleConsumers.class, 
"isClosed");
     private volatile int isClosed = FALSE;
 
-    private Random random = new Random(42);
-
     protected AbstractDispatcherMultipleConsumers(Subscription subscription, 
ServiceConfiguration serviceConfig) {
         super(subscription, serviceConfig);
     }
@@ -157,7 +155,7 @@ public abstract class AbstractDispatcherMultipleConsumers 
extends AbstractBaseDi
             return null;
         }
 
-        return consumerList.get(random.nextInt(consumerList.size()));
+        return 
consumerList.get(ThreadLocalRandom.current().nextInt(consumerList.size()));
     }
 
 
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 589ef2cc46a..3ff78909a68 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -392,7 +393,6 @@ public class PerformanceProducer {
                     msgRatePerThread,
                     payloadByteList,
                     payloadBytes,
-                    random,
                     doneLatch
                 );
             });
@@ -529,7 +529,6 @@ public class PerformanceProducer {
                                     int msgRate,
                                     List<byte[]> payloadByteList,
                                     byte[] payloadBytes,
-                                    Random random,
                                     CountDownLatch doneLatch) {
         PulsarClient client = null;
         try {
@@ -626,9 +625,10 @@ public class PerformanceProducer {
                     if (arguments.payloadFilename != null) {
                         if (messageFormatter != null) {
                             payloadData = 
messageFormatter.formatMessage(arguments.producerName, totalSent,
-                                    
payloadByteList.get(random.nextInt(payloadByteList.size())));
+                                    
payloadByteList.get(ThreadLocalRandom.current().nextInt(payloadByteList.size())));
                         } else {
-                            payloadData = 
payloadByteList.get(random.nextInt(payloadByteList.size()));
+                            payloadData = payloadByteList.get(
+                                    
ThreadLocalRandom.current().nextInt(payloadByteList.size()));
                         }
                     } else {
                         payloadData = payloadBytes;
@@ -656,7 +656,7 @@ public class PerformanceProducer {
                     }
                     //generate msg key
                     if (msgKeyMode == MessageKeyGenerationMode.random) {
-                        messageBuilder.key(String.valueOf(random.nextInt()));
+                        
messageBuilder.key(String.valueOf(ThreadLocalRandom.current().nextInt()));
                     } else if (msgKeyMode == 
MessageKeyGenerationMode.autoIncrement) {
                         messageBuilder.key(String.valueOf(totalSent));
                     }

Reply via email to