This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 93068b4a1bb MINOR: Fix the compression arguments in 
TestLinearWriteSpeed (#20349)
93068b4a1bb is described below

commit 93068b4a1bb345d1f151692c2727e0830ab08938
Author: Maros Orsak <maros.orsak...@gmail.com>
AuthorDate: Sun Aug 17 13:21:51 2025 +0200

    MINOR: Fix the compression arguments in TestLinearWriteSpeed (#20349)
    
    This PR fixes a problem related to `TestLinearWriteSpeed`. During my
    work on KIP-780, I discovered that benchmarks for `TestLinearWriteSpeed`
    do not account for compression algorithms: the benchmark always uses
    `Compression.NONE` when creating records. The problem was introduced in
    this PR [1].
    
    [1] - https://github.com/apache/kafka/pull/17736
    
    Reviewers: Ken Huang <s7133...@gmail.com>, Mickael Maison
    <mickael.mai...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../org/apache/kafka/jmh/log/TestLinearWriteSpeed.java | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
index 668ddfcfff8..32c8d87062e 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
@@ -81,13 +81,13 @@ public class TestLinearWriteSpeed {
             .describedAs("num_bytes")
             .ofType(Integer.class);
 
-        OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", 
"REQUIRED: The size of each message in the message set.")
+        OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", 
"The size of each message in the message set.")
             .withRequiredArg()
             .describedAs("num_bytes")
             .ofType(Integer.class)
             .defaultsTo(1024);
 
-        OptionSpec<Integer> filesOpt = parser.accepts("files", "REQUIRED: The 
number of logs or files.")
+        OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of 
logs or files.")
             .withRequiredArg()
             .describedAs("num_files")
             .ofType(Integer.class)
@@ -120,14 +120,13 @@ public class TestLinearWriteSpeed {
         OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The 
compression level to use")
             .withRequiredArg()
             .describedAs("level")
-            .ofType(Integer.class)
-            .defaultsTo(0);
+            .ofType(Integer.class);
 
         OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to 
memory-mapped files.");
         OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to 
file channels.");
         OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka 
logs.");
         OptionSet options = parser.parse(args);
-        CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt, 
filesOpt);
+        CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
 
         long bytesToWrite = options.valueOf(bytesOpt);
         int bufferSize = options.valueOf(sizeOpt);
@@ -140,9 +139,10 @@ public class TestLinearWriteSpeed {
         long flushInterval = options.valueOf(flushIntervalOpt);
         CompressionType compressionType = 
CompressionType.forName(options.valueOf(compressionCodecOpt));
         Compression.Builder<? extends Compression> compressionBuilder = 
Compression.of(compressionType);
-        int compressionLevel = options.valueOf(compressionLevelOpt);
+        Integer compressionLevel = options.valueOf(compressionLevelOpt);
 
-        setupCompression(compressionType, compressionBuilder, 
compressionLevel);
+        if (compressionLevel != null) setupCompression(compressionType, 
compressionBuilder, compressionLevel);
+        Compression compression = compressionBuilder.build();
 
         ThreadLocalRandom.current().nextBytes(buffer.array());
         int numMessages = bufferSize / (messageSize + Records.LOG_OVERHEAD);
@@ -153,7 +153,7 @@ public class TestLinearWriteSpeed {
             recordsList.add(new SimpleRecord(createTime, null, new 
byte[messageSize]));
         }
 
-        MemoryRecords messageSet = MemoryRecords.withRecords(Compression.NONE, 
recordsList.toArray(new SimpleRecord[0]));
+        MemoryRecords messageSet = MemoryRecords.withRecords(compression, 
recordsList.toArray(new SimpleRecord[0]));
         Writable[] writables = new Writable[numFiles];
         KafkaScheduler scheduler = new KafkaScheduler(1);
         scheduler.startup();
@@ -222,7 +222,7 @@ public class TestLinearWriteSpeed {
 
     private static void setupCompression(CompressionType compressionType,
                                          Compression.Builder<? extends 
Compression> compressionBuilder,
-                                         int compressionLevel) {
+                                         Integer compressionLevel) {
         switch (compressionType) {
             case GZIP:
                 ((GzipCompression.Builder) 
compressionBuilder).level(compressionLevel);

Reply via email to