This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 95ba356dd75 KAFKA-17730 ReplicaFetcherThreadBenchmark is broken 
(#18382)
95ba356dd75 is described below

commit 95ba356dd759ae6d771e59b701cb255e3dd06ba2
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Jan 9 23:47:27 2025 +0800

    KAFKA-17730 ReplicaFetcherThreadBenchmark is broken (#18382)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 105 ++++++---------------
 1 file changed, 30 insertions(+), 75 deletions(-)

diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index b624fc07358..ae0a582d0a0 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -17,11 +17,8 @@
 
 package org.apache.kafka.jmh.fetcher;
 
-import kafka.cluster.AlterPartitionListener;
-import kafka.cluster.DelayedOperations;
 import kafka.cluster.Partition;
 import kafka.log.LogManager;
-import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerBlockingSender;
 import kafka.server.FailedPartitions;
 import kafka.server.InitialFetchState;
@@ -36,10 +33,8 @@ import kafka.server.ReplicaQuota;
 import kafka.server.builders.LogManagerBuilder;
 import kafka.server.builders.ReplicaManagerBuilder;
 import kafka.server.metadata.MockConfigRepository;
-import kafka.server.metadata.ZkMetadataCache;
 import kafka.utils.Pool;
 import kafka.utils.TestUtils;
-import kafka.zk.KafkaZkClient;
 
 import org.apache.kafka.clients.FetchSessionHandler;
 import org.apache.kafka.common.TopicIdPartition;
@@ -49,7 +44,6 @@ import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.message.LeaderAndIsrRequestData;
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
-import org.apache.kafka.common.message.UpdateMetadataRequestData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -57,11 +51,9 @@ import org.apache.kafka.common.record.BaseRecords;
 import org.apache.kafka.common.record.RecordsSend;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.BrokerFeatures;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.network.BrokerEndPoint;
@@ -91,20 +83,21 @@ import org.openjdk.jmh.annotations.Warmup;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import scala.Option;
 import scala.collection.Iterator;
 import scala.collection.Map;
+import scala.jdk.javaapi.CollectionConverters;
+
+import static org.apache.kafka.server.common.KRaftVersion.KRAFT_VERSION_1;
 
 @State(Scope.Benchmark)
 @Fork(value = 1)
@@ -113,7 +106,6 @@ import scala.collection.Map;
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.NANOSECONDS)
 public class ReplicaFetcherThreadBenchmark {
-    private final File logDir = new File(System.getProperty("java.io.tmpdir"), 
UUID.randomUUID().toString());
     private final KafkaScheduler scheduler = new KafkaScheduler(1, true, 
"scheduler");
     private final Pool<TopicPartition, Partition> pool = new 
Pool<>(Option.empty());
     private final Metrics metrics = new Metrics();
@@ -127,18 +119,16 @@ public class ReplicaFetcherThreadBenchmark {
 
     @Setup(Level.Trial)
     public void setup() throws IOException {
-        if (!logDir.mkdir())
-            throw new IOException("error creating test directory");
-
         scheduler.startup();
-        Properties props = new Properties();
-        props.put("zookeeper.connect", "127.0.0.1:9999");
-        KafkaConfig config = new KafkaConfig(props);
+        KafkaConfig config =  
KafkaConfig.fromProps(TestUtils.createBrokerConfig(
+            0, true, true, 9092, Option.empty(), Option.empty(),
+            Option.empty(), true, false, 0, false, 0, false, 0, 
Option.empty(), 1, true, 1,
+            (short) 1, false));
         LogConfig logConfig = createLogConfig();
 
         BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
-        LogDirFailureChannel logDirFailureChannel = 
Mockito.mock(LogDirFailureChannel.class);
-        List<File> logDirs = Collections.singletonList(logDir);
+        LogDirFailureChannel logDirFailureChannel = new 
LogDirFailureChannel(config.logDirs().size());
+        List<File> logDirs = 
CollectionConverters.asJava(config.logDirs()).stream().map(File::new).collect(Collectors.toList());
         logManager = new LogManagerBuilder().
             setLogDirs(logDirs).
             setInitialOfflineDirs(Collections.emptyList()).
@@ -159,10 +149,21 @@ public class ReplicaFetcherThreadBenchmark {
             setKeepPartitionMetadataFile(true).
             build();
 
+        replicaManager = new ReplicaManagerBuilder().
+            setConfig(config).
+            setMetrics(metrics).
+            setTime(new MockTime()).
+            setScheduler(scheduler).
+            setLogManager(logManager).
+            setQuotaManagers(Mockito.mock(QuotaFactory.QuotaManagers.class)).
+            setBrokerTopicStats(brokerTopicStats).
+            setMetadataCache(MetadataCache.kRaftMetadataCache(config.nodeId(), 
() -> KRAFT_VERSION_1)).
+            setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())).
+            setAlterPartitionManager(TestUtils.createAlterIsrManager()).
+            build();
+
         LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> 
initialFetched = new LinkedHashMap<>();
-        HashMap<String, Uuid> topicIds = new HashMap<>();
         scala.collection.mutable.Map<TopicPartition, InitialFetchState> 
initialFetchStates = new scala.collection.mutable.HashMap<>();
-        List<UpdateMetadataRequestData.UpdateMetadataPartitionState> 
updatePartitionState = new ArrayList<>();
         for (int i = 0; i < partitionCount; i++) {
             TopicPartition tp = new TopicPartition("topic", i);
 
@@ -176,15 +177,10 @@ public class ReplicaFetcherThreadBenchmark {
                     .setReplicas(replicas)
                     .setIsNew(true);
 
-            AlterPartitionListener alterPartitionListener = 
Mockito.mock(AlterPartitionListener.class);
-            OffsetCheckpoints offsetCheckpoints = 
Mockito.mock(OffsetCheckpoints.class);
-            Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), 
tp)).thenReturn(Optional.of(0L));
-            AlterPartitionManager isrChannelManager = 
Mockito.mock(AlterPartitionManager.class);
-            Partition partition = new Partition(tp, 100, 
MetadataVersion.latestTesting(),
-                    0, () -> -1, Time.SYSTEM, alterPartitionListener, new 
DelayedOperationsMock(topicId, tp),
-                    Mockito.mock(MetadataCache.class), logManager, 
isrChannelManager, topicId);
+            OffsetCheckpoints checkpoints = (logDir, topicPartition) -> 
Optional.of(0L);
+            Partition partition = replicaManager.createPartition(tp);
 
-            partition.makeFollower(partitionState, offsetCheckpoints, topicId, 
Option.empty());
+            partition.makeFollower(partitionState, checkpoints, topicId, 
Option.empty());
             pool.put(tp, partition);
             initialFetchStates.put(tp, new InitialFetchState(topicId, new 
BrokerEndPoint(3, "host", 3000), 0, 0));
             BaseRecords fetched = new BaseRecords() {
@@ -203,39 +199,8 @@ public class ReplicaFetcherThreadBenchmark {
                     .setLastStableOffset(0)
                     .setLogStartOffset(0)
                     .setRecords(fetched));
-
-            updatePartitionState.add(
-                    new 
UpdateMetadataRequestData.UpdateMetadataPartitionState()
-                            .setTopicName("topic")
-                            .setPartitionIndex(i)
-                            .setControllerEpoch(0)
-                            .setLeader(0)
-                            .setLeaderEpoch(0)
-                            .setIsr(replicas)
-                            .setZkVersion(1)
-                            .setReplicas(replicas));
         }
-        UpdateMetadataRequest updateMetadataRequest = new 
UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(),
-                0, 0, 0, updatePartitionState, Collections.emptyList(), 
topicIds).build();
-
-        // TODO: fix to support raft
-        ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0,
-            config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty());
-        metadataCache.updateMetadata(0, updateMetadataRequest);
 
-        replicaManager = new ReplicaManagerBuilder().
-            setConfig(config).
-            setMetrics(metrics).
-            setTime(new MockTime()).
-            setZkClient(Mockito.mock(KafkaZkClient.class)).
-            setScheduler(scheduler).
-            setLogManager(logManager).
-            setQuotaManagers(Mockito.mock(QuotaFactory.QuotaManagers.class)).
-            setBrokerTopicStats(brokerTopicStats).
-            setMetadataCache(metadataCache).
-            setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())).
-            setAlterPartitionManager(TestUtils.createAlterIsrManager()).
-            build();
         replicaQuota = new ReplicaQuota() {
             @Override
             public boolean isQuotaExceeded() {
@@ -266,7 +231,9 @@ public class ReplicaFetcherThreadBenchmark {
         replicaManager.shutdown(false);
         logManager.shutdown(-1L);
         scheduler.shutdown();
-        Utils.delete(logDir);
+        for (File dir : CollectionConverters.asJava(logManager.liveLogDirs())) 
{
+            Utils.delete(dir);
+        }
     }
 
     @Benchmark
@@ -275,18 +242,6 @@ public class ReplicaFetcherThreadBenchmark {
         return fetcher.fetcherStats().requestRate().count();
     }
 
-    // avoid mocked DelayedOperations to avoid mocked class affecting 
benchmark results
-    private static class DelayedOperationsMock extends DelayedOperations {
-        DelayedOperationsMock(Option<Uuid>  topicId, TopicPartition 
topicPartition) {
-            super(topicId, topicPartition, null, null, null, null);
-        }
-
-        @Override
-        public int numDelayedDelete() {
-            return 0;
-        }
-    }
-
     private static LogConfig createLogConfig() {
         return new LogConfig(new Properties());
     }
@@ -318,7 +273,7 @@ public class ReplicaFetcherThreadBenchmark {
                             replicaManager,
                             replicaQuota,
                             config::interBrokerProtocolVersion,
-                            () -> -1
+                            () -> -1L
                     ) {
                         @Override
                         public OffsetAndEpoch 
fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {

Reply via email to