This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 93068b4a1bb MINOR: Fix the compression arguments in TestLinearWriteSpeed (#20349) 93068b4a1bb is described below commit 93068b4a1bb345d1f151692c2727e0830ab08938 Author: Maros Orsak <maros.orsak...@gmail.com> AuthorDate: Sun Aug 17 13:21:51 2025 +0200 MINOR: Fix the compression arguments in TestLinearWriteSpeed (#20349) This PR fixes a problem related to `TestLinearWriteSpeed`. During my work on KIP-780, I discovered that benchmarks for `TestLinearWriteSpeed` do not account for compression algorithms. It always uses `Compression.NONE` when creating records. The problem was introduced in this PR [1]. [1] - https://github.com/apache/kafka/pull/17736 Reviewers: Ken Huang <s7133...@gmail.com>, Mickael Maison <mickael.mai...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../org/apache/kafka/jmh/log/TestLinearWriteSpeed.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java index 668ddfcfff8..32c8d87062e 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java @@ -81,13 +81,13 @@ public class TestLinearWriteSpeed { .describedAs("num_bytes") .ofType(Integer.class); - OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "REQUIRED: The size of each message in the message set.") + OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.") .withRequiredArg() .describedAs("num_bytes") .ofType(Integer.class) .defaultsTo(1024); - OptionSpec<Integer> filesOpt = parser.accepts("files", "REQUIRED: The number of logs or files.") + OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.") .withRequiredArg() .describedAs("num_files") .ofType(Integer.class) @@ -120,14 +120,13 @@ public class TestLinearWriteSpeed { OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use") .withRequiredArg() .describedAs("level") - .ofType(Integer.class) - .defaultsTo(0); + .ofType(Integer.class); OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files."); OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels."); OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs."); OptionSet options = parser.parse(args); - CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt, filesOpt); + CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt); long bytesToWrite = options.valueOf(bytesOpt); int bufferSize = options.valueOf(sizeOpt); @@ -140,9 +139,10 @@ public class TestLinearWriteSpeed { long flushInterval = options.valueOf(flushIntervalOpt); CompressionType compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt)); Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType); - int compressionLevel = options.valueOf(compressionLevelOpt); + Integer compressionLevel = options.valueOf(compressionLevelOpt); - setupCompression(compressionType, compressionBuilder, compressionLevel); + if (compressionLevel != null) setupCompression(compressionType, compressionBuilder, compressionLevel); + Compression compression = compressionBuilder.build(); ThreadLocalRandom.current().nextBytes(buffer.array()); int numMessages = bufferSize / (messageSize + Records.LOG_OVERHEAD); @@ -153,7 +153,7 @@ public class TestLinearWriteSpeed { recordsList.add(new SimpleRecord(createTime, null, new byte[messageSize])); } - MemoryRecords messageSet = MemoryRecords.withRecords(Compression.NONE, recordsList.toArray(new SimpleRecord[0])); + MemoryRecords messageSet = MemoryRecords.withRecords(compression, recordsList.toArray(new SimpleRecord[0])); Writable[] writables = new Writable[numFiles]; KafkaScheduler scheduler = new KafkaScheduler(1); scheduler.startup(); @@ -222,7 +222,7 @@ public class TestLinearWriteSpeed { private static void setupCompression(CompressionType compressionType, Compression.Builder<? extends Compression> compressionBuilder, - int compressionLevel) { + Integer compressionLevel) { switch (compressionType) { case GZIP: ((GzipCompression.Builder) compressionBuilder).level(compressionLevel);