This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ed367cf2242 KAFKA-20060 Drop core test output dependency in jmh-benchmarks (#21310)
ed367cf2242 is described below
commit ed367cf22427277e84cf54a806f411c0561b3f09
Author: jimmy <[email protected]>
AuthorDate: Thu Jan 22 21:56:56 2026 +0800
KAFKA-20060 Drop core test output dependency in jmh-benchmarks (#21310)
[KAFKA-20060](https://issues.apache.org/jira/browse/KAFKA-20060) This PR
refactors the jmh-benchmarks module to remove its dependency on the
`core` module's test output, replacing the `kafka.utils.TestUtils`
helpers with appropriate alternatives from the shared test
infrastructure or new utility classes.
Reviewers: Chia-Ping Tsai <[email protected]>
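
For reference, a minimal sketch (not part of this patch) of the pattern the
refactor adopts in each benchmark setup: broker properties come from the new
BenchmarkConfigUtils helper and the AlterPartitionManager is a Mockito mock,
instead of pulling kafka.utils.TestUtils from the core test output. The class
name ExampleBenchmarkSetup below is hypothetical.

    import java.util.Properties;

    import kafka.server.AlterPartitionManager;
    import kafka.server.KafkaConfig;

    import org.apache.kafka.jmh.util.BenchmarkConfigUtils;
    import org.mockito.Mockito;

    // Hypothetical illustration of the post-refactor setup pattern.
    public class ExampleBenchmarkSetup {
        public static void main(String[] args) {
            // Dummy single-broker properties produced by the new helper.
            Properties props = BenchmarkConfigUtils.createDummyBrokerConfig();
            KafkaConfig config = KafkaConfig.fromProps(props);

            // A Mockito mock stands in for TestUtils.createAlterIsrManager().
            AlterPartitionManager alterPartitionManager =
                    Mockito.mock(AlterPartitionManager.class);

            System.out.println("node.id = " + config.nodeId());
        }
    }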
---
build.gradle | 1 -
checkstyle/import-control-jmh-benchmarks.xml | 3 +
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 12 ++--
.../org/apache/kafka/jmh/log/StressTestLog.java | 14 ++--
.../apache/kafka/jmh/server/CheckpointBench.java | 36 +++++++---
.../kafka/jmh/server/PartitionCreationBench.java | 12 ++--
.../kafka/jmh/util/BenchmarkConfigUtils.java | 76 ++++++++++++++++++++++
7 files changed, 124 insertions(+), 30 deletions(-)
diff --git a/build.gradle b/build.gradle
index 43d421dbd7c..0306e203d94 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3434,7 +3434,6 @@ project(':jmh-benchmarks') {
implementation project(':connect:transforms')
implementation project(':connect:json')
implementation project(':clients').sourceSets.test.output
- implementation project(':core').sourceSets.test.output
implementation project(':server-common').sourceSets.test.output
implementation project(':metadata').sourceSets.test.output
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index 03943f45512..7cafa743e15 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -54,6 +54,9 @@
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.apache.kafka.network" />
<allow class="org.apache.kafka.raft.QuorumConfig"/>
+ <allow class="org.apache.kafka.raft.KRaftConfigs"/>
+ <allow class="org.apache.kafka.test.TestUtils"/>
+ <allow class="org.apache.kafka.jmh.util.BenchmarkConfigUtils"/>
<allow pkg="joptsimple"/>
<subpackage name="cache">
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index ce5c813ab1f..febac7d6a58 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -19,6 +19,7 @@ package org.apache.kafka.jmh.fetcher;
import kafka.cluster.Partition;
import kafka.log.LogManager;
+import kafka.server.AlterPartitionManager;
import kafka.server.BrokerBlockingSender;
import kafka.server.FailedPartitions;
import kafka.server.InitialFetchState;
@@ -31,7 +32,6 @@ import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.utils.TestUtils;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.common.DirectoryId;
@@ -51,6 +51,7 @@ import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.jmh.util.BenchmarkConfigUtils;
import org.apache.kafka.metadata.KRaftMetadataCache;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.MockConfigRepository;
@@ -120,10 +121,8 @@ public class ReplicaFetcherThreadBenchmark {
@Setup(Level.Trial)
public void setup() throws IOException {
scheduler.startup();
- KafkaConfig config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
- 0, true, true, 9092, Option.empty(), Option.empty(),
- Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1,
- (short) 1, false));
+ Properties configs = BenchmarkConfigUtils.createDummyBrokerConfig();
+ KafkaConfig config = KafkaConfig.fromProps(configs);
LogConfig logConfig = createLogConfig();
BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
@@ -147,6 +146,7 @@ public class ReplicaFetcherThreadBenchmark {
setTime(Time.SYSTEM).
build();
+ AlterPartitionManager alterPartitionManager = Mockito.mock(AlterPartitionManager.class);
replicaManager = new ReplicaManagerBuilder().
setConfig(config).
setMetrics(metrics).
@@ -157,7 +157,7 @@ public class ReplicaFetcherThreadBenchmark {
setBrokerTopicStats(brokerTopicStats).
setMetadataCache(new KRaftMetadataCache(config.nodeId(), () -> KRAFT_VERSION_1)).
setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())).
- setAlterPartitionManager(TestUtils.createAlterIsrManager()).
+ setAlterPartitionManager(alterPartitionManager).
build();
LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> initialFetched = new LinkedHashMap<>();
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java
index 7815d49d27e..aa37b15bca0 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.jmh.log;
-import kafka.utils.TestUtils;
-
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.TopicConfig;
@@ -26,6 +24,7 @@ import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.RequestLocal;
@@ -41,6 +40,7 @@ import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
@@ -57,7 +57,8 @@ public class StressTestLog {
private static final AtomicBoolean RUNNING = new AtomicBoolean(true);
public static void main(String[] args) throws Exception {
- File dir = TestUtils.randomPartitionLogDir(TestUtils.tempDir());
+ File tmp = TestUtils.tempDirectory();
+ File dir = TestUtils.randomPartitionLogDir(tmp);
MockTime time = new MockTime();
Properties logProperties = new Properties();
logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, 64 * 1024 * 1024);
@@ -157,11 +158,10 @@ public class StressTestLog {
@Override
protected void work() throws Exception {
byte[] value = Long.toString(currentOffset).getBytes(StandardCharsets.UTF_8);
- MemoryRecords records = TestUtils.singletonRecords(value,
- null,
+ MemoryRecords records = MemoryRecords.withRecords(
+ RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE,
- RecordBatch.NO_TIMESTAMP,
- RecordBatch.CURRENT_MAGIC_VALUE);
+ new SimpleRecord(RecordBatch.NO_TIMESTAMP, null, value));
LogAppendInfo logAppendInfo = log.appendAsLeader(records,
0,
AppendOrigin.CLIENT,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 12ca5e4a0c4..10325e8ba3f 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -22,17 +22,18 @@ import kafka.server.AlterPartitionManager;
import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
+import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.jmh.util.BenchmarkConfigUtils;
import org.apache.kafka.metadata.KRaftMetadataCache;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.metadata.MockConfigRepository;
-import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.Scheduler;
@@ -42,6 +43,7 @@ import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
@@ -100,17 +102,31 @@ public class CheckpointBench {
@Setup(Level.Trial)
public void setup() {
this.scheduler = new KafkaScheduler(1, true, "scheduler-thread");
- this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
- 0, true, true, 9092, Option.empty(), Option.empty(),
- Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1,
- (short) 1, false));
+ Properties configs = BenchmarkConfigUtils.createDummyBrokerConfig();
+ this.brokerProperties = KafkaConfig.fromProps(configs);
this.metrics = new Metrics();
this.time = new MockTime();
this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
final List<File> files = brokerProperties.logDirs().stream().map(File::new).toList();
- this.logManager = TestUtils.createLogManager(CollectionConverters.asScala(files),
- new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
- 1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, 4, false, Option.empty(), false, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT);
+ this.logManager = new LogManagerBuilder().
+ setLogDirs(files).
+ setInitialOfflineDirs(List.of()).
+ setConfigRepository(new MockConfigRepository()).
+ setInitialDefaultConfig(new LogConfig(new Properties())).
+ setCleanerConfig(new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
+ 1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true)).
+ setRecoveryThreadsPerDataDir(1).
+ setFlushCheckMs(1000L).
+ setFlushRecoveryOffsetCheckpointMs(10000L).
+ setFlushStartOffsetCheckpointMs(10000L).
+ setRetentionCheckMs(1000L).
+ setProducerStateManagerConfig(60000, false).
+ setScheduler(scheduler).
+ setBrokerTopicStats(new BrokerTopicStats(false)).
+ setLogDirFailureChannel(failureChannel).
+ setTime(Time.SYSTEM).
+ build();
+
scheduler.startup();
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
final MetadataCache metadataCache =
@@ -120,7 +136,7 @@ public class CheckpointBench {
this.metrics,
this.time, "", "");
- this.alterPartitionManager = TestUtils.createAlterIsrManager();
+ this.alterPartitionManager = Mockito.mock(AlterPartitionManager.class);
this.replicaManager = new ReplicaManagerBuilder().
setConfig(brokerProperties).
setMetrics(metrics).
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 0128f62f4f0..d071fa3a08e 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -24,7 +24,6 @@ import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.utils.TestUtils;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.TopicPartition;
@@ -32,6 +31,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.jmh.util.BenchmarkConfigUtils;
import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.metadata.KRaftMetadataCache;
import org.apache.kafka.metadata.LeaderRecoveryState;
@@ -45,6 +45,7 @@ import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -107,10 +108,8 @@ public class PartitionCreationBench {
topicId = Option.empty();
this.scheduler = new KafkaScheduler(1, true, "scheduler-thread");
- this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
- 0, true, true, 9092, Option.empty(), Option.empty(),
- Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1,
- (short) 1, false));
+ Properties configs = BenchmarkConfigUtils.createDummyBrokerConfig();
+ this.brokerProperties = KafkaConfig.fromProps(configs);
this.metrics = new Metrics();
this.time = Time.SYSTEM;
this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
@@ -141,7 +140,8 @@ public class PartitionCreationBench {
build();
scheduler.startup();
this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, this.metrics, this.time, "", "");
- this.alterPartitionManager = TestUtils.createAlterIsrManager();
+ this.alterPartitionManager = Mockito.mock(AlterPartitionManager.class);
+
this.replicaManager = new ReplicaManagerBuilder().
setConfig(brokerProperties).
setMetrics(metrics).
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BenchmarkConfigUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BenchmarkConfigUtils.java
new file mode 100644
index 00000000000..2e058ba707e
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BenchmarkConfigUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.KRaftConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.test.TestUtils;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class BenchmarkConfigUtils {
+
+ public static Properties createDummyBrokerConfig() {
+ Properties props = new Properties();
+
+ props.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true");
+ props.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
+ props.setProperty(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, String.valueOf(TimeUnit.MINUTES.toMillis(10)));
+ props.put(KRaftConfigs.NODE_ID_CONFIG, "0");
+ props.put(ServerConfigs.BROKER_ID_CONFIG, "0");
+
+ props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "PLAINTEXT://localhost:9092");
+ props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092");
+ props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");
+ props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");
+
+ File dir = TestUtils.tempDirectory();
+ props.put(ServerLogConfigs.LOG_DIR_CONFIG, dir.getAbsolutePath());
+
+ props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker");
+ props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "0@localhost:0");
+
+ props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, "1500");
+ props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, "1500");
+
+ props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, "true");
+ props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, "true");
+
+ props.put(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000");
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
+ props.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+ props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "1");
+ props.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "1");
+
+ props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
+ props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5");
+ props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0");
+
+ props.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "2");
+ props.put(ServerConfigs.BACKGROUND_THREADS_CONFIG, "2");
+
+ return props;
+ }
+}