This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 40890faa1be KAFKA-18592 Cleanup ReplicaManager (#18621)
40890faa1be is described below
commit 40890faa1bed8296919a7e108937ec559d901348
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Jan 24 01:34:36 2025 +0800
KAFKA-18592 Cleanup ReplicaManager (#18621)
Reviewers: Ismael Juma <[email protected]>, Christo Lolov
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../server/builders/ReplicaManagerBuilder.java | 33 ----------------------
core/src/main/scala/kafka/cluster/Partition.scala | 18 ------------
.../main/scala/kafka/server/ReplicaManager.scala | 27 +-----------------
.../AbstractCoordinatorConcurrencyTest.scala | 1 -
.../unit/kafka/server/ReplicaManagerTest.scala | 6 ----
docs/zk2kraft.html | 9 ++++++
6 files changed, 10 insertions(+), 84 deletions(-)
diff --git
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 626b53c12c4..b580485139b 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -22,7 +22,6 @@ import kafka.log.remote.RemoteLogManager;
import kafka.server.AddPartitionsToTxnManager;
import kafka.server.AlterPartitionManager;
import kafka.server.DelayedDeleteRecords;
-import kafka.server.DelayedElectLeader;
import kafka.server.DelayedFetch;
import kafka.server.DelayedProduce;
import kafka.server.DelayedRemoteFetch;
@@ -66,7 +65,6 @@ public class ReplicaManagerBuilder {
private Optional<DelayedOperationPurgatory<DelayedProduce>>
delayedProducePurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedFetch>>
delayedFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>>
delayedDeleteRecordsPurgatory = Optional.empty();
- private Optional<DelayedOperationPurgatory<DelayedElectLeader>>
delayedElectLeaderPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>>
delayedRemoteFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteListOffsets>>
delayedRemoteListOffsetsPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedShareFetch>>
delayedShareFetchPurgatory = Optional.empty();
@@ -130,36 +128,11 @@ public class ReplicaManagerBuilder {
return this;
}
- public ReplicaManagerBuilder setIsShuttingDown(AtomicBoolean
isShuttingDown) {
- this.isShuttingDown = isShuttingDown;
- return this;
- }
-
- public ReplicaManagerBuilder
setDelayedProducePurgatory(DelayedOperationPurgatory<DelayedProduce>
delayedProducePurgatory) {
- this.delayedProducePurgatory = Optional.of(delayedProducePurgatory);
- return this;
- }
-
public ReplicaManagerBuilder
setDelayedFetchPurgatory(DelayedOperationPurgatory<DelayedFetch>
delayedFetchPurgatory) {
this.delayedFetchPurgatory = Optional.of(delayedFetchPurgatory);
return this;
}
- public ReplicaManagerBuilder
setDelayedRemoteFetchPurgatory(DelayedOperationPurgatory<DelayedRemoteFetch>
delayedRemoteFetchPurgatory) {
- this.delayedRemoteFetchPurgatory =
Optional.of(delayedRemoteFetchPurgatory);
- return this;
- }
-
- public ReplicaManagerBuilder
setDelayedDeleteRecordsPurgatory(DelayedOperationPurgatory<DelayedDeleteRecords>
delayedDeleteRecordsPurgatory) {
- this.delayedDeleteRecordsPurgatory =
Optional.of(delayedDeleteRecordsPurgatory);
- return this;
- }
-
- public ReplicaManagerBuilder
setDelayedElectLeaderPurgatoryParam(DelayedOperationPurgatory<DelayedElectLeader>
delayedElectLeaderPurgatory) {
- this.delayedElectLeaderPurgatory =
Optional.of(delayedElectLeaderPurgatory);
- return this;
- }
-
public ReplicaManagerBuilder setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = Optional.of(threadNamePrefix);
return this;
@@ -170,11 +143,6 @@ public class ReplicaManagerBuilder {
return this;
}
- public ReplicaManagerBuilder
setAddPartitionsToTransactionManager(AddPartitionsToTxnManager
addPartitionsToTxnManager) {
- this.addPartitionsToTxnManager =
Optional.of(addPartitionsToTxnManager);
- return this;
- }
-
public ReplicaManagerBuilder
setDirectoryEventHandler(DirectoryEventHandler directoryEventHandler) {
this.directoryEventHandler = directoryEventHandler;
return this;
@@ -206,7 +174,6 @@ public class ReplicaManagerBuilder {
OptionConverters.toScala(delayedProducePurgatory),
OptionConverters.toScala(delayedFetchPurgatory),
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
-
OptionConverters.toScala(delayedElectLeaderPurgatory),
OptionConverters.toScala(delayedRemoteFetchPurgatory),
OptionConverters.toScala(delayedRemoteListOffsetsPurgatory),
OptionConverters.toScala(delayedShareFetchPurgatory),
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 4764bba5cb7..5035c86aa06 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1635,24 +1635,6 @@ class Partition(val topicPartition: TopicPartition,
localLog.fetchOffsetSnapshot
}
- def legacyFetchOffsetsForTimestamp(timestamp: Long,
- maxNumOffsets: Int,
- isFromConsumer: Boolean,
- fetchOnlyFromLeader: Boolean): Seq[Long]
= inReadLock(leaderIsrUpdateLock) {
- val localLog = localLogWithEpochOrThrow(Optional.empty(),
fetchOnlyFromLeader)
- val allOffsets = localLog.legacyFetchOffsetsBefore(timestamp,
maxNumOffsets)
-
- if (!isFromConsumer) {
- allOffsets
- } else {
- val hw = localLog.highWatermark
- if (allOffsets.exists(_ > hw))
- hw +: allOffsets.dropWhile(_ > hw)
- else
- allOffsets
- }
- }
-
def logStartOffset: Long = {
inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal.map(_.logStartOffset).getOrElse(-1)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b76e151c2d3..d8e415028ea 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -53,7 +53,7 @@ import org.apache.kafka.server.{ActionQueue,
DelayedActionQueue, common}
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal,
StopPartition, TopicOptionalIdPartition}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedOperationKey,
DelayedOperationPurgatory, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory,
TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey,
DelayedShareFetchPartitionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
@@ -274,7 +274,6 @@ class ReplicaManager(val config: KafkaConfig,
delayedProducePurgatoryParam:
Option[DelayedOperationPurgatory[DelayedProduce]] = None,
delayedFetchPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedFetch]] = None,
delayedDeleteRecordsPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
- delayedElectLeaderPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
delayedRemoteFetchPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
delayedRemoteListOffsetsPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedRemoteListOffsets]] = None,
delayedShareFetchPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedShareFetch]] = None,
@@ -298,9 +297,6 @@ class ReplicaManager(val config: KafkaConfig,
new DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", config.brokerId,
config.deleteRecordsPurgatoryPurgeIntervalRequests))
- val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse(
- new DelayedOperationPurgatory[DelayedElectLeader](
- "ElectLeader", config.brokerId))
val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
new DelayedOperationPurgatory[DelayedRemoteFetch](
"RemoteFetch", config.brokerId))
@@ -387,13 +383,6 @@ class ReplicaManager(val config: KafkaConfig,
def getLog(topicPartition: TopicPartition): Option[UnifiedLog] =
logManager.getLog(topicPartition)
- def hasDelayedElectionOperations: Boolean =
delayedElectLeaderPurgatory.numDelayed != 0
-
- def tryCompleteElection(key: DelayedOperationKey): Unit = {
- val completed = delayedElectLeaderPurgatory.checkAndComplete(key)
- debug("Request key %s unblocked %d ElectLeader.".format(key.keyLabel,
completed))
- }
-
def startup(): Unit = {
// start ISR expiration thread
// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x
1.5 before it is removed from ISR
@@ -628,10 +617,6 @@ class ReplicaManager(val config: KafkaConfig,
onlinePartition(topicPartition).flatMap(_.log)
}
- def getLogDir(topicPartition: TopicPartition): Option[String] = {
- localLog(topicPartition).map(_.parentDir)
- }
-
def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()
def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action)
@@ -1490,15 +1475,6 @@ class ReplicaManager(val config: KafkaConfig,
partition.fetchOffsetForTimestamp(timestamp, isolationLevel,
currentLeaderEpoch, fetchOnlyFromLeader, remoteLogManager)
}
- def legacyFetchOffsetsForTimestamp(topicPartition: TopicPartition,
- timestamp: Long,
- maxNumOffsets: Int,
- isFromConsumer: Boolean,
- fetchOnlyFromLeader: Boolean): Seq[Long]
= {
- val partition = getPartitionOrException(topicPartition)
- partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets,
isFromConsumer, fetchOnlyFromLeader)
- }
-
/**
* Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo
could not be scheduled successfully
* else returns [[None]].
@@ -2525,7 +2501,6 @@ class ReplicaManager(val config: KafkaConfig,
delayedRemoteListOffsetsPurgatory.shutdown()
delayedProducePurgatory.shutdown()
delayedDeleteRecordsPurgatory.shutdown()
- delayedElectLeaderPurgatory.shutdown()
delayedShareFetchPurgatory.shutdown()
if (checkpointHW)
checkpointHighWatermarks()
diff --git
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index df8b6b6f6d4..bdc12c3051e 100644
---
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -190,7 +190,6 @@ object AbstractCoordinatorConcurrencyTest {
delayedProducePurgatoryParam = Some(producePurgatory),
delayedFetchPurgatoryParam = Some(delayedFetchPurgatoryParam),
delayedDeleteRecordsPurgatoryParam =
Some(delayedDeleteRecordsPurgatoryParam),
- delayedElectLeaderPurgatoryParam =
Some(delayedElectLeaderPurgatoryParam),
delayedRemoteFetchPurgatoryParam =
Some(delayedRemoteFetchPurgatoryParam),
delayedRemoteListOffsetsPurgatoryParam =
Some(delayedRemoteListOffsetsPurgatoryParam),
threadNamePrefix = Option(this.getClass.getName)) {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6f6bc663082..a6aa6f4b5ee 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2723,8 +2723,6 @@ class ReplicaManagerTest {
"Fetch", timer, 0, false)
val mockDeleteRecordsPurgatory = new
DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", timer, 0, false)
- val mockElectLeaderPurgatory = new
DelayedOperationPurgatory[DelayedElectLeader](
- "ElectLeader", timer, 0, false)
val mockRemoteFetchPurgatory = new
DelayedOperationPurgatory[DelayedRemoteFetch](
"RemoteFetch", timer, 0, false)
val mockRemoteListOffsetsPurgatory = new
DelayedOperationPurgatory[DelayedRemoteListOffsets](
@@ -2754,7 +2752,6 @@ class ReplicaManagerTest {
delayedProducePurgatoryParam = Some(mockProducePurgatory),
delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
- delayedElectLeaderPurgatoryParam = Some(mockElectLeaderPurgatory),
delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory),
delayedRemoteListOffsetsPurgatoryParam =
Some(mockRemoteListOffsetsPurgatory),
delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
@@ -3150,8 +3147,6 @@ class ReplicaManagerTest {
"Fetch", timer, 0, false)
val mockDeleteRecordsPurgatory = new
DelayedOperationPurgatory[DelayedDeleteRecords](
"DeleteRecords", timer, 0, false)
- val mockDelayedElectLeaderPurgatory = new
DelayedOperationPurgatory[DelayedElectLeader](
- "DelayedElectLeader", timer, 0, false)
val mockDelayedRemoteFetchPurgatory = new
DelayedOperationPurgatory[DelayedRemoteFetch](
"DelayedRemoteFetch", timer, 0, false)
val mockDelayedRemoteListOffsetsPurgatory = new
DelayedOperationPurgatory[DelayedRemoteListOffsets](
@@ -3188,7 +3183,6 @@ class ReplicaManagerTest {
delayedProducePurgatoryParam = Some(mockProducePurgatory),
delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
- delayedElectLeaderPurgatoryParam = Some(mockDelayedElectLeaderPurgatory),
delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory),
delayedRemoteListOffsetsPurgatoryParam =
Some(mockDelayedRemoteListOffsetsPurgatory),
delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
diff --git a/docs/zk2kraft.html b/docs/zk2kraft.html
index 123aaca4e18..2d3e8148c80 100644
--- a/docs/zk2kraft.html
+++ b/docs/zk2kraft.html
@@ -188,5 +188,14 @@
In Kraft mode, Zookeeper is not used, so the metrics is
removed.
</p>
</li>
+ <li>
+ <p>
+ Remove the metrics for leader election purgatory.
+ </p>
+ <ul>
+
<li><code>kafka.server:type=DelayedOperationPurgatory,delayedOperation=ElectLeader,name=PurgatorySize</code></li>
+
<li><code>kafka.server:type=DelayedOperationPurgatory,delayedOperation=ElectLeader,name=NumDelayedOperations</code></li>
+ </ul>
+ </li>
</ul>
</div>
\ No newline at end of file