This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 39069bbad22 KAFKA-18491 Remove zkClient & maybeUpdateMetadataCache from ReplicaManager (#18507) 39069bbad22 is described below commit 39069bbad227803fcd56761d6bdeb5d2471b19ee Author: Peter Lee <peterx...@gmail.com> AuthorDate: Tue Jan 14 03:20:23 2025 +0800 KAFKA-18491 Remove zkClient & maybeUpdateMetadataCache from ReplicaManager (#18507) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../server/builders/ReplicaManagerBuilder.java | 8 ------ .../src/main/scala/kafka/server/BrokerServer.scala | 1 - .../main/scala/kafka/server/ReplicaManager.scala | 32 +++------------------- .../kafka/server/LocalLeaderEndPointTest.scala | 4 +-- .../server/HighwatermarkPersistenceTest.scala | 5 ++-- .../unit/kafka/server/IsrExpirationTest.scala | 3 +- .../kafka/server/ReplicaManagerQuotasTest.scala | 3 +- .../server/epoch/OffsetsForLeaderEpochTest.scala | 8 +++--- 8 files changed, 17 insertions(+), 47 deletions(-) diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index f64dc96e6e5..626b53c12c4 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -32,7 +32,6 @@ import kafka.server.MetadataCache; import kafka.server.QuotaFactory.QuotaManagers; import kafka.server.ReplicaManager; import kafka.server.share.DelayedShareFetch; -import kafka.zk.KafkaZkClient; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Time; @@ -64,7 +63,6 @@ public class ReplicaManagerBuilder { private BrokerTopicStats brokerTopicStats = null; private AtomicBoolean isShuttingDown = new AtomicBoolean(false); private Optional<RemoteLogManager> remoteLogManager = Optional.empty(); - private Optional<KafkaZkClient> zkClient = Optional.empty(); private Optional<DelayedOperationPurgatory<DelayedProduce>> delayedProducePurgatory = Optional.empty(); private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty(); private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty(); @@ -137,11 +135,6 @@ public class ReplicaManagerBuilder { return this; } - public ReplicaManagerBuilder setZkClient(KafkaZkClient zkClient) { - this.zkClient = Optional.of(zkClient); - return this; - } - public ReplicaManagerBuilder setDelayedProducePurgatory(DelayedOperationPurgatory<DelayedProduce> delayedProducePurgatory) { this.delayedProducePurgatory = Optional.of(delayedProducePurgatory); return this; @@ -210,7 +203,6 @@ public class ReplicaManagerBuilder { alterPartitionManager, brokerTopicStats, isShuttingDown, - OptionConverters.toScala(zkClient), OptionConverters.toScala(delayedProducePurgatory), OptionConverters.toScala(delayedFetchPurgatory), OptionConverters.toScala(delayedDeleteRecordsPurgatory), diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 3fd2f7789c9..600566f821b 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -347,7 +347,6 @@ class BrokerServer( alterPartitionManager = alterPartitionManager, brokerTopicStats = brokerTopicStats, isShuttingDown = isShuttingDown, - zkClient = None, threadNamePrefix = None, // The ReplicaManager only runs on the broker, and already includes the ID in thread names. delayedRemoteFetchPurgatoryParam = None, brokerEpochSupplier = () => lifecycleManager.brokerEpoch, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1c28d407d3d..6d792c7647a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -24,10 +24,8 @@ import kafka.log.{LogManager, UnifiedLog} import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported} -import kafka.server.metadata.ZkMetadataCache import kafka.server.share.DelayedShareFetch import kafka.utils._ -import kafka.zk.KafkaZkClient import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult @@ -277,7 +275,6 @@ class ReplicaManager(val config: KafkaConfig, val alterPartitionManager: AlterPartitionManager, val brokerTopicStats: BrokerTopicStats = new BrokerTopicStats(), val isShuttingDown: AtomicBoolean = new AtomicBoolean(false), - val zkClient: Option[KafkaZkClient] = None, delayedProducePurgatoryParam: Option[DelayedOperationPurgatory[DelayedProduce]] = None, delayedFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedFetch]] = None, delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None, @@ -2037,23 +2034,6 @@ class ReplicaManager(val config: KafkaConfig, def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = localLog(topicPartition).map(_.config) - def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = { - replicaStateChangeLock synchronized { - if (updateMetadataRequest.controllerEpoch < controllerEpoch) { - val stateControllerEpochErrorMessage = s"Received update metadata request with correlation id $correlationId " + - s"from an old controller ${updateMetadataRequest.controllerId} with epoch ${updateMetadataRequest.controllerEpoch}. " + - s"Latest known controller epoch is $controllerEpoch" - stateChangeLogger.warn(stateControllerEpochErrorMessage) - throw new ControllerMovedException(stateChangeLogger.messageWithPrefix(stateControllerEpochErrorMessage)) - } else { - val zkMetadataCache = metadataCache.asInstanceOf[ZkMetadataCache] - val deletedPartitions = zkMetadataCache.updateMetadata(correlationId, updateMetadataRequest) - controllerEpoch = updateMetadataRequest.controllerEpoch - deletedPartitions - } - } - } - def becomeLeaderOrFollower(correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest, onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = { @@ -2652,15 +2632,11 @@ class ReplicaManager(val config: KafkaConfig, } if (notifyController) { - if (zkClient.isEmpty) { - if (uuid.isDefined) { - directoryEventHandler.handleFailure(uuid.get) - } else { - fatal(s"Unable to propagate directory failure disabled because directory $dir has no UUID") - Exit.halt(1) - } + if (uuid.isDefined) { + directoryEventHandler.handleFailure(uuid.get) } else { - zkClient.get.propagateLogDirEvent(localBrokerId) + fatal(s"Unable to propagate directory failure disabled because directory $dir has no UUID") + Exit.halt(1) } } warn(s"Stopped serving replicas in dir $dir") diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala index 95545a38a62..25e32ca4d79 100644 --- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala +++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord} import org.apache.kafka.common.requests.LeaderAndIsrRequest import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse -import org.apache.kafka.server.common.OffsetAndEpoch +import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch} import org.apache.kafka.server.network.BrokerEndPoint import org.apache.kafka.server.util.{MockScheduler, MockTime} import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints @@ -69,7 +69,7 @@ class LocalLeaderEndPointTest extends Logging { scheduler = new MockScheduler(time), logManager = mockLogMgr, quotaManagers = quotaManager, - metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), alterPartitionManager = alterPartitionManager) val partition = replicaManager.createPartition(topicPartition) diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 579487bc477..f625afa1fa7 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -27,6 +27,7 @@ import kafka.cluster.Partition import kafka.server.metadata.MockConfigRepository import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.SimpleRecord +import org.apache.kafka.server.common.KRaftVersion import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogDirFailureChannel} @@ -69,7 +70,7 @@ class HighwatermarkPersistenceTest { scheduler = scheduler, logManager = logManagers.head, quotaManagers = quotaManager, - metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId, configs.head.interBrokerProtocolVersion), + metadataCache = MetadataCache.kRaftMetadataCache(configs.head.brokerId, () => KRaftVersion.KRAFT_VERSION_0), logDirFailureChannel = logDirFailureChannels.head, alterPartitionManager = alterIsrManager) replicaManager.startup() @@ -127,7 +128,7 @@ class HighwatermarkPersistenceTest { scheduler = scheduler, logManager = logManagers.head, quotaManagers = quotaManager, - metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId, configs.head.interBrokerProtocolVersion), + metadataCache = MetadataCache.kRaftMetadataCache(configs.head.brokerId, () => KRaftVersion.KRAFT_VERSION_0), logDirFailureChannel = logDirFailureChannels.head, alterPartitionManager = alterIsrManager) replicaManager.startup() diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala index 429b6869013..2f11690bacd 100644 --- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.Time import org.apache.kafka.metadata.LeaderRecoveryState +import org.apache.kafka.server.common.KRaftVersion import org.apache.kafka.server.config.ReplicationConfigs import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{LogDirFailureChannel, LogOffsetMetadata} @@ -72,7 +73,7 @@ class IsrExpirationTest { scheduler = null, logManager = logManager, quotaManagers = quotaManager, - metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId, configs.head.interBrokerProtocolVersion), + metadataCache = MetadataCache.kRaftMetadataCache(configs.head.brokerId, () => KRaftVersion.KRAFT_VERSION_0), logDirFailureChannel = new LogDirFailureChannel(configs.head.logDirs.size), alterPartitionManager = alterIsrManager) } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 160fa849d9b..f0a4be811bb 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderRecoveryState +import org.apache.kafka.server.common.KRaftVersion import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams} import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogOffsetSnapshot} @@ -307,7 +308,7 @@ class ReplicaManagerQuotasTest { scheduler = scheduler, logManager = logManager, quotaManagers = quotaManager, - metadataCache = MetadataCache.zkMetadataCache(leaderBrokerId, configs.head.interBrokerProtocolVersion), + metadataCache = MetadataCache.kRaftMetadataCache(leaderBrokerId, () => KRaftVersion.KRAFT_VERSION_0), logDirFailureChannel = new LogDirFailureChannel(configs.head.logDirs.size), alterPartitionManager = alterIsrManager) diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala index faab5ab4a61..79f4be41b8f 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} -import org.apache.kafka.server.common.OffsetAndEpoch +import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch} import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.LogDirFailureChannel import org.junit.jupiter.api.Assertions._ @@ -72,7 +72,7 @@ class OffsetsForLeaderEpochTest { scheduler = null, logManager = logManager, quotaManagers = quotaManager, - metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), alterPartitionManager = alterIsrManager) val partition = replicaManager.createPartition(tp) @@ -101,7 +101,7 @@ class OffsetsForLeaderEpochTest { scheduler = null, logManager = logManager, quotaManagers = quotaManager, - metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), alterPartitionManager = alterIsrManager) replicaManager.createPartition(tp) @@ -132,7 +132,7 @@ class OffsetsForLeaderEpochTest { scheduler = null, logManager = logManager, quotaManagers = quotaManager, - metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), alterPartitionManager = alterIsrManager)