This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0e4d8b3e868 KAFKA-17569 Rewrite TestLinearWriteSpeed by Java (#17736)
0e4d8b3e868 is described below

commit 0e4d8b3e8688d8c4babcf5f5718d946c636a5390
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Nov 26 23:43:01 2024 +0800

    KAFKA-17569 Rewrite TestLinearWriteSpeed by Java (#17736)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 build.gradle                                       |   1 +
 checkstyle/import-control-jmh-benchmarks.xml       |   1 +
 .../scala/other/kafka/TestLinearWriteSpeed.scala   | 254 ---------------
 .../apache/kafka/jmh/log/TestLinearWriteSpeed.java | 342 +++++++++++++++++++++
 4 files changed, 344 insertions(+), 254 deletions(-)

diff --git a/build.gradle b/build.gradle
index 39ae8a26744..338bc9aa0cf 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3318,6 +3318,7 @@ project(':jmh-benchmarks') {
     implementation project(':metadata')
     implementation project(':storage')
     implementation project(':streams')
+    implementation project(':transaction-coordinator')
     implementation project(':core')
     implementation project(':connect:api')
     implementation project(':connect:transforms')
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index 6840d786926..4469ccf3bbe 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -58,6 +58,7 @@
     <allow pkg="org.apache.kafka.image"/>
     <allow pkg="org.apache.kafka.metadata"/>
     <allow pkg="org.apache.kafka.timeline" />
+    <allow pkg="org.apache.kafka.coordinator.transaction"/>
     <allow pkg="org.apache.kafka.connect" />
     <allow pkg="org.apache.kafka.network" />
     <allow class="org.apache.kafka.raft.QuorumConfig"/>
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
deleted file mode 100755
index adef2c63809..00000000000
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka
-
-import java.io._
-import java.nio._
-import java.nio.channels._
-import java.nio.file.StandardOpenOption
-import java.util.{Properties, Random}
-import joptsimple._
-import kafka.log._
-import org.apache.kafka.common.compress.{Compression, GzipCompression, Lz4Compression, ZstdCompression}
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Exit, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.util.{KafkaScheduler, Scheduler}
-import org.apache.kafka.server.util.CommandLineUtils
-import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
-import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-
-import scala.math.max
-
-/**
- * This test does linear writes using either a kafka log or a file and measures throughput and latency.
- */
-object TestLinearWriteSpeed {
-
-  def main(args: Array[String]): Unit = {
-    val parser = new OptionParser(false)
-    val dirOpt = parser.accepts("dir", "The directory to write to.")
-                           .withRequiredArg
-                           .describedAs("path")
-                           .ofType(classOf[java.lang.String])
-                           .defaultsTo(System.getProperty("java.io.tmpdir"))
-    val bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
-                           .withRequiredArg
-                           .describedAs("num_bytes")
-                           .ofType(classOf[java.lang.Long])
-    val sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
-                           .withRequiredArg
-                           .describedAs("num_bytes")
-                           .ofType(classOf[java.lang.Integer])
-    val messageSizeOpt = parser.accepts("message-size", "REQUIRED: The size of each message in the message set.")
-                           .withRequiredArg
-                           .describedAs("num_bytes")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1024)
-    val filesOpt = parser.accepts("files", "REQUIRED: The number of logs or files.")
-                           .withRequiredArg
-                           .describedAs("num_files")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1)
-    val reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
-                           .withRequiredArg
-                           .describedAs("ms")
-                           .ofType(classOf[java.lang.Long])
-                           .defaultsTo(1000L)
-    val maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
-                           .withRequiredArg
-                           .describedAs("mb")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(Integer.MAX_VALUE)
-    val flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
-                           .withRequiredArg()
-                           .describedAs("message_count")
-                           .ofType(classOf[java.lang.Long])
-                           .defaultsTo(Long.MaxValue)
-    val compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
-                            .withRequiredArg
-                            .describedAs("codec")
-                            .ofType(classOf[java.lang.String])
-                            .defaultsTo(CompressionType.NONE.name)
-    val compressionLevelOpt = parser.accepts("level", "The compression level to use")
-                            .withRequiredArg
-                            .describedAs("level")
-                            .ofType(classOf[java.lang.Integer])
-                            .defaultsTo(0)
-    val mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.")
-    val channelOpt = parser.accepts("channel", "Do writes to file channels.")
-    val logOpt = parser.accepts("log", "Do writes to kafka logs.")
-
-    val options = parser.parse(args : _*)
-
-    CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt, filesOpt)
-
-    var bytesToWrite = options.valueOf(bytesOpt).longValue
-    val bufferSize = options.valueOf(sizeOpt).intValue
-    val numFiles = options.valueOf(filesOpt).intValue
-    val reportingInterval = options.valueOf(reportingIntervalOpt).longValue
-    val dir = options.valueOf(dirOpt)
-    val maxThroughputBytes = options.valueOf(maxThroughputOpt).intValue * 1024L * 1024L
-    val buffer = ByteBuffer.allocate(bufferSize)
-    val messageSize = options.valueOf(messageSizeOpt).intValue
-    val flushInterval = options.valueOf(flushIntervalOpt).longValue
-    val compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt))
-    val compressionBuilder = Compression.of(compressionType)
-    val compressionLevel = options.valueOf(compressionLevelOpt)
-    compressionType match {
-      case CompressionType.GZIP => compressionBuilder.asInstanceOf[GzipCompression.Builder].level(compressionLevel)
-      case CompressionType.LZ4 => compressionBuilder.asInstanceOf[Lz4Compression.Builder].level(compressionLevel)
-      case CompressionType.ZSTD => compressionBuilder.asInstanceOf[ZstdCompression.Builder].level(compressionLevel)
-      case _ => //Noop
-    }
-    val compression = compressionBuilder.build()
-    val rand = new Random
-    rand.nextBytes(buffer.array)
-    val numMessages = bufferSize / (messageSize + Records.LOG_OVERHEAD)
-    val createTime = System.currentTimeMillis
-    val messageSet = {
-      val records = (0 until numMessages).map(_ => new SimpleRecord(createTime, null, new Array[Byte](messageSize)))
-      MemoryRecords.withRecords(compression, records: _*)
-    }
-
-    val writables = new Array[Writable](numFiles)
-    val scheduler = new KafkaScheduler(1)
-    scheduler.startup()
-    for (i <- 0 until numFiles) {
-      if (options.has(mmapOpt)) {
-        writables(i) = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer)
-      } else if (options.has(channelOpt)) {
-        writables(i) = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer)
-      } else if (options.has(logOpt)) {
-        val segmentSize = rand.nextInt(512)*1024*1024 + 64*1024*1024 // vary size to avoid herd effect
-        val logProperties = new Properties()
-        logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentSize: java.lang.Integer)
-        logProperties.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, flushInterval: java.lang.Long)
-        writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(logProperties), scheduler, messageSet)
-      } else {
-        System.err.println("Must specify what to write to with one of --log, --channel, or --mmap")
-        Exit.exit(1)
-      }
-    }
-    bytesToWrite = (bytesToWrite / numFiles) * numFiles
-
-    println("%10s\t%10s\t%10s".format("mb_sec", "avg_latency", "max_latency"))
-
-    val beginTest = System.nanoTime
-    var maxLatency = 0L
-    var totalLatency = 0L
-    var count = 0L
-    var written = 0L
-    var totalWritten = 0L
-    var lastReport = beginTest
-    while (totalWritten + bufferSize < bytesToWrite) {
-      val start = System.nanoTime
-      val writeSize = writables((count % numFiles).toInt.abs).write()
-      val elapsed = System.nanoTime - start
-      maxLatency = max(elapsed, maxLatency)
-      totalLatency += elapsed
-      written += writeSize
-      count += 1
-      totalWritten += writeSize
-      if ((start - lastReport)/(1000.0*1000.0) > reportingInterval.doubleValue) {
-        val elapsedSecs = (start - lastReport) / (1000.0*1000.0*1000.0)
-        val mb = written / (1024.0*1024.0)
-        println("%10.3f\t%10.3f\t%10.3f".format(mb / elapsedSecs, totalLatency 
/ count.toDouble / (1000.0*1000.0), maxLatency / (1000.0 * 1000.0)))
-        lastReport = start
-        written = 0
-        maxLatency = 0L
-        totalLatency = 0L
-      } else if (written > maxThroughputBytes * (reportingInterval / 1000.0)) {
-        // if we have written enough, just sit out this reporting interval
-        val lastReportMs = lastReport / (1000*1000)
-        val now = System.nanoTime / (1000*1000)
-        val sleepMs = lastReportMs + reportingInterval - now
-        if (sleepMs > 0)
-          Thread.sleep(sleepMs)
-      }
-    }
-    val elapsedSecs = (System.nanoTime - beginTest) / (1000.0*1000.0*1000.0)
-    println((bytesToWrite / (1024.0 * 1024.0 * elapsedSecs)).toString + " MB per sec")
-    scheduler.shutdown()
-  }
-
-  trait Writable {
-    def write(): Int
-    def close(): Unit
-  }
-
-  class MmapWritable(val file: File, size: Long, val content: ByteBuffer) extends Writable {
-    file.deleteOnExit()
-    val raf = new RandomAccessFile(file, "rw")
-    raf.setLength(size)
-    val buffer = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
-    def write(): Int = {
-      buffer.put(content)
-      content.rewind()
-      content.limit()
-    }
-    def close(): Unit = {
-      raf.close()
-      Utils.delete(file)
-    }
-  }
-
-  class ChannelWritable(val file: File, val content: ByteBuffer) extends Writable {
-    file.deleteOnExit()
-    val channel: FileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
-      StandardOpenOption.WRITE)
-    def write(): Int = {
-      channel.write(content)
-      content.rewind()
-      content.limit()
-    }
-    def close(): Unit = {
-      channel.close()
-      Utils.delete(file)
-    }
-  }
-
-  class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable {
-    Utils.delete(dir)
-    val log: UnifiedLog = UnifiedLog(
-      dir = dir,
-      config = config,
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = scheduler,
-      brokerTopicStats = new BrokerTopicStats,
-      time = Time.SYSTEM,
-      maxTransactionTimeoutMs = 5 * 60 * 1000,
-      producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
-      producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
-      logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true
-    )
-    def write(): Int = {
-      log.appendAsLeader(messages, leaderEpoch = 0)
-      messages.sizeInBytes
-    }
-    def close(): Unit = {
-      log.close()
-      Utils.delete(log.dir)
-    }
-  }
-
-}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
new file mode 100644
index 00000000000..b8642fd48a5
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.log;
+
+import kafka.log.UnifiedLog;
+
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.ZstdCompression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.CopyOnWriteMap;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.RequestLocal;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.KafkaScheduler;
+import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.log.AppendOrigin;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.storage.internals.log.LogOffsetsListener;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.apache.kafka.storage.internals.log.VerificationGuard;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import scala.Option;
+
+public class TestLinearWriteSpeed {
+
+    public static void main(String[] args) throws Exception {
+        OptionParser parser = new OptionParser();
+
+        OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
+            .withRequiredArg()
+            .describedAs("path")
+            .ofType(String.class)
+            .defaultsTo(System.getProperty("java.io.tmpdir"));
+
+        OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
+            .withRequiredArg()
+            .describedAs("num_bytes")
+            .ofType(Long.class);
+
+        OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
+            .withRequiredArg()
+            .describedAs("num_bytes")
+            .ofType(Integer.class);
+
+        OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "REQUIRED: The size of each message in the message set.")
+            .withRequiredArg()
+            .describedAs("num_bytes")
+            .ofType(Integer.class)
+            .defaultsTo(1024);
+
+        OptionSpec<Integer> filesOpt = parser.accepts("files", "REQUIRED: The number of logs or files.")
+            .withRequiredArg()
+            .describedAs("num_files")
+            .ofType(Integer.class)
+            .defaultsTo(1);
+
+        OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
+            .withRequiredArg()
+            .describedAs("ms")
+            .ofType(Long.class)
+            .defaultsTo(1000L);
+
+        OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
+            .withRequiredArg()
+            .describedAs("mb")
+            .ofType(Integer.class)
+            .defaultsTo(Integer.MAX_VALUE);
+
+        OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
+            .withRequiredArg()
+            .describedAs("message_count")
+            .ofType(Long.class)
+            .defaultsTo(Long.MAX_VALUE);
+
+        OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
+            .withRequiredArg()
+            .describedAs("codec")
+            .ofType(String.class)
+            .defaultsTo(CompressionType.NONE.name);
+
+        OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
+            .withRequiredArg()
+            .describedAs("level")
+            .ofType(Integer.class)
+            .defaultsTo(0);
+
+        OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
+        OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
+        OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
+        OptionSet options = parser.parse(args);
+        CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt, filesOpt);
+
+        long bytesToWrite = options.valueOf(bytesOpt);
+        int bufferSize = options.valueOf(sizeOpt);
+        int numFiles = options.valueOf(filesOpt);
+        long reportingInterval = options.valueOf(reportingIntervalOpt);
+        String dir = options.valueOf(dirOpt);
+        long maxThroughputBytes = options.valueOf(maxThroughputOpt) * 1024L * 1024L;
+        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+        int messageSize = options.valueOf(messageSizeOpt);
+        long flushInterval = options.valueOf(flushIntervalOpt);
+        CompressionType compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt));
+        Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType);
+        int compressionLevel = options.valueOf(compressionLevelOpt);
+
+        setupCompression(compressionType, compressionBuilder, compressionLevel);
+
+        ThreadLocalRandom.current().nextBytes(buffer.array());
+        int numMessages = bufferSize / (messageSize + Records.LOG_OVERHEAD);
+        long createTime = System.currentTimeMillis();
+
+        List<SimpleRecord> recordsList = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            recordsList.add(new SimpleRecord(createTime, null, new byte[messageSize]));
+        }
+
+        MemoryRecords messageSet = MemoryRecords.withRecords(compressionBuilder.build(), recordsList.toArray(new SimpleRecord[0]));
+        Writable[] writables = new Writable[numFiles];
+        KafkaScheduler scheduler = new KafkaScheduler(1);
+        scheduler.startup();
+
+        for (int i = 0; i < numFiles; i++) {
+            if (options.has(mmapOpt)) {
+                writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
+            } else if (options.has(channelOpt)) {
+                writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
+            } else if (options.has(logOpt)) {
+                int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024; // vary size to avoid herd effect
+                Properties logProperties = new Properties();
+                logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
+                logProperties.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(flushInterval));
+                LogConfig logConfig = new LogConfig(logProperties);
+                writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet);
+            } else {
+                System.err.println("Must specify what to write to with one of --log, --channel, or --mmap");
+                Exit.exit(1);
+            }
+        }
+        bytesToWrite = (bytesToWrite / numFiles) * numFiles;
+
+        System.out.printf("%10s\t%10s\t%10s%n", "mb_sec", "avg_latency", "max_latency");
+
+        long beginTest = System.nanoTime();
+        long maxLatency = 0L;
+        long totalLatency = 0L;
+        long count = 0L;
+        long written = 0L;
+        long totalWritten = 0L;
+        long lastReport = beginTest;
+
+        while (totalWritten + bufferSize < bytesToWrite) {
+            long start = System.nanoTime();
+            int writeSize = writables[(int) (count % numFiles)].write();
+            long elapsed = System.nanoTime() - start;
+            maxLatency = Math.max(elapsed, maxLatency);
+            totalLatency += elapsed;
+            written += writeSize;
+            count += 1;
+            totalWritten += writeSize;
+            if ((start - lastReport) / (1000.0 * 1000.0) > reportingInterval) {
+                double elapsedSecs = (start - lastReport) / (1000.0 * 1000.0 * 1000.0);
+                double mb = written / (1024.0 * 1024.0);
+                System.out.printf("%10.3f\t%10.3f\t%10.3f%n", mb / elapsedSecs, (totalLatency / (double) count) / (1000.0 * 1000.0), maxLatency / (1000.0 * 1000.0));
+                lastReport = start;
+                written = 0;
+                maxLatency = 0L;
+                totalLatency = 0L;
+            } else if (written > maxThroughputBytes * (reportingInterval / 1000.0)) {
+                long lastReportMs = lastReport / (1000 * 1000);
+                long now = System.nanoTime() / (1000 * 1000);
+                long sleepMs = lastReportMs + reportingInterval - now;
+                if (sleepMs > 0)
+                    Thread.sleep(sleepMs);
+            }
+        }
+        double elapsedSecs = (System.nanoTime() - beginTest) / (1000.0 * 1000.0 * 1000.0);
+        System.out.println((bytesToWrite / (1024.0 * 1024.0 * elapsedSecs)) + " MB per sec");
+        scheduler.shutdown();
+    }
+
+    private static void setupCompression(CompressionType compressionType,
+                                         Compression.Builder<? extends Compression> compressionBuilder,
+                                         int compressionLevel) {
+        switch (compressionType) {
+            case GZIP:
+                ((GzipCompression.Builder) compressionBuilder).level(compressionLevel);
+                break;
+            case LZ4:
+                ((Lz4Compression.Builder) compressionBuilder).level(compressionLevel);
+                break;
+            case ZSTD:
+                ((ZstdCompression.Builder) compressionBuilder).level(compressionLevel);
+                break;
+            default:
+                break;
+        }
+    }
+
+    interface Writable {
+        int write() throws IOException;
+
+        void close() throws IOException;
+    }
+
+    static class MmapWritable implements Writable {
+        File file;
+        ByteBuffer content;
+        RandomAccessFile raf;
+        MappedByteBuffer buffer;
+
+        public MmapWritable(File file, long size, ByteBuffer content) throws IOException {
+            this.file = file;
+            this.content = content;
+            file.deleteOnExit();
+            raf = new RandomAccessFile(file, "rw");
+            raf.setLength(size);
+            buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
+        }
+
+        public int write() {
+            buffer.put(content);
+            content.rewind();
+            return content.limit();
+        }
+
+        public void close() throws IOException {
+            raf.close();
+            Utils.delete(file);
+        }
+    }
+
+    static class ChannelWritable implements Writable {
+        File file;
+        ByteBuffer content;
+        FileChannel channel;
+
+        public ChannelWritable(File file, ByteBuffer content) throws IOException {
+            this.file = file;
+            this.content = content;
+            file.deleteOnExit();
+            channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
+        }
+
+        public int write() throws IOException {
+            channel.write(content);
+            content.rewind();
+            return content.limit();
+        }
+
+        public void close() throws IOException {
+            channel.close();
+            Utils.delete(file);
+        }
+    }
+
+    static class LogWritable implements Writable {
+        MemoryRecords messages;
+        UnifiedLog log;
+
+        public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages) throws IOException {
+            this.messages = messages;
+            Utils.delete(dir);
+            this.log = UnifiedLog.apply(
+                dir,
+                config,
+                0L,
+                0L,
+                scheduler,
+                new BrokerTopicStats(),
+                Time.SYSTEM,
+                5 * 60 * 1000,
+                new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
+                TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+                new LogDirFailureChannel(10),
+                true,
+                Option.empty(),
+                true,
+                new CopyOnWriteMap<>(),
+                false,
+                LogOffsetsListener.NO_OP_OFFSETS_LISTENER
+            );
+        }
+
+        public int write() {
+            log.appendAsLeader(
+                messages,
+                0,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestProduction(),
+                RequestLocal.noCaching(),
+                VerificationGuard.SENTINEL
+            );
+            return messages.sizeInBytes();
+        }
+
+        public void close() throws IOException {
+            log.close();
+            Utils.delete(log.dir());
+        }
+    }
+}
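
For reference, the rewritten benchmark keeps the same command-line options as the old Scala tool, so an invocation could look like the sketch below. The flags are the ones defined in TestLinearWriteSpeed.java above; the classpath (a jmh-benchmarks jar path) is an assumption and will depend on how the module is built.

    # sketch only: the jar location is hypothetical, the flags come from the class above
    java -cp jmh-benchmarks/build/libs/kafka-jmh-benchmarks-all.jar \
      org.apache.kafka.jmh.log.TestLinearWriteSpeed \
      --bytes 1073741824 --size 1048576 --message-size 1024 --files 1 --log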
