This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 39069bbad22 KAFKA-18491 Remove zkClient & maybeUpdateMetadataCache 
from ReplicaManager (#18507)
39069bbad22 is described below

commit 39069bbad227803fcd56761d6bdeb5d2471b19ee
Author: Peter Lee <peterx...@gmail.com>
AuthorDate: Tue Jan 14 03:20:23 2025 +0800

    KAFKA-18491 Remove zkClient & maybeUpdateMetadataCache from ReplicaManager 
(#18507)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../server/builders/ReplicaManagerBuilder.java     |  8 ------
 .../src/main/scala/kafka/server/BrokerServer.scala |  1 -
 .../main/scala/kafka/server/ReplicaManager.scala   | 32 +++-------------------
 .../kafka/server/LocalLeaderEndPointTest.scala     |  4 +--
 .../server/HighwatermarkPersistenceTest.scala      |  5 ++--
 .../unit/kafka/server/IsrExpirationTest.scala      |  3 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  3 +-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |  8 +++---
 8 files changed, 17 insertions(+), 47 deletions(-)

diff --git 
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java 
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index f64dc96e6e5..626b53c12c4 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -32,7 +32,6 @@ import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory.QuotaManagers;
 import kafka.server.ReplicaManager;
 import kafka.server.share.DelayedShareFetch;
-import kafka.zk.KafkaZkClient;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
@@ -64,7 +63,6 @@ public class ReplicaManagerBuilder {
     private BrokerTopicStats brokerTopicStats = null;
     private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
     private Optional<RemoteLogManager> remoteLogManager = Optional.empty();
-    private Optional<KafkaZkClient> zkClient = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedProduce>> 
delayedProducePurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedFetch>> 
delayedFetchPurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> 
delayedDeleteRecordsPurgatory = Optional.empty();
@@ -137,11 +135,6 @@ public class ReplicaManagerBuilder {
         return this;
     }
 
-    public ReplicaManagerBuilder setZkClient(KafkaZkClient zkClient) {
-        this.zkClient = Optional.of(zkClient);
-        return this;
-    }
-
     public ReplicaManagerBuilder 
setDelayedProducePurgatory(DelayedOperationPurgatory<DelayedProduce> 
delayedProducePurgatory) {
         this.delayedProducePurgatory = Optional.of(delayedProducePurgatory);
         return this;
@@ -210,7 +203,6 @@ public class ReplicaManagerBuilder {
                              alterPartitionManager,
                              brokerTopicStats,
                              isShuttingDown,
-                             OptionConverters.toScala(zkClient),
                              OptionConverters.toScala(delayedProducePurgatory),
                              OptionConverters.toScala(delayedFetchPurgatory),
                              
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 3fd2f7789c9..600566f821b 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -347,7 +347,6 @@ class BrokerServer(
         alterPartitionManager = alterPartitionManager,
         brokerTopicStats = brokerTopicStats,
         isShuttingDown = isShuttingDown,
-        zkClient = None,
         threadNamePrefix = None, // The ReplicaManager only runs on the 
broker, and already includes the ID in thread names.
         delayedRemoteFetchPurgatoryParam = None,
         brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1c28d407d3d..6d792c7647a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -24,10 +24,8 @@ import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.HostedPartition.Online
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, 
FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, 
IsrShrinksPerSecMetricName, LeaderCountMetricName, 
OfflineReplicaCountMetricName, PartitionCountMetricName, 
PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, 
ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, 
UnderReplicatedPartitionsMetricName, createLogReadResult, 
isListOffsetsTimestampUnsupported}
-import kafka.server.metadata.ZkMetadataCache
 import kafka.server.share.DelayedShareFetch
 import kafka.utils._
-import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
@@ -277,7 +275,6 @@ class ReplicaManager(val config: KafkaConfig,
                      val alterPartitionManager: AlterPartitionManager,
                      val brokerTopicStats: BrokerTopicStats = new 
BrokerTopicStats(),
                      val isShuttingDown: AtomicBoolean = new 
AtomicBoolean(false),
-                     val zkClient: Option[KafkaZkClient] = None,
                      delayedProducePurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedProduce]] = None,
                      delayedFetchPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedFetch]] = None,
                      delayedDeleteRecordsPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
@@ -2037,23 +2034,6 @@ class ReplicaManager(val config: KafkaConfig,
 
   def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = 
localLog(topicPartition).map(_.config)
 
-  def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: 
UpdateMetadataRequest) : Seq[TopicPartition] =  {
-    replicaStateChangeLock synchronized {
-      if (updateMetadataRequest.controllerEpoch < controllerEpoch) {
-        val stateControllerEpochErrorMessage = s"Received update metadata 
request with correlation id $correlationId " +
-          s"from an old controller ${updateMetadataRequest.controllerId} with 
epoch ${updateMetadataRequest.controllerEpoch}. " +
-          s"Latest known controller epoch is $controllerEpoch"
-        stateChangeLogger.warn(stateControllerEpochErrorMessage)
-        throw new 
ControllerMovedException(stateChangeLogger.messageWithPrefix(stateControllerEpochErrorMessage))
-      } else {
-        val zkMetadataCache = metadataCache.asInstanceOf[ZkMetadataCache]
-        val deletedPartitions = zkMetadataCache.updateMetadata(correlationId, 
updateMetadataRequest)
-        controllerEpoch = updateMetadataRequest.controllerEpoch
-        deletedPartitions
-      }
-    }
-  }
-
   def becomeLeaderOrFollower(correlationId: Int,
                              leaderAndIsrRequest: LeaderAndIsrRequest,
                              onLeadershipChange: (Iterable[Partition], 
Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
@@ -2652,15 +2632,11 @@ class ReplicaManager(val config: KafkaConfig,
     }
 
     if (notifyController) {
-      if (zkClient.isEmpty) {
-        if (uuid.isDefined) {
-          directoryEventHandler.handleFailure(uuid.get)
-        } else {
-          fatal(s"Unable to propagate directory failure disabled because 
directory $dir has no UUID")
-          Exit.halt(1)
-        }
+      if (uuid.isDefined) {
+        directoryEventHandler.handleFailure(uuid.get)
       } else {
-        zkClient.get.propagateLogDirEvent(localBrokerId)
+        fatal(s"Unable to propagate directory failure disabled because 
directory $dir has no UUID")
+        Exit.halt(1)
       }
     }
     warn(s"Stopped serving replicas in dir $dir")
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala 
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 95545a38a62..25e32ca4d79 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.server.common.OffsetAndEpoch
+import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
@@ -69,7 +69,7 @@ class LocalLeaderEndPointTest extends Logging {
       scheduler = new MockScheduler(time),
       logManager = mockLogMgr,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterPartitionManager)
     val partition = replicaManager.createPartition(topicPartition)
diff --git 
a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 579487bc477..f625afa1fa7 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -27,6 +27,7 @@ import kafka.cluster.Partition
 import kafka.server.metadata.MockConfigRepository
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.SimpleRecord
+import org.apache.kafka.server.common.KRaftVersion
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.log.{CleanerConfig, 
LogDirFailureChannel}
 
@@ -69,7 +70,7 @@ class HighwatermarkPersistenceTest {
       scheduler = scheduler,
       logManager = logManagers.head,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId, 
configs.head.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(configs.head.brokerId, 
() => KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = logDirFailureChannels.head,
       alterPartitionManager = alterIsrManager)
     replicaManager.startup()
@@ -127,7 +128,7 @@ class HighwatermarkPersistenceTest {
       scheduler = scheduler,
       logManager = logManagers.head,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId, 
configs.head.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(configs.head.brokerId, 
() => KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = logDirFailureChannels.head,
       alterPartitionManager = alterIsrManager)
     replicaManager.startup()
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala 
b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 429b6869013..2f11690bacd 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.server.common.KRaftVersion
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.{LogDirFailureChannel, 
LogOffsetMetadata}
@@ -72,7 +73,7 @@ class IsrExpirationTest {
       scheduler = null,
       logManager = logManager,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId, 
configs.head.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(configs.head.brokerId, 
() => KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new 
LogDirFailureChannel(configs.head.logDirs.size),
       alterPartitionManager = alterIsrManager)
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 160fa849d9b..f0a4be811bb 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.server.common.KRaftVersion
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, 
LogDirFailureChannel, LogOffsetMetadata, LogOffsetSnapshot}
@@ -307,7 +308,7 @@ class ReplicaManagerQuotasTest {
       scheduler = scheduler,
       logManager = logManager,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(leaderBrokerId, 
configs.head.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(leaderBrokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new 
LogDirFailureChannel(configs.head.logDirs.size),
       alterPartitionManager = alterIsrManager)
 
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index faab5ab4a61..79f4be41b8f 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET}
-import org.apache.kafka.server.common.OffsetAndEpoch
+import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 import org.junit.jupiter.api.Assertions._
@@ -72,7 +72,7 @@ class OffsetsForLeaderEpochTest {
       scheduler = null,
       logManager = logManager,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterIsrManager)
     val partition = replicaManager.createPartition(tp)
@@ -101,7 +101,7 @@ class OffsetsForLeaderEpochTest {
       scheduler = null,
       logManager = logManager,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterIsrManager)
     replicaManager.createPartition(tp)
@@ -132,7 +132,7 @@ class OffsetsForLeaderEpochTest {
       scheduler = null,
       logManager = logManager,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterIsrManager)
 

Reply via email to