This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ed367cf2242 KAFKA-20060 Drop core test output dependency in 
jmh-benchmarks (#21310)
ed367cf2242 is described below

commit ed367cf22427277e84cf54a806f411c0561b3f09
Author: jimmy <[email protected]>
AuthorDate: Thu Jan 22 21:56:56 2026 +0800

    KAFKA-20060 Drop core test output dependency in jmh-benchmarks (#21310)
    
    [KAFKA-20060](https://issues.apache.org/jira/browse/KAFKA-20060) This PR
    refactors the JMH benchmarks module to remove dependencies on test
    utilities from the `kafka-server` module and replaces them with
    appropriate alternatives from the test infrastructure or new utility
    classes.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 build.gradle                                       |  1 -
 checkstyle/import-control-jmh-benchmarks.xml       |  3 +
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 12 ++--
 .../org/apache/kafka/jmh/log/StressTestLog.java    | 14 ++--
 .../apache/kafka/jmh/server/CheckpointBench.java   | 36 +++++++---
 .../kafka/jmh/server/PartitionCreationBench.java   | 12 ++--
 .../kafka/jmh/util/BenchmarkConfigUtils.java       | 76 ++++++++++++++++++++++
 7 files changed, 124 insertions(+), 30 deletions(-)

diff --git a/build.gradle b/build.gradle
index 43d421dbd7c..0306e203d94 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3434,7 +3434,6 @@ project(':jmh-benchmarks') {
     implementation project(':connect:transforms')
     implementation project(':connect:json')
     implementation project(':clients').sourceSets.test.output
-    implementation project(':core').sourceSets.test.output
     implementation project(':server-common').sourceSets.test.output
     implementation project(':metadata').sourceSets.test.output
 
diff --git a/checkstyle/import-control-jmh-benchmarks.xml 
b/checkstyle/import-control-jmh-benchmarks.xml
index 03943f45512..7cafa743e15 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -54,6 +54,9 @@
     <allow pkg="org.apache.kafka.connect" />
     <allow pkg="org.apache.kafka.network" />
     <allow class="org.apache.kafka.raft.QuorumConfig"/>
+    <allow class="org.apache.kafka.raft.KRaftConfigs"/>
+    <allow class="org.apache.kafka.test.TestUtils"/>
+    <allow class="org.apache.kafka.jmh.util.BenchmarkConfigUtils"/>
     <allow pkg="joptsimple"/>
 
     <subpackage name="cache">
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index ce5c813ab1f..febac7d6a58 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -19,6 +19,7 @@ package org.apache.kafka.jmh.fetcher;
 
 import kafka.cluster.Partition;
 import kafka.log.LogManager;
+import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerBlockingSender;
 import kafka.server.FailedPartitions;
 import kafka.server.InitialFetchState;
@@ -31,7 +32,6 @@ import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
 import kafka.server.builders.LogManagerBuilder;
 import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.utils.TestUtils;
 
 import org.apache.kafka.clients.FetchSessionHandler;
 import org.apache.kafka.common.DirectoryId;
@@ -51,6 +51,7 @@ import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.jmh.util.BenchmarkConfigUtils;
 import org.apache.kafka.metadata.KRaftMetadataCache;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.MockConfigRepository;
@@ -120,10 +121,8 @@ public class ReplicaFetcherThreadBenchmark {
     @Setup(Level.Trial)
     public void setup() throws IOException {
         scheduler.startup();
-        KafkaConfig config =  
KafkaConfig.fromProps(TestUtils.createBrokerConfig(
-            0, true, true, 9092, Option.empty(), Option.empty(),
-            Option.empty(), true, false, 0, false, 0, false, 0, 
Option.empty(), 1, true, 1,
-            (short) 1, false));
+        Properties configs = BenchmarkConfigUtils.createDummyBrokerConfig();
+        KafkaConfig config =  KafkaConfig.fromProps(configs);
         LogConfig logConfig = createLogConfig();
 
         BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
@@ -147,6 +146,7 @@ public class ReplicaFetcherThreadBenchmark {
             setTime(Time.SYSTEM).
             build();
 
+        AlterPartitionManager alterPartitionManager = 
Mockito.mock(AlterPartitionManager.class);
         replicaManager = new ReplicaManagerBuilder().
             setConfig(config).
             setMetrics(metrics).
@@ -157,7 +157,7 @@ public class ReplicaFetcherThreadBenchmark {
             setBrokerTopicStats(brokerTopicStats).
             setMetadataCache(new KRaftMetadataCache(config.nodeId(), () -> 
KRAFT_VERSION_1)).
             setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())).
-            setAlterPartitionManager(TestUtils.createAlterIsrManager()).
+            setAlterPartitionManager(alterPartitionManager).
             build();
 
         LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> 
initialFetched = new LinkedHashMap<>();
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java
index 7815d49d27e..aa37b15bca0 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.jmh.log;
 
-import kafka.utils.TestUtils;
-
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.config.TopicConfig;
@@ -26,6 +24,7 @@ import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.common.RequestLocal;
@@ -41,6 +40,7 @@ import 
org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
 import org.apache.kafka.storage.internals.log.UnifiedLog;
 import org.apache.kafka.storage.internals.log.VerificationGuard;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
@@ -57,7 +57,8 @@ public class StressTestLog {
     private static final AtomicBoolean RUNNING = new AtomicBoolean(true);
 
     public static void main(String[] args) throws Exception {
-        File dir = TestUtils.randomPartitionLogDir(TestUtils.tempDir());
+        File tmp = TestUtils.tempDirectory();
+        File dir = TestUtils.randomPartitionLogDir(tmp);
         MockTime time = new MockTime();
         Properties logProperties = new Properties();
         logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, 64 * 1024 * 1024);
@@ -157,11 +158,10 @@ public class StressTestLog {
         @Override
         protected void work() throws Exception {
             byte[] value = 
Long.toString(currentOffset).getBytes(StandardCharsets.UTF_8);
-            MemoryRecords records = TestUtils.singletonRecords(value,
-                    null,
+            MemoryRecords records = MemoryRecords.withRecords(
+                    RecordBatch.CURRENT_MAGIC_VALUE,
                     Compression.NONE,
-                    RecordBatch.NO_TIMESTAMP,
-                    RecordBatch.CURRENT_MAGIC_VALUE);
+                    new SimpleRecord(RecordBatch.NO_TIMESTAMP, null, value));
             LogAppendInfo logAppendInfo = log.appendAsLeader(records,
                     0,
                     AppendOrigin.CLIENT,
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 12ca5e4a0c4..10325e8ba3f 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -22,17 +22,18 @@ import kafka.server.AlterPartitionManager;
 import kafka.server.KafkaConfig;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
+import kafka.server.builders.LogManagerBuilder;
 import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.utils.TestUtils;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.jmh.util.BenchmarkConfigUtils;
 import org.apache.kafka.metadata.KRaftMetadataCache;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.metadata.MockConfigRepository;
-import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.server.util.Scheduler;
@@ -42,6 +43,7 @@ import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
+import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Level;
@@ -100,17 +102,31 @@ public class CheckpointBench {
     @Setup(Level.Trial)
     public void setup() {
         this.scheduler = new KafkaScheduler(1, true, "scheduler-thread");
-        this.brokerProperties = 
KafkaConfig.fromProps(TestUtils.createBrokerConfig(
-                0, true, true, 9092, Option.empty(), Option.empty(),
-                Option.empty(), true, false, 0, false, 0, false, 0, 
Option.empty(), 1, true, 1,
-                (short) 1, false));
+        Properties configs = BenchmarkConfigUtils.createDummyBrokerConfig();
+        this.brokerProperties = KafkaConfig.fromProps(configs);
         this.metrics = new Metrics();
         this.time = new MockTime();
         this.failureChannel = new 
LogDirFailureChannel(brokerProperties.logDirs().size());
         final List<File> files = 
brokerProperties.logDirs().stream().map(File::new).toList();
-        this.logManager = 
TestUtils.createLogManager(CollectionConverters.asScala(files),
-                new LogConfig(new Properties()), new MockConfigRepository(), 
new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
-                        1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 
1000, true), time, 4, false, Option.empty(), false, 
ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT);
+        this.logManager = new LogManagerBuilder().
+            setLogDirs(files).
+            setInitialOfflineDirs(List.of()).
+            setConfigRepository(new MockConfigRepository()).
+            setInitialDefaultConfig(new LogConfig(new Properties())).
+            setCleanerConfig(new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
+                1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, 
true)).
+            setRecoveryThreadsPerDataDir(1).
+            setFlushCheckMs(1000L).
+            setFlushRecoveryOffsetCheckpointMs(10000L).
+            setFlushStartOffsetCheckpointMs(10000L).
+            setRetentionCheckMs(1000L).
+            setProducerStateManagerConfig(60000, false).
+            setScheduler(scheduler).
+            setBrokerTopicStats(new BrokerTopicStats(false)).
+            setLogDirFailureChannel(failureChannel).
+            setTime(Time.SYSTEM).
+            build();
+
         scheduler.startup();
         final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
         final MetadataCache metadataCache =
@@ -120,7 +136,7 @@ public class CheckpointBench {
                         this.metrics,
                         this.time, "", "");
 
-        this.alterPartitionManager = TestUtils.createAlterIsrManager();
+        this.alterPartitionManager = Mockito.mock(AlterPartitionManager.class);
         this.replicaManager = new ReplicaManagerBuilder().
             setConfig(brokerProperties).
             setMetrics(metrics).
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 0128f62f4f0..d071fa3a08e 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -24,7 +24,6 @@ import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.builders.LogManagerBuilder;
 import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.utils.TestUtils;
 
 import org.apache.kafka.common.DirectoryId;
 import org.apache.kafka.common.TopicPartition;
@@ -32,6 +31,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.jmh.util.BenchmarkConfigUtils;
 import org.apache.kafka.metadata.ConfigRepository;
 import org.apache.kafka.metadata.KRaftMetadataCache;
 import org.apache.kafka.metadata.LeaderRecoveryState;
@@ -45,6 +45,7 @@ import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
+import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -107,10 +108,8 @@ public class PartitionCreationBench {
             topicId = Option.empty();
 
         this.scheduler = new KafkaScheduler(1, true, "scheduler-thread");
-        this.brokerProperties = 
KafkaConfig.fromProps(TestUtils.createBrokerConfig(
-                0, true, true, 9092, Option.empty(), Option.empty(),
-                Option.empty(), true, false, 0, false, 0, false, 0, 
Option.empty(), 1, true, 1,
-                (short) 1, false));
+        Properties configs = BenchmarkConfigUtils.createDummyBrokerConfig();
+        this.brokerProperties = KafkaConfig.fromProps(configs);
         this.metrics = new Metrics();
         this.time = Time.SYSTEM;
         this.failureChannel = new 
LogDirFailureChannel(brokerProperties.logDirs().size());
@@ -141,7 +140,8 @@ public class PartitionCreationBench {
             build();
         scheduler.startup();
         this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, 
this.metrics, this.time, "", "");
-        this.alterPartitionManager = TestUtils.createAlterIsrManager();
+        this.alterPartitionManager = Mockito.mock(AlterPartitionManager.class);
+
         this.replicaManager = new ReplicaManagerBuilder().
             setConfig(brokerProperties).
             setMetrics(metrics).
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BenchmarkConfigUtils.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BenchmarkConfigUtils.java
new file mode 100644
index 00000000000..2e058ba707e
--- /dev/null
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BenchmarkConfigUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.KRaftConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.test.TestUtils;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class BenchmarkConfigUtils {
+
+    public static Properties createDummyBrokerConfig() {
+        Properties props = new Properties();
+
+        props.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, 
"true");
+        props.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
+        props.setProperty(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, 
String.valueOf(TimeUnit.MINUTES.toMillis(10)));
+        props.put(KRaftConfigs.NODE_ID_CONFIG, "0");
+        props.put(ServerConfigs.BROKER_ID_CONFIG, "0");
+
+        props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"PLAINTEXT://localhost:9092");
+        props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"PLAINTEXT://localhost:9092");
+        props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");
+        props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");
+
+        File dir = TestUtils.tempDirectory();
+        props.put(ServerLogConfigs.LOG_DIR_CONFIG, dir.getAbsolutePath());
+
+        props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker");
+        props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "0@localhost:0");
+
+        props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, "1500");
+        props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, 
"1500");
+
+        props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, "true");
+        props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, "true");
+
+        props.put(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000");
+        props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 
"2097152");
+        props.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+        props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "1");
+        props.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "1");
+
+        
props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
+        props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5");
+        
props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0");
+
+        props.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "2");
+        props.put(ServerConfigs.BACKGROUND_THREADS_CONFIG, "2");
+
+        return props;
+    }
+}

Reply via email to