This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f1b36343bae7199ff0b55ffc4fd3deb3b7d12d52
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Jan 24 01:34:36 2025 +0800

    KAFKA-18592 Cleanup ReplicaManager (#18621)
    
    Reviewers: Ismael Juma <[email protected]>, Christo Lolov <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../server/builders/ReplicaManagerBuilder.java     | 33 ----------------------
 core/src/main/scala/kafka/cluster/Partition.scala  | 18 ------------
 .../main/scala/kafka/server/ReplicaManager.scala   | 27 +-----------------
 .../AbstractCoordinatorConcurrencyTest.scala       |  1 -
 .../unit/kafka/server/ReplicaManagerTest.scala     |  6 ----
 docs/zk2kraft.html                                 |  9 ++++++
 6 files changed, 10 insertions(+), 84 deletions(-)

diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 626b53c12c4..b580485139b 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -22,7 +22,6 @@ import kafka.log.remote.RemoteLogManager;
 import kafka.server.AddPartitionsToTxnManager;
 import kafka.server.AlterPartitionManager;
 import kafka.server.DelayedDeleteRecords;
-import kafka.server.DelayedElectLeader;
 import kafka.server.DelayedFetch;
 import kafka.server.DelayedProduce;
 import kafka.server.DelayedRemoteFetch;
@@ -66,7 +65,6 @@ public class ReplicaManagerBuilder {
     private Optional<DelayedOperationPurgatory<DelayedProduce>> 
delayedProducePurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedFetch>> 
delayedFetchPurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> 
delayedDeleteRecordsPurgatory = Optional.empty();
-    private Optional<DelayedOperationPurgatory<DelayedElectLeader>> 
delayedElectLeaderPurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> 
delayedRemoteFetchPurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedRemoteListOffsets>> 
delayedRemoteListOffsetsPurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedShareFetch>> 
delayedShareFetchPurgatory = Optional.empty();
@@ -130,36 +128,11 @@ public class ReplicaManagerBuilder {
         return this;
     }
 
-    public ReplicaManagerBuilder setIsShuttingDown(AtomicBoolean 
isShuttingDown) {
-        this.isShuttingDown = isShuttingDown;
-        return this;
-    }
-
-    public ReplicaManagerBuilder 
setDelayedProducePurgatory(DelayedOperationPurgatory<DelayedProduce> 
delayedProducePurgatory) {
-        this.delayedProducePurgatory = Optional.of(delayedProducePurgatory);
-        return this;
-    }
-
     public ReplicaManagerBuilder 
setDelayedFetchPurgatory(DelayedOperationPurgatory<DelayedFetch> 
delayedFetchPurgatory) {
         this.delayedFetchPurgatory = Optional.of(delayedFetchPurgatory);
         return this;
     }
 
-    public ReplicaManagerBuilder 
setDelayedRemoteFetchPurgatory(DelayedOperationPurgatory<DelayedRemoteFetch> 
delayedRemoteFetchPurgatory) {
-        this.delayedRemoteFetchPurgatory = 
Optional.of(delayedRemoteFetchPurgatory);
-        return this;
-    }
-
-    public ReplicaManagerBuilder 
setDelayedDeleteRecordsPurgatory(DelayedOperationPurgatory<DelayedDeleteRecords>
 delayedDeleteRecordsPurgatory) {
-        this.delayedDeleteRecordsPurgatory = 
Optional.of(delayedDeleteRecordsPurgatory);
-        return this;
-    }
-
-    public ReplicaManagerBuilder 
setDelayedElectLeaderPurgatoryParam(DelayedOperationPurgatory<DelayedElectLeader>
 delayedElectLeaderPurgatory) {
-        this.delayedElectLeaderPurgatory = 
Optional.of(delayedElectLeaderPurgatory);
-        return this;
-    }
-
     public ReplicaManagerBuilder setThreadNamePrefix(String threadNamePrefix) {
         this.threadNamePrefix = Optional.of(threadNamePrefix);
         return this;
@@ -170,11 +143,6 @@ public class ReplicaManagerBuilder {
         return this;
     }
 
-    public ReplicaManagerBuilder 
setAddPartitionsToTransactionManager(AddPartitionsToTxnManager 
addPartitionsToTxnManager) {
-        this.addPartitionsToTxnManager = 
Optional.of(addPartitionsToTxnManager);
-        return this;
-    }
-
     public ReplicaManagerBuilder 
setDirectoryEventHandler(DirectoryEventHandler directoryEventHandler) {
         this.directoryEventHandler = directoryEventHandler;
         return this;
@@ -206,7 +174,6 @@ public class ReplicaManagerBuilder {
                              OptionConverters.toScala(delayedProducePurgatory),
                              OptionConverters.toScala(delayedFetchPurgatory),
                              
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
-                             
OptionConverters.toScala(delayedElectLeaderPurgatory),
                              
OptionConverters.toScala(delayedRemoteFetchPurgatory),
                              
OptionConverters.toScala(delayedRemoteListOffsetsPurgatory),
                              
OptionConverters.toScala(delayedShareFetchPurgatory),
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f41684bbe9d..261383b18d9 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1634,24 +1634,6 @@ class Partition(val topicPartition: TopicPartition,
     localLog.fetchOffsetSnapshot
   }
 
-  def legacyFetchOffsetsForTimestamp(timestamp: Long,
-                                     maxNumOffsets: Int,
-                                     isFromConsumer: Boolean,
-                                     fetchOnlyFromLeader: Boolean): Seq[Long] 
= inReadLock(leaderIsrUpdateLock) {
-    val localLog = localLogWithEpochOrThrow(Optional.empty(), 
fetchOnlyFromLeader)
-    val allOffsets = localLog.legacyFetchOffsetsBefore(timestamp, 
maxNumOffsets)
-
-    if (!isFromConsumer) {
-      allOffsets
-    } else {
-      val hw = localLog.highWatermark
-      if (allOffsets.exists(_ > hw))
-        hw +: allOffsets.dropWhile(_ > hw)
-      else
-        allOffsets
-    }
-  }
-
   def logStartOffset: Long = {
     inReadLock(leaderIsrUpdateLock) {
       leaderLogIfLocal.map(_.logStartOffset).getOrElse(-1)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b76e151c2d3..d8e415028ea 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -53,7 +53,7 @@ import org.apache.kafka.server.{ActionQueue, 
DelayedActionQueue, common}
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, 
StopPartition, TopicOptionalIdPartition}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedOperationKey, 
DelayedOperationPurgatory, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, 
TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, 
DelayedShareFetchPartitionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
@@ -274,7 +274,6 @@ class ReplicaManager(val config: KafkaConfig,
                      delayedProducePurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedProduce]] = None,
                      delayedFetchPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedFetch]] = None,
                      delayedDeleteRecordsPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
-                     delayedElectLeaderPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
                      delayedRemoteFetchPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
                      delayedRemoteListOffsetsPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedRemoteListOffsets]] = None,
                      delayedShareFetchPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedShareFetch]] = None,
@@ -298,9 +297,6 @@ class ReplicaManager(val config: KafkaConfig,
     new DelayedOperationPurgatory[DelayedDeleteRecords](
       "DeleteRecords", config.brokerId,
       config.deleteRecordsPurgatoryPurgeIntervalRequests))
-  val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse(
-    new DelayedOperationPurgatory[DelayedElectLeader](
-      "ElectLeader", config.brokerId))
   val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
     new DelayedOperationPurgatory[DelayedRemoteFetch](
       "RemoteFetch", config.brokerId))
@@ -387,13 +383,6 @@ class ReplicaManager(val config: KafkaConfig,
 
   def getLog(topicPartition: TopicPartition): Option[UnifiedLog] = 
logManager.getLog(topicPartition)
 
-  def hasDelayedElectionOperations: Boolean = 
delayedElectLeaderPurgatory.numDelayed != 0
-
-  def tryCompleteElection(key: DelayedOperationKey): Unit = {
-    val completed = delayedElectLeaderPurgatory.checkAndComplete(key)
-    debug("Request key %s unblocked %d ElectLeader.".format(key.keyLabel, 
completed))
-  }
-
   def startup(): Unit = {
     // start ISR expiration thread
     // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 
1.5 before it is removed from ISR
@@ -628,10 +617,6 @@ class ReplicaManager(val config: KafkaConfig,
     onlinePartition(topicPartition).flatMap(_.log)
   }
 
-  def getLogDir(topicPartition: TopicPartition): Option[String] = {
-    localLog(topicPartition).map(_.parentDir)
-  }
-
   def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()
 
   def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action)
@@ -1490,15 +1475,6 @@ class ReplicaManager(val config: KafkaConfig,
     partition.fetchOffsetForTimestamp(timestamp, isolationLevel, 
currentLeaderEpoch, fetchOnlyFromLeader, remoteLogManager)
   }
 
-  def legacyFetchOffsetsForTimestamp(topicPartition: TopicPartition,
-                                     timestamp: Long,
-                                     maxNumOffsets: Int,
-                                     isFromConsumer: Boolean,
-                                     fetchOnlyFromLeader: Boolean): Seq[Long] 
= {
-    val partition = getPartitionOrException(topicPartition)
-    partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets, 
isFromConsumer, fetchOnlyFromLeader)
-  }
-
   /**
    * Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo 
could not be scheduled successfully
    * else returns [[None]].
@@ -2525,7 +2501,6 @@ class ReplicaManager(val config: KafkaConfig,
     delayedRemoteListOffsetsPurgatory.shutdown()
     delayedProducePurgatory.shutdown()
     delayedDeleteRecordsPurgatory.shutdown()
-    delayedElectLeaderPurgatory.shutdown()
     delayedShareFetchPurgatory.shutdown()
     if (checkpointHW)
       checkpointHighWatermarks()
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index df8b6b6f6d4..bdc12c3051e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -190,7 +190,6 @@ object AbstractCoordinatorConcurrencyTest {
       delayedProducePurgatoryParam = Some(producePurgatory),
       delayedFetchPurgatoryParam = Some(delayedFetchPurgatoryParam),
       delayedDeleteRecordsPurgatoryParam = 
Some(delayedDeleteRecordsPurgatoryParam),
-      delayedElectLeaderPurgatoryParam = 
Some(delayedElectLeaderPurgatoryParam),
       delayedRemoteFetchPurgatoryParam = 
Some(delayedRemoteFetchPurgatoryParam),
       delayedRemoteListOffsetsPurgatoryParam = 
Some(delayedRemoteListOffsetsPurgatoryParam),
       threadNamePrefix = Option(this.getClass.getName)) {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 2060ed50596..73acbaca278 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2724,8 +2724,6 @@ class ReplicaManagerTest {
       "Fetch", timer, 0, false)
     val mockDeleteRecordsPurgatory = new 
DelayedOperationPurgatory[DelayedDeleteRecords](
       "DeleteRecords", timer, 0, false)
-    val mockElectLeaderPurgatory = new 
DelayedOperationPurgatory[DelayedElectLeader](
-      "ElectLeader", timer, 0, false)
     val mockRemoteFetchPurgatory = new 
DelayedOperationPurgatory[DelayedRemoteFetch](
       "RemoteFetch", timer, 0, false)
     val mockRemoteListOffsetsPurgatory = new 
DelayedOperationPurgatory[DelayedRemoteListOffsets](
@@ -2755,7 +2753,6 @@ class ReplicaManagerTest {
       delayedProducePurgatoryParam = Some(mockProducePurgatory),
       delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
       delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
-      delayedElectLeaderPurgatoryParam = Some(mockElectLeaderPurgatory),
       delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory),
       delayedRemoteListOffsetsPurgatoryParam = 
Some(mockRemoteListOffsetsPurgatory),
       delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
@@ -3151,8 +3148,6 @@ class ReplicaManagerTest {
       "Fetch", timer, 0, false)
     val mockDeleteRecordsPurgatory = new 
DelayedOperationPurgatory[DelayedDeleteRecords](
       "DeleteRecords", timer, 0, false)
-    val mockDelayedElectLeaderPurgatory = new 
DelayedOperationPurgatory[DelayedElectLeader](
-      "DelayedElectLeader", timer, 0, false)
     val mockDelayedRemoteFetchPurgatory = new 
DelayedOperationPurgatory[DelayedRemoteFetch](
       "DelayedRemoteFetch", timer, 0, false)
     val mockDelayedRemoteListOffsetsPurgatory = new 
DelayedOperationPurgatory[DelayedRemoteListOffsets](
@@ -3189,7 +3184,6 @@ class ReplicaManagerTest {
       delayedProducePurgatoryParam = Some(mockProducePurgatory),
       delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
       delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
-      delayedElectLeaderPurgatoryParam = Some(mockDelayedElectLeaderPurgatory),
       delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory),
       delayedRemoteListOffsetsPurgatoryParam = 
Some(mockDelayedRemoteListOffsetsPurgatory),
       delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
diff --git a/docs/zk2kraft.html b/docs/zk2kraft.html
index 123aaca4e18..2d3e8148c80 100644
--- a/docs/zk2kraft.html
+++ b/docs/zk2kraft.html
@@ -188,5 +188,14 @@
                 In Kraft mode, Zookeeper is not used, so the metrics is 
removed.
             </p>
         </li>
+        <li>
+            <p>
+                Remove the metrics for leader election purgatory.
+            </p>
+            <ul>
+                
<li><code>kafka.server:type=DelayedOperationPurgatory,delayedOperation=ElectLeader,name=PurgatorySize</code></li>
+                
<li><code>kafka.server:type=DelayedOperationPurgatory,delayedOperation=ElectLeader,name=NumDelayedOperations</code></li>
+            </ul>
+        </li>
     </ul>
 </div>
\ No newline at end of file

Reply via email to