This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 9203c6a5cf6 KAFKA-18502 Remove kafka.controller.Election (#18518) 9203c6a5cf6 is described below commit 9203c6a5cf6e0a0b8c0677a36efac8610f3b4090 Author: TengYao Chi <kiting...@gmail.com> AuthorDate: Wed Jan 15 05:42:16 2025 +0800 KAFKA-18502 Remove kafka.controller.Election (#18518) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../src/main/scala/kafka/controller/Election.scala | 168 --------------------- 1 file changed, 168 deletions(-) diff --git a/core/src/main/scala/kafka/controller/Election.scala b/core/src/main/scala/kafka/controller/Election.scala deleted file mode 100644 index d9d76e38766..00000000000 --- a/core/src/main/scala/kafka/controller/Election.scala +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package kafka.controller - -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.metadata.LeaderAndIsr - -import scala.collection.Seq -import scala.jdk.CollectionConverters._ - -case class ElectionResult(topicPartition: TopicPartition, leaderAndIsr: Option[LeaderAndIsr], liveReplicas: Seq[Int]) - -object Election { - - private def leaderForOffline(partition: TopicPartition, - leaderAndIsrOpt: Option[LeaderAndIsr], - uncleanLeaderElectionEnabled: Boolean, - isLeaderRecoverySupported: Boolean, - controllerContext: ControllerContext): ElectionResult = { - - val assignment = controllerContext.partitionReplicaAssignment(partition) - val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition)) - leaderAndIsrOpt match { - case Some(leaderAndIsr) => - val isr = leaderAndIsr.isr.asScala.map(_.toInt) - val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection( - assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext) - val newLeaderAndIsrOpt = leaderOpt.map { leader => - val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition)) - else List(leader) - val newIsrAsJava = newIsr.map(Integer.valueOf).asJava - if (!isr.contains(leader) && isLeaderRecoverySupported) { - // The new leader is not in the old ISR so mark the partition a RECOVERING - leaderAndIsr.newRecoveringLeaderAndIsr(leader, newIsrAsJava) - } else { - // Elect a new leader but keep the previous leader recovery state - leaderAndIsr.newLeaderAndIsr(leader, newIsrAsJava) - } - } - ElectionResult(partition, newLeaderAndIsrOpt, liveReplicas) - - case None => - ElectionResult(partition, None, liveReplicas) - } - } - - /** - * Elect leaders for new or offline partitions. 
- * - * @param controllerContext Context with the current state of the cluster - * @param isLeaderRecoverySupported true if leader recovery is supported and should be set if the election is unclean - * @param partitionsWithUncleanLeaderRecoveryState A sequence of tuples representing the partitions - * that need election, their leader/ISR state, and whether - * or not unclean leader election is enabled - * - * @return The election results - */ - def leaderForOffline( - controllerContext: ControllerContext, - isLeaderRecoverySupported: Boolean, - partitionsWithUncleanLeaderRecoveryState: Seq[(TopicPartition, Option[LeaderAndIsr], Boolean)] - ): Seq[ElectionResult] = { - partitionsWithUncleanLeaderRecoveryState.map { - case (partition, leaderAndIsrOpt, uncleanLeaderElectionEnabled) => - leaderForOffline(partition, leaderAndIsrOpt, uncleanLeaderElectionEnabled, isLeaderRecoverySupported, controllerContext) - } - } - - private def leaderForReassign(partition: TopicPartition, - leaderAndIsr: LeaderAndIsr, - controllerContext: ControllerContext): ElectionResult = { - val targetReplicas = controllerContext.partitionFullReplicaAssignment(partition).targetReplicas - val liveReplicas = targetReplicas.filter(replica => controllerContext.isReplicaOnline(replica, partition)) - val isr = leaderAndIsr.isr - val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(targetReplicas, isr.asScala.map(_.toInt), liveReplicas.toSet) - val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader)) - ElectionResult(partition, newLeaderAndIsrOpt, targetReplicas) - } - - /** - * Elect leaders for partitions that are undergoing reassignment.
- * - * @param controllerContext Context with the current state of the cluster - * @param leaderAndIsrs A sequence of tuples representing the partitions that need election - * and their respective leader/ISR states - * - * @return The election results - */ - def leaderForReassign(controllerContext: ControllerContext, - leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = { - leaderAndIsrs.map { case (partition, leaderAndIsr) => - leaderForReassign(partition, leaderAndIsr, controllerContext) - } - } - - private def leaderForPreferredReplica(partition: TopicPartition, - leaderAndIsr: LeaderAndIsr, - controllerContext: ControllerContext): ElectionResult = { - val assignment = controllerContext.partitionReplicaAssignment(partition) - val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition)) - val isr = leaderAndIsr.isr - val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment, isr.asScala.map(_.toInt), liveReplicas.toSet) - val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader)) - ElectionResult(partition, newLeaderAndIsrOpt, assignment) - } - - /** - * Elect preferred leaders. 
- * - * @param controllerContext Context with the current state of the cluster - * @param leaderAndIsrs A sequence of tuples representing the partitions that need election - * and their respective leader/ISR states - * - * @return The election results - */ - def leaderForPreferredReplica(controllerContext: ControllerContext, - leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = { - leaderAndIsrs.map { case (partition, leaderAndIsr) => - leaderForPreferredReplica(partition, leaderAndIsr, controllerContext) - } - } - - private def leaderForControlledShutdown(partition: TopicPartition, - leaderAndIsr: LeaderAndIsr, - shuttingDownBrokerIds: Set[Int], - controllerContext: ControllerContext): ElectionResult = { - val assignment = controllerContext.partitionReplicaAssignment(partition) - val liveOrShuttingDownReplicas = assignment.filter(replica => - controllerContext.isReplicaOnline(replica, partition, includeShuttingDownBrokers = true)) - val isr = leaderAndIsr.isr.asScala.map(_.toInt) - val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr, - liveOrShuttingDownReplicas.toSet, shuttingDownBrokerIds) - val newIsr = isr.filter(replica => !shuttingDownBrokerIds.contains(replica)).map(Integer.valueOf).asJava - val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeaderAndIsr(leader, newIsr)) - ElectionResult(partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas) - } - - /** - * Elect leaders for partitions whose current leaders are shutting down. 
- * - * @param controllerContext Context with the current state of the cluster - * @param leaderAndIsrs A sequence of tuples representing the partitions that need election - * and their respective leader/ISR states - * - * @return The election results - */ - def leaderForControlledShutdown(controllerContext: ControllerContext, - leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = { - val shuttingDownBrokerIds = controllerContext.shuttingDownBrokerIds.toSet - leaderAndIsrs.map { case (partition, leaderAndIsr) => - leaderForControlledShutdown(partition, leaderAndIsr, shuttingDownBrokerIds, controllerContext) - } - } -}