This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 95ba356dd75 KAFKA-17730 ReplicaFetcherThreadBenchmark is broken (#18382)
95ba356dd75 is described below
commit 95ba356dd759ae6d771e59b701cb255e3dd06ba2
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Jan 9 23:47:27 2025 +0800
KAFKA-17730 ReplicaFetcherThreadBenchmark is broken (#18382)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 105 ++++++---------------
1 file changed, 30 insertions(+), 75 deletions(-)
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index b624fc07358..ae0a582d0a0 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -17,11 +17,8 @@
package org.apache.kafka.jmh.fetcher;
-import kafka.cluster.AlterPartitionListener;
-import kafka.cluster.DelayedOperations;
import kafka.cluster.Partition;
import kafka.log.LogManager;
-import kafka.server.AlterPartitionManager;
import kafka.server.BrokerBlockingSender;
import kafka.server.FailedPartitions;
import kafka.server.InitialFetchState;
@@ -36,10 +33,8 @@ import kafka.server.ReplicaQuota;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
import kafka.server.metadata.MockConfigRepository;
-import kafka.server.metadata.ZkMetadataCache;
import kafka.utils.Pool;
import kafka.utils.TestUtils;
-import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.common.TopicIdPartition;
@@ -49,7 +44,6 @@ import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
-import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -57,11 +51,9 @@ import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.RecordsSend;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.BrokerFeatures;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.network.BrokerEndPoint;
@@ -91,20 +83,21 @@ import org.openjdk.jmh.annotations.Warmup;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import scala.Option;
import scala.collection.Iterator;
import scala.collection.Map;
+import scala.jdk.javaapi.CollectionConverters;
+
+import static org.apache.kafka.server.common.KRaftVersion.KRAFT_VERSION_1;
@State(Scope.Benchmark)
@Fork(value = 1)
@@ -113,7 +106,6 @@ import scala.collection.Map;
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class ReplicaFetcherThreadBenchmark {
- private final File logDir = new File(System.getProperty("java.io.tmpdir"),
UUID.randomUUID().toString());
private final KafkaScheduler scheduler = new KafkaScheduler(1, true,
"scheduler");
private final Pool<TopicPartition, Partition> pool = new
Pool<>(Option.empty());
private final Metrics metrics = new Metrics();
@@ -127,18 +119,16 @@ public class ReplicaFetcherThreadBenchmark {
@Setup(Level.Trial)
public void setup() throws IOException {
- if (!logDir.mkdir())
- throw new IOException("error creating test directory");
-
scheduler.startup();
- Properties props = new Properties();
- props.put("zookeeper.connect", "127.0.0.1:9999");
- KafkaConfig config = new KafkaConfig(props);
+ KafkaConfig config =
KafkaConfig.fromProps(TestUtils.createBrokerConfig(
+ 0, true, true, 9092, Option.empty(), Option.empty(),
+ Option.empty(), true, false, 0, false, 0, false, 0,
Option.empty(), 1, true, 1,
+ (short) 1, false));
LogConfig logConfig = createLogConfig();
BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
- LogDirFailureChannel logDirFailureChannel =
Mockito.mock(LogDirFailureChannel.class);
- List<File> logDirs = Collections.singletonList(logDir);
+ LogDirFailureChannel logDirFailureChannel = new
LogDirFailureChannel(config.logDirs().size());
+ List<File> logDirs =
CollectionConverters.asJava(config.logDirs()).stream().map(File::new).collect(Collectors.toList());
logManager = new LogManagerBuilder().
setLogDirs(logDirs).
setInitialOfflineDirs(Collections.emptyList()).
@@ -159,10 +149,21 @@ public class ReplicaFetcherThreadBenchmark {
setKeepPartitionMetadataFile(true).
build();
+ replicaManager = new ReplicaManagerBuilder().
+ setConfig(config).
+ setMetrics(metrics).
+ setTime(new MockTime()).
+ setScheduler(scheduler).
+ setLogManager(logManager).
+ setQuotaManagers(Mockito.mock(QuotaFactory.QuotaManagers.class)).
+ setBrokerTopicStats(brokerTopicStats).
+ setMetadataCache(MetadataCache.kRaftMetadataCache(config.nodeId(),
() -> KRAFT_VERSION_1)).
+ setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())).
+ setAlterPartitionManager(TestUtils.createAlterIsrManager()).
+ build();
+
LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData>
initialFetched = new LinkedHashMap<>();
- HashMap<String, Uuid> topicIds = new HashMap<>();
scala.collection.mutable.Map<TopicPartition, InitialFetchState>
initialFetchStates = new scala.collection.mutable.HashMap<>();
- List<UpdateMetadataRequestData.UpdateMetadataPartitionState>
updatePartitionState = new ArrayList<>();
for (int i = 0; i < partitionCount; i++) {
TopicPartition tp = new TopicPartition("topic", i);
@@ -176,15 +177,10 @@ public class ReplicaFetcherThreadBenchmark {
.setReplicas(replicas)
.setIsNew(true);
- AlterPartitionListener alterPartitionListener =
Mockito.mock(AlterPartitionListener.class);
- OffsetCheckpoints offsetCheckpoints =
Mockito.mock(OffsetCheckpoints.class);
- Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(),
tp)).thenReturn(Optional.of(0L));
- AlterPartitionManager isrChannelManager =
Mockito.mock(AlterPartitionManager.class);
- Partition partition = new Partition(tp, 100,
MetadataVersion.latestTesting(),
- 0, () -> -1, Time.SYSTEM, alterPartitionListener, new
DelayedOperationsMock(topicId, tp),
- Mockito.mock(MetadataCache.class), logManager,
isrChannelManager, topicId);
+ OffsetCheckpoints checkpoints = (logDir, topicPartition) ->
Optional.of(0L);
+ Partition partition = replicaManager.createPartition(tp);
- partition.makeFollower(partitionState, offsetCheckpoints, topicId,
Option.empty());
+ partition.makeFollower(partitionState, checkpoints, topicId,
Option.empty());
pool.put(tp, partition);
initialFetchStates.put(tp, new InitialFetchState(topicId, new
BrokerEndPoint(3, "host", 3000), 0, 0));
BaseRecords fetched = new BaseRecords() {
@@ -203,39 +199,8 @@ public class ReplicaFetcherThreadBenchmark {
.setLastStableOffset(0)
.setLogStartOffset(0)
.setRecords(fetched));
-
- updatePartitionState.add(
- new
UpdateMetadataRequestData.UpdateMetadataPartitionState()
- .setTopicName("topic")
- .setPartitionIndex(i)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0)
- .setIsr(replicas)
- .setZkVersion(1)
- .setReplicas(replicas));
}
- UpdateMetadataRequest updateMetadataRequest = new
UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(),
- 0, 0, 0, updatePartitionState, Collections.emptyList(),
topicIds).build();
-
- // TODO: fix to support raft
- ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0,
- config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty());
- metadataCache.updateMetadata(0, updateMetadataRequest);
- replicaManager = new ReplicaManagerBuilder().
- setConfig(config).
- setMetrics(metrics).
- setTime(new MockTime()).
- setZkClient(Mockito.mock(KafkaZkClient.class)).
- setScheduler(scheduler).
- setLogManager(logManager).
- setQuotaManagers(Mockito.mock(QuotaFactory.QuotaManagers.class)).
- setBrokerTopicStats(brokerTopicStats).
- setMetadataCache(metadataCache).
- setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())).
- setAlterPartitionManager(TestUtils.createAlterIsrManager()).
- build();
replicaQuota = new ReplicaQuota() {
@Override
public boolean isQuotaExceeded() {
@@ -266,7 +231,9 @@ public class ReplicaFetcherThreadBenchmark {
replicaManager.shutdown(false);
logManager.shutdown(-1L);
scheduler.shutdown();
- Utils.delete(logDir);
+ for (File dir : CollectionConverters.asJava(logManager.liveLogDirs()))
{
+ Utils.delete(dir);
+ }
}
@Benchmark
@@ -275,18 +242,6 @@ public class ReplicaFetcherThreadBenchmark {
return fetcher.fetcherStats().requestRate().count();
}
- // avoid mocked DelayedOperations to avoid mocked class affecting
benchmark results
- private static class DelayedOperationsMock extends DelayedOperations {
- DelayedOperationsMock(Option<Uuid> topicId, TopicPartition
topicPartition) {
- super(topicId, topicPartition, null, null, null, null);
- }
-
- @Override
- public int numDelayedDelete() {
- return 0;
- }
- }
-
private static LogConfig createLogConfig() {
return new LogConfig(new Properties());
}
@@ -318,7 +273,7 @@ public class ReplicaFetcherThreadBenchmark {
replicaManager,
replicaQuota,
config::interBrokerProtocolVersion,
- () -> -1
+ () -> -1L
) {
@Override
public OffsetAndEpoch
fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {