This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 10b24105d8a KAFKA-18542 Cleanup AlterPartitionManager (#18552) 10b24105d8a is described below commit 10b24105d8a9ab0ce6594e2602ea81f661ab2cde Author: Ken Huang <s7133...@gmail.com> AuthorDate: Thu Jan 16 19:50:02 2025 +0800 KAFKA-18542 Cleanup AlterPartitionManager (#18552) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- core/src/main/scala/kafka/cluster/Partition.scala | 19 ++--------- .../scala/kafka/server/AlterPartitionManager.scala | 35 +++++++------------ .../unit/kafka/cluster/PartitionLockTest.scala | 3 +- .../kafka/server/AlterPartitionManagerTest.scala | 39 +++++++++++----------- .../server/ReplicaManagerConcurrencyTest.scala | 1 - .../test/scala/unit/kafka/utils/TestUtils.scala | 4 +-- 6 files changed, 35 insertions(+), 66 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index eda8200a806..c0ed3eace7a 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -19,7 +19,7 @@ package kafka.cluster import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.Optional import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList} -import kafka.controller.{KafkaController, StateChangeLogger} +import kafka.controller.StateChangeLogger import kafka.log._ import kafka.log.remote.RemoteLogManager import kafka.server._ @@ -358,12 +358,6 @@ class Partition(val topicPartition: TopicPartition, } } - /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup. - * One way of doing that is through the controller's start replica state change command. When a new broker starts up - * the controller sends it a start replica command containing the leader for each partition that the broker hosts. - * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for - * each partition. */ - private var controllerEpoch: Int = KafkaController.InitialControllerEpoch this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] " private val tags = Map("topic" -> topic, "partition" -> partitionId.toString).asJava @@ -749,10 +743,6 @@ class Partition(val topicPartition: TopicPartition, return false } - // Record the epoch of the controller that made the leadership decision. This is useful while updating the isr - // to maintain the decision maker controller's epoch in the zookeeper path. - controllerEpoch = partitionState.controllerEpoch - val currentTimeMs = time.milliseconds val isNewLeader = !isLeader val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch @@ -861,10 +851,6 @@ class Partition(val topicPartition: TopicPartition, return false } - // Record the epoch of the controller that made the leadership decision. This is useful while updating the isr - // to maintain the decision maker controller's epoch in the zookeeper path - controllerEpoch = partitionState.controllerEpoch - val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch // The leader should be updated before updateAssignmentAndIsr where we clear the ISR. Or it is possible to meet // the under min isr condition during the makeFollower process and emits the wrong metric. @@ -1874,8 +1860,7 @@ class Partition(val topicPartition: TopicPartition, debug(s"Submitting ISR state change $proposedIsrState") val future = alterIsrManager.submit( new TopicIdPartition(topicId.getOrElse(Uuid.ZERO_UUID), topicPartition), - proposedIsrState.sentLeaderAndIsr, - controllerEpoch + proposedIsrState.sentLeaderAndIsr ) future.whenComplete { (leaderAndIsr, e) => var hwIncremented = false diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala index fd0f8b61ea2..fa1c3602bee 100644 --- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala +++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala @@ -40,9 +40,8 @@ import scala.collection.mutable.ListBuffer import scala.jdk.OptionConverters.RichOptional /** - * Handles updating the ISR by sending AlterPartition requests to the controller (as of 2.7) or by updating ZK directly - * (prior to 2.7). Updating the ISR is an asynchronous operation, so partitions will learn about the result of their - * request through a callback. + * Handles updating the ISR by sending AlterPartition requests to the controller. Updating the ISR is an asynchronous + * operation, so partitions will learn about the result of their request through a callback. * * Note that ISR state changes can still be initiated by the controller and sent to the partitions via LeaderAndIsr * requests. @@ -54,22 +53,20 @@ trait AlterPartitionManager { def submit( topicIdPartition: TopicIdPartition, - leaderAndIsr: LeaderAndIsr, - controllerEpoch: Int + leaderAndIsr: LeaderAndIsr ): CompletableFuture[LeaderAndIsr] } case class AlterPartitionItem( topicIdPartition: TopicIdPartition, leaderAndIsr: LeaderAndIsr, - future: CompletableFuture[LeaderAndIsr], - controllerEpoch: Int // controllerEpoch needed for `ZkAlterPartitionManager` + future: CompletableFuture[LeaderAndIsr] ) object AlterPartitionManager { /** - * Factory to AlterPartition based implementation, used when IBP >= 2.7-IV2 + * Factory to AlterPartition based implementation */ def apply( config: KafkaConfig, @@ -112,18 +109,11 @@ class DefaultAlterPartitionManager( // Used to allow only one pending ISR update per partition (visible for testing). // Note that we key items by TopicPartition despite using TopicIdPartition while - // submitting changes. We do this to ensure that topics with the same name but - // with a different topic id or no topic id collide here. There are two cases to - // consider: - // 1) When the cluster is upgraded from IBP < 2.8 to IBP >= 2.8, the ZK controller - // assigns topic ids to the partitions. So partitions will start sending updates - // with a topic id while they might still have updates without topic ids in this - // Map. This would break the contract of only allowing one pending ISR update per - // partition. - // 2) When a topic is deleted and re-created, we cannot have two entries in this Map - // especially if we cannot use an AlterPartition request version which supports - // topic ids in the end because the two updates with the same name would be merged - // together. + // submitting changes. This is done to ensure that topics with the same name but + // with a different topic id or no topic id collide here. When a topic is deleted + // and re-created, we cannot have two entries in this Map especially if we cannot + // use an AlterPartition request version which supports topic ids in the end because + // the two updates with the same name would be merged together. private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterPartitionItem] = new ConcurrentHashMap[TopicPartition, AlterPartitionItem]() // Used to allow only one in-flight request at a time @@ -139,11 +129,10 @@ class DefaultAlterPartitionManager( override def submit( topicIdPartition: TopicIdPartition, - leaderAndIsr: LeaderAndIsr, - controllerEpoch: Int + leaderAndIsr: LeaderAndIsr ): CompletableFuture[LeaderAndIsr] = { val future = new CompletableFuture[LeaderAndIsr]() - val alterPartitionItem = AlterPartitionItem(topicIdPartition, leaderAndIsr, future, controllerEpoch) + val alterPartitionItem = AlterPartitionItem(topicIdPartition, leaderAndIsr, future) val enqueued = unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition.topicPartition, alterPartitionItem) == null if (enqueued) { maybePropagateIsrChanges() diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 4695506863c..8b874424541 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -341,8 +341,7 @@ class PartitionLockTest extends Logging { )).thenReturn(Optional.empty[JLong]) when(alterIsrManager.submit( ArgumentMatchers.eq(topicIdPartition), - ArgumentMatchers.any[LeaderAndIsr], - ArgumentMatchers.anyInt() + ArgumentMatchers.any[LeaderAndIsr] )).thenReturn(new CompletableFuture[LeaderAndIsr]()) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) diff --git a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala index 772cf50b0a5..9455eaf7bfd 100644 --- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala @@ -75,7 +75,7 @@ class AlterPartitionManagerTest { val scheduler = new MockScheduler(time) val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion) alterPartitionManager.start() - alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0) + alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)) verify(brokerToController).start() verify(brokerToController).sendRequest(any(), any()) } @@ -90,7 +90,7 @@ class AlterPartitionManagerTest { for (ii <- 1 to 3) { isrWithBrokerEpoch += new BrokerState().setBrokerId(ii).setBrokerEpoch(100 + ii) } - alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, LeaderRecoveryState.RECOVERED, isrWithBrokerEpoch.toList.asJava, 10), 0) + alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, LeaderRecoveryState.RECOVERED, isrWithBrokerEpoch.toList.asJava, 10)) val expectedAlterPartitionData = new AlterPartitionRequestData() .setBrokerId(brokerId) @@ -128,7 +128,7 @@ class AlterPartitionManagerTest { val scheduler = new MockScheduler(time) val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion) alterPartitionManager.start() - alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, leaderRecoveryState, 10), 0) + alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, leaderRecoveryState, 10)) verify(brokerToController).start() verify(brokerToController).sendRequest(requestCapture.capture(), any()) @@ -148,10 +148,10 @@ class AlterPartitionManagerTest { alterPartitionManager.start() // Only send one ISR update for a given topic+partition - val firstSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0) + val firstSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)) assertFalse(firstSubmitFuture.isDone) - val failedSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0) + val failedSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)) assertTrue(failedSubmitFuture.isCompletedExceptionally) assertFutureThrows(failedSubmitFuture, classOf[OperationNotAttemptedException]) @@ -165,7 +165,7 @@ class AlterPartitionManagerTest { callbackCapture.getValue.onComplete(resp) // Now we can submit this partition again - val newSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0) + val newSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)) assertFalse(newSubmitFuture.isDone) verify(brokerToController).start() @@ -193,12 +193,12 @@ class AlterPartitionManagerTest { // First request will send batch of one alterPartitionManager.submit(new TopicIdPartition(topicId, 0, topic), - new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0) + new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)) // Other submissions will queue up until a response for (i <- 1 to 9) { alterPartitionManager.submit(new TopicIdPartition(topicId, i, topic), - new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0) + new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)) } // Simulate response, omitting partition 0 will allow it to stay in unsent queue @@ -235,12 +235,12 @@ class AlterPartitionManagerTest { val scheduler = new MockScheduler(time) val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0) alterPartitionManager.start() - val future = alterPartitionManager.submit(tp0, leaderAndIsr, 0) + val future = alterPartitionManager.submit(tp0, leaderAndIsr) val finalFuture = new CompletableFuture[LeaderAndIsr]() future.whenComplete { (_, e) => if (e != null) { // Retry when error. - alterPartitionManager.submit(tp0, leaderAndIsr, 0).whenComplete { (result, e) => + alterPartitionManager.submit(tp0, leaderAndIsr).whenComplete { (result, e) => if (e != null) { finalFuture.completeExceptionally(e) } else { @@ -309,7 +309,7 @@ class AlterPartitionManagerTest { val scheduler = new MockScheduler(time) val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0) alterPartitionManager.start() - alterPartitionManager.submit(tp0, leaderAndIsr, 0) + alterPartitionManager.submit(tp0, leaderAndIsr) verify(brokerToController).start() verify(brokerToController).sendRequest(any(), callbackCapture.capture()) @@ -357,7 +357,7 @@ class AlterPartitionManagerTest { private def checkPartitionError(error: Errors): Unit = { val alterPartitionManager = testPartitionError(tp0, error) // Any partition-level error should clear the item from the pending queue allowing for future updates - val future = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0) + val future = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)) assertFalse(future.isDone) } @@ -369,7 +369,7 @@ class AlterPartitionManagerTest { val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0) alterPartitionManager.start() - val future = alterPartitionManager.submit(tp, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0) + val future = alterPartitionManager.submit(tp, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)) verify(brokerToController).start() verify(brokerToController).sendRequest(any(), callbackCapture.capture()) @@ -393,11 +393,11 @@ class AlterPartitionManagerTest { alterPartitionManager.start() // First submit will send the request - alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0) + alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)) // These will become pending unsent items - alterPartitionManager.submit(tp1, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0) - alterPartitionManager.submit(tp2, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0) + alterPartitionManager.submit(tp1, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)) + alterPartitionManager.submit(tp2, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)) verify(brokerToController).start() verify(brokerToController).sendRequest(any(), callbackCapture.capture()) @@ -415,7 +415,6 @@ class AlterPartitionManagerTest { def testPartitionMissingInResponse(metadataVersion: MetadataVersion): Unit = { val expectedVersion = ApiKeys.ALTER_PARTITION.latestVersion val leaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10) - val controlledEpoch = 0 val brokerEpoch = 2 val scheduler = new MockScheduler(time) val brokerToController = Mockito.mock(classOf[NodeToControllerChannelManager]) @@ -430,15 +429,15 @@ class AlterPartitionManagerTest { alterPartitionManager.start() // The first `submit` will send the `AlterIsr` request - val future1 = alterPartitionManager.submit(tp0, leaderAndIsr, controlledEpoch) + val future1 = alterPartitionManager.submit(tp0, leaderAndIsr) val callback1 = verifySendRequest(brokerToController, alterPartitionRequestMatcher( expectedTopicPartitions = Set(tp0), expectedVersion = expectedVersion )) // Additional calls while the `AlterIsr` request is inflight will be queued - val future2 = alterPartitionManager.submit(tp1, leaderAndIsr, controlledEpoch) - val future3 = alterPartitionManager.submit(tp2, leaderAndIsr, controlledEpoch) + val future2 = alterPartitionManager.submit(tp1, leaderAndIsr) + val future3 = alterPartitionManager.submit(tp2, leaderAndIsr) // Respond to the first request, which will also allow the next request to get sent callback1.onComplete(makeClientResponse( diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala index c4eb5e30f5b..09c9a82446a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala @@ -471,7 +471,6 @@ class ReplicaManagerConcurrencyTest extends Logging { override def submit( topicPartition: TopicIdPartition, leaderAndIsr: LeaderAndIsr, - controllerEpoch: Int ): CompletableFuture[LeaderAndIsr] = { channel.alterIsr(topicPartition, leaderAndIsr) } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index b2d630ca357..580816b32e7 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1009,15 +1009,13 @@ object TestUtils extends Logging { override def submit( topicPartition: TopicIdPartition, leaderAndIsr: LeaderAndIsr, - controllerEpoch: Int ): CompletableFuture[LeaderAndIsr]= { val future = new CompletableFuture[LeaderAndIsr]() if (inFlight.compareAndSet(false, true)) { isrUpdates += AlterPartitionItem( topicPartition, leaderAndIsr, - future, - controllerEpoch + future ) } else { future.completeExceptionally(new OperationNotAttemptedException(