This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2b26db0d38f Switch to SplittableRandom in ProducerPerformance utility 
(#13482)
2b26db0d38f is described below

commit 2b26db0d38f7245505812e4cb3fa622fc07ba6c8
Author: Robert Young <[email protected]>
AuthorDate: Fri Mar 31 19:52:10 2023 +1300

    Switch to SplittableRandom in ProducerPerformance utility (#13482)
    
    Why:
    Using java.util.Random to generate every byte sent from the
    ProducerPerformance utility appears to be a limiting factor. Throughput of
    the ProducerPerformance script is higher with a file of records as compared
    to randomly generated records.
    
    On my machine a single thread can generate ~100MB/sec of uppercase letters
    using java.util.Random and ~300MB/sec using java.util.SplittableRandom. This
    generation rate is a limit on throughput.
    
    Note: you can optimise further by expanding the generated alphabet from 26
    letters to 32 letters, as it is more efficient to generate a nicely
    distributed int when the bound is a power of two.
    
    Reviewers: Luke Chen <[email protected]>
---
 .../src/main/java/org/apache/kafka/tools/ProducerPerformance.java | 7 ++++---
 .../test/java/org/apache/kafka/tools/ProducerPerformanceTest.java | 8 ++++----
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java 
b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index d8a0f260691..fd15ddd1b6b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -27,8 +27,8 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-import java.util.Random;
 import java.util.Arrays;
+import java.util.SplittableRandom;
 
 import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
 import org.apache.kafka.clients.producer.Callback;
@@ -92,7 +92,8 @@ public class ProducerPerformance {
             if (recordSize != null) {
                 payload = new byte[recordSize];
             }
-            Random random = new Random(0);
+            // not threadsafe, do not share with other threads
+            SplittableRandom random = new SplittableRandom(0);
             ProducerRecord<byte[], byte[]> record;
             stats = new Stats(numRecords, 5000);
             long startMs = System.currentTimeMillis();
@@ -169,7 +170,7 @@ public class ProducerPerformance {
     Stats stats;
 
     static byte[] generateRandomPayload(Integer recordSize, List<byte[]> 
payloadByteList, byte[] payload,
-            Random random) {
+            SplittableRandom random) {
         if (!payloadByteList.isEmpty()) {
             payload = 
payloadByteList.get(random.nextInt(payloadByteList.size()));
         } else if (recordSize != null) {
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
index 70717938845..f97e34dda9c 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
@@ -36,7 +36,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
-import java.util.Random;
+import java.util.SplittableRandom;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -177,7 +177,7 @@ public class ProducerPerformanceTest {
         List<byte[]> payloadByteList = new ArrayList<>();
         payloadByteList.add(byteArray);
         byte[] payload = null;
-        Random random = new Random(0);
+        SplittableRandom random = new SplittableRandom(0);
 
         payload = ProducerPerformance.generateRandomPayload(recordSize, 
payloadByteList, payload, random);
         assertEquals(inputString, new String(payload));
@@ -188,7 +188,7 @@ public class ProducerPerformanceTest {
         Integer recordSize = 100;
         byte[] payload = new byte[recordSize];
         List<byte[]> payloadByteList = new ArrayList<>();
-        Random random = new Random(0);
+        SplittableRandom random = new SplittableRandom(0);
 
         payload = ProducerPerformance.generateRandomPayload(recordSize, 
payloadByteList, payload, random);
         for (byte b : payload) {
@@ -201,7 +201,7 @@ public class ProducerPerformanceTest {
         Integer recordSize = null;
         byte[] payload = null;
         List<byte[]> payloadByteList = new ArrayList<>();
-        Random random = new Random(0);
+        SplittableRandom random = new SplittableRandom(0);
 
         IllegalArgumentException thrown = 
assertThrows(IllegalArgumentException.class, () -> 
ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, 
random));
         assertEquals("no payload File Path or record Size provided", 
thrown.getMessage());

Reply via email to