This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 10b24105d8a KAFKA-18542 Cleanup AlterPartitionManager (#18552)
10b24105d8a is described below

commit 10b24105d8a9ab0ce6594e2602ea81f661ab2cde
Author: Ken Huang <s7133...@gmail.com>
AuthorDate: Thu Jan 16 19:50:02 2025 +0800

    KAFKA-18542 Cleanup AlterPartitionManager (#18552)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 19 ++---------
 .../scala/kafka/server/AlterPartitionManager.scala | 35 +++++++------------
 .../unit/kafka/cluster/PartitionLockTest.scala     |  3 +-
 .../kafka/server/AlterPartitionManagerTest.scala   | 39 +++++++++++-----------
 .../server/ReplicaManagerConcurrencyTest.scala     |  1 -
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  4 +--
 6 files changed, 35 insertions(+), 66 deletions(-)

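For context on the API change below: the patch drops the controllerEpoch
parameter from AlterPartitionManager.submit, which was only needed by the old
ZooKeeper-based manager. A minimal sketch of the resulting caller-side
contract, not part of the patch itself: the manager instance, topic, and
values are hypothetical, and the imports assume the package layout used
elsewhere in this change.

    import java.util.concurrent.CompletableFuture
    import kafka.server.AlterPartitionManager
    import org.apache.kafka.common.{TopicIdPartition, Uuid}
    import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
    import scala.jdk.CollectionConverters._

    // Hypothetical caller, mirroring how Partition.scala drives the API below.
    def proposeIsrChange(manager: AlterPartitionManager): Unit = {
      val tp = new TopicIdPartition(Uuid.randomUuid(), 0, "demo-topic")
      val proposed = new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava,
        LeaderRecoveryState.RECOVERED, 10)
      // No controllerEpoch argument any more: it existed only for the ZK path.
      val future: CompletableFuture[LeaderAndIsr] = manager.submit(tp, proposed)
      // Updating the ISR is asynchronous; the outcome arrives via a callback.
      future.whenComplete { (committed, e) =>
        if (e != null) println(s"ISR update rejected: $e")
        else println(s"ISR update committed: $committed")
      }
    }
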
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index eda8200a806..c0ed3eace7a 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -19,7 +19,7 @@ package kafka.cluster
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import java.util.Optional
 import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList}
-import kafka.controller.{KafkaController, StateChangeLogger}
+import kafka.controller.StateChangeLogger
 import kafka.log._
 import kafka.log.remote.RemoteLogManager
 import kafka.server._
@@ -358,12 +358,6 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
-   * One way of doing that is through the controller's start replica state change command. When a new broker starts up
-   * the controller sends it a start replica command containing the leader for each partition that the broker hosts.
-   * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
-   * each partition. */
-  private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
   this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
 
   private val tags = Map("topic" -> topic, "partition" -> partitionId.toString).asJava
@@ -749,10 +743,6 @@ class Partition(val topicPartition: TopicPartition,
         return false
       }
 
-      // Record the epoch of the controller that made the leadership decision. This is useful while updating the isr
-      // to maintain the decision maker controller's epoch in the zookeeper path.
-      controllerEpoch = partitionState.controllerEpoch
-
       val currentTimeMs = time.milliseconds
       val isNewLeader = !isLeader
       val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
@@ -861,10 +851,6 @@ class Partition(val topicPartition: TopicPartition,
         return false
       }
 
-      // Record the epoch of the controller that made the leadership decision. This is useful while updating the isr
-      // to maintain the decision maker controller's epoch in the zookeeper path
-      controllerEpoch = partitionState.controllerEpoch
-
       val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
       // The leader should be updated before updateAssignmentAndIsr where we clear the ISR. Or it is possible to meet
       // the under min isr condition during the makeFollower process and emits the wrong metric.
@@ -1874,8 +1860,7 @@ class Partition(val topicPartition: TopicPartition,
     debug(s"Submitting ISR state change $proposedIsrState")
     val future = alterIsrManager.submit(
       new TopicIdPartition(topicId.getOrElse(Uuid.ZERO_UUID), topicPartition),
-      proposedIsrState.sentLeaderAndIsr,
-      controllerEpoch
+      proposedIsrState.sentLeaderAndIsr
     )
     future.whenComplete { (leaderAndIsr, e) =>
       var hwIncremented = false
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index fd0f8b61ea2..fa1c3602bee 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -40,9 +40,8 @@ import scala.collection.mutable.ListBuffer
 import scala.jdk.OptionConverters.RichOptional
 
 /**
- * Handles updating the ISR by sending AlterPartition requests to the controller (as of 2.7) or by updating ZK directly
- * (prior to 2.7). Updating the ISR is an asynchronous operation, so partitions will learn about the result of their
- * request through a callback.
+ * Handles updating the ISR by sending AlterPartition requests to the controller. Updating the ISR is an asynchronous
+ * operation, so partitions will learn about the result of their request through a callback.
  *
  * Note that ISR state changes can still be initiated by the controller and sent to the partitions via LeaderAndIsr
  * requests.
@@ -54,22 +53,20 @@ trait AlterPartitionManager {
 
   def submit(
     topicIdPartition: TopicIdPartition,
-    leaderAndIsr: LeaderAndIsr,
-    controllerEpoch: Int
+    leaderAndIsr: LeaderAndIsr
   ): CompletableFuture[LeaderAndIsr]
 }
 
 case class AlterPartitionItem(
   topicIdPartition: TopicIdPartition,
   leaderAndIsr: LeaderAndIsr,
-  future: CompletableFuture[LeaderAndIsr],
-  controllerEpoch: Int // controllerEpoch needed for `ZkAlterPartitionManager`
+  future: CompletableFuture[LeaderAndIsr]
 )
 
 object AlterPartitionManager {
 
   /**
-   * Factory to AlterPartition based implementation, used when IBP >= 2.7-IV2
+   * Factory to AlterPartition based implementation
    */
   def apply(
     config: KafkaConfig,
@@ -112,18 +109,11 @@ class DefaultAlterPartitionManager(
 
   // Used to allow only one pending ISR update per partition (visible for testing).
   // Note that we key items by TopicPartition despite using TopicIdPartition while
-  // submitting changes. We do this to ensure that topics with the same name but
-  // with a different topic id or no topic id collide here. There are two cases to
-  // consider:
-  // 1) When the cluster is upgraded from IBP < 2.8 to IBP >= 2.8, the ZK controller
-  //    assigns topic ids to the partitions. So partitions will start sending updates
-  //    with a topic id while they might still have updates without topic ids in this
-  //    Map. This would break the contract of only allowing one pending ISR update per
-  //    partition.
-  // 2) When a topic is deleted and re-created, we cannot have two entries in this Map
-  //    especially if we cannot use an AlterPartition request version which supports
-  //    topic ids in the end because the two updates with the same name would be merged
-  //    together.
+  // submitting changes. This is done to ensure that topics with the same name but
+  // with a different topic id or no topic id collide here. When a topic is deleted
+  // and re-created, we cannot have two entries in this Map especially if we cannot
+  // use an AlterPartition request version which supports topic ids in the end because
+  // the two updates with the same name would be merged together.
   private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterPartitionItem] = new ConcurrentHashMap[TopicPartition, AlterPartitionItem]()
 
   // Used to allow only one in-flight request at a time
@@ -139,11 +129,10 @@ class DefaultAlterPartitionManager(
 
   override def submit(
     topicIdPartition: TopicIdPartition,
-    leaderAndIsr: LeaderAndIsr,
-    controllerEpoch: Int
+    leaderAndIsr: LeaderAndIsr
   ): CompletableFuture[LeaderAndIsr] = {
     val future = new CompletableFuture[LeaderAndIsr]()
-    val alterPartitionItem = AlterPartitionItem(topicIdPartition, leaderAndIsr, future, controllerEpoch)
+    val alterPartitionItem = AlterPartitionItem(topicIdPartition, leaderAndIsr, future)
     val enqueued = unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition.topicPartition, alterPartitionItem) == null
     if (enqueued) {
       maybePropagateIsrChanges()
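
An aside on the putIfAbsent call above: it is what enforces the
one-pending-ISR-update-per-partition contract that the comment describes. A
hedged sketch of the behavior a caller observes, with imports and hypothetical
names as in the sketch near the top of this mail; the expectations mirror
AlterPartitionManagerTest below.

    // Sketch only: a second submit for the same TopicPartition while the
    // first is still pending is rejected rather than queued.
    def demoSinglePendingUpdate(manager: AlterPartitionManager,
                                tp: TopicIdPartition,
                                proposed: LeaderAndIsr): Unit = {
      val first = manager.submit(tp, proposed)   // putIfAbsent succeeds -> enqueued
      val second = manager.submit(tp, proposed)  // key already present -> rejected
      assert(!first.isDone)                      // still waiting on the controller
      assert(second.isCompletedExceptionally)    // OperationNotAttemptedException
    }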
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 4695506863c..8b874424541 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -341,8 +341,7 @@ class PartitionLockTest extends Logging {
     )).thenReturn(Optional.empty[JLong])
     when(alterIsrManager.submit(
       ArgumentMatchers.eq(topicIdPartition),
-      ArgumentMatchers.any[LeaderAndIsr],
-      ArgumentMatchers.anyInt()
+      ArgumentMatchers.any[LeaderAndIsr]
     )).thenReturn(new CompletableFuture[LeaderAndIsr]())
 
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
diff --git a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
index 772cf50b0a5..9455eaf7bfd 100644
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
@@ -75,7 +75,7 @@ class AlterPartitionManagerTest {
     val scheduler = new MockScheduler(time)
     val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
     alterPartitionManager.start()
-    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
+    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
     verify(brokerToController).start()
     verify(brokerToController).sendRequest(any(), any())
   }
@@ -90,7 +90,7 @@ class AlterPartitionManagerTest {
     for (ii <- 1 to 3) {
       isrWithBrokerEpoch += new BrokerState().setBrokerId(ii).setBrokerEpoch(100 + ii)
     }
-    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, LeaderRecoveryState.RECOVERED, isrWithBrokerEpoch.toList.asJava, 10), 0)
+    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, LeaderRecoveryState.RECOVERED, isrWithBrokerEpoch.toList.asJava, 10))
 
     val expectedAlterPartitionData = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
@@ -128,7 +128,7 @@ class AlterPartitionManagerTest {
     val scheduler = new MockScheduler(time)
     val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
     alterPartitionManager.start()
-    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, leaderRecoveryState, 10), 0)
+    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, leaderRecoveryState, 10))
     verify(brokerToController).start()
     verify(brokerToController).sendRequest(requestCapture.capture(), any())
 
@@ -148,10 +148,10 @@ class AlterPartitionManagerTest {
     alterPartitionManager.start()
 
     // Only send one ISR update for a given topic+partition
-    val firstSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
+    val firstSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
     assertFalse(firstSubmitFuture.isDone)
 
-    val failedSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
+    val failedSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
     assertTrue(failedSubmitFuture.isCompletedExceptionally)
     assertFutureThrows(failedSubmitFuture, classOf[OperationNotAttemptedException])
 
@@ -165,7 +165,7 @@ class AlterPartitionManagerTest {
     callbackCapture.getValue.onComplete(resp)
 
     // Now we can submit this partition again
-    val newSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
+    val newSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
     assertFalse(newSubmitFuture.isDone)
 
     verify(brokerToController).start()
@@ -193,12 +193,12 @@ class AlterPartitionManagerTest {
 
     // First request will send batch of one
     alterPartitionManager.submit(new TopicIdPartition(topicId, 0, topic),
-      new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
+      new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
 
     // Other submissions will queue up until a response
     for (i <- 1 to 9) {
       alterPartitionManager.submit(new TopicIdPartition(topicId, i, topic),
-        new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
+        new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
     }
 
     // Simulate response, omitting partition 0 will allow it to stay in unsent queue
@@ -235,12 +235,12 @@ class AlterPartitionManagerTest {
     val scheduler = new MockScheduler(time)
     val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0)
     alterPartitionManager.start()
-    val future = alterPartitionManager.submit(tp0, leaderAndIsr, 0)
+    val future = alterPartitionManager.submit(tp0, leaderAndIsr)
     val finalFuture = new CompletableFuture[LeaderAndIsr]()
     future.whenComplete { (_, e) =>
       if (e != null) {
         // Retry when error.
-        alterPartitionManager.submit(tp0, leaderAndIsr, 0).whenComplete { (result, e) =>
+        alterPartitionManager.submit(tp0, leaderAndIsr).whenComplete { (result, e) =>
           if (e != null) {
             finalFuture.completeExceptionally(e)
           } else {
@@ -309,7 +309,7 @@ class AlterPartitionManagerTest {
     val scheduler = new MockScheduler(time)
     val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0)
     alterPartitionManager.start()
-    alterPartitionManager.submit(tp0, leaderAndIsr, 0)
+    alterPartitionManager.submit(tp0, leaderAndIsr)
 
     verify(brokerToController).start()
     verify(brokerToController).sendRequest(any(), callbackCapture.capture())
@@ -357,7 +357,7 @@ class AlterPartitionManagerTest {
   private def checkPartitionError(error: Errors): Unit = {
     val alterPartitionManager = testPartitionError(tp0, error)
     // Any partition-level error should clear the item from the pending queue allowing for future updates
-    val future = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
+    val future = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
     assertFalse(future.isDone)
   }
 
@@ -369,7 +369,7 @@ class AlterPartitionManagerTest {
     val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0)
     alterPartitionManager.start()
 
-    val future = alterPartitionManager.submit(tp, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
+    val future = alterPartitionManager.submit(tp, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
 
     verify(brokerToController).start()
     verify(brokerToController).sendRequest(any(), callbackCapture.capture())
@@ -393,11 +393,11 @@ class AlterPartitionManagerTest {
     alterPartitionManager.start()
 
     // First submit will send the request
-    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
+    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
 
     // These will become pending unsent items
-    alterPartitionManager.submit(tp1, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
-    alterPartitionManager.submit(tp2, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
+    alterPartitionManager.submit(tp1, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
+    alterPartitionManager.submit(tp2, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
 
     verify(brokerToController).start()
     verify(brokerToController).sendRequest(any(), callbackCapture.capture())
@@ -415,7 +415,6 @@ class AlterPartitionManagerTest {
   def testPartitionMissingInResponse(metadataVersion: MetadataVersion): Unit = {
     val expectedVersion = ApiKeys.ALTER_PARTITION.latestVersion
     val leaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)
-    val controlledEpoch = 0
     val brokerEpoch = 2
     val scheduler = new MockScheduler(time)
     val brokerToController = Mockito.mock(classOf[NodeToControllerChannelManager])
@@ -430,15 +429,15 @@ class AlterPartitionManagerTest {
     alterPartitionManager.start()
 
     // The first `submit` will send the `AlterIsr` request
-    val future1 = alterPartitionManager.submit(tp0, leaderAndIsr, controlledEpoch)
+    val future1 = alterPartitionManager.submit(tp0, leaderAndIsr)
     val callback1 = verifySendRequest(brokerToController, alterPartitionRequestMatcher(
       expectedTopicPartitions = Set(tp0),
       expectedVersion = expectedVersion
     ))
 
     // Additional calls while the `AlterIsr` request is inflight will be queued
-    val future2 = alterPartitionManager.submit(tp1, leaderAndIsr, controlledEpoch)
-    val future3 = alterPartitionManager.submit(tp2, leaderAndIsr, controlledEpoch)
+    val future2 = alterPartitionManager.submit(tp1, leaderAndIsr)
+    val future3 = alterPartitionManager.submit(tp2, leaderAndIsr)
 
     // Respond to the first request, which will also allow the next request to get sent
     callback1.onComplete(makeClientResponse(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index c4eb5e30f5b..09c9a82446a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -471,7 +471,6 @@ class ReplicaManagerConcurrencyTest extends Logging {
     override def submit(
       topicPartition: TopicIdPartition,
       leaderAndIsr: LeaderAndIsr,
-      controllerEpoch: Int
     ): CompletableFuture[LeaderAndIsr] = {
       channel.alterIsr(topicPartition, leaderAndIsr)
     }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index b2d630ca357..580816b32e7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1009,15 +1009,13 @@ object TestUtils extends Logging {
     override def submit(
       topicPartition: TopicIdPartition,
       leaderAndIsr: LeaderAndIsr,
-      controllerEpoch: Int
     ): CompletableFuture[LeaderAndIsr]= {
       val future = new CompletableFuture[LeaderAndIsr]()
       if (inFlight.compareAndSet(false, true)) {
         isrUpdates += AlterPartitionItem(
           topicPartition,
           leaderAndIsr,
-          future,
-          controllerEpoch
+          future
         )
       } else {
         future.completeExceptionally(new OperationNotAttemptedException(
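
The TestUtils hunk is cut off above, but its shape is clear: a stub manager
that admits a single in-flight update. A hedged sketch of such a test double
under the new signature; the class name and error message are hypothetical,
and it assumes the trait's start/shutdown keep default no-op bodies (which the
stub above implies by overriding only submit).

    import java.util.concurrent.CompletableFuture
    import java.util.concurrent.atomic.AtomicBoolean
    import kafka.server.AlterPartitionManager
    import org.apache.kafka.common.TopicIdPartition
    import org.apache.kafka.common.errors.OperationNotAttemptedException
    import org.apache.kafka.metadata.LeaderAndIsr

    class SingleInFlightStub extends AlterPartitionManager {
      private val inFlight = new AtomicBoolean(false)

      override def submit(
        topicPartition: TopicIdPartition,
        leaderAndIsr: LeaderAndIsr
      ): CompletableFuture[LeaderAndIsr] = {
        val future = new CompletableFuture[LeaderAndIsr]()
        // Accept the first update; reject anything submitted while it is pending.
        if (!inFlight.compareAndSet(false, true)) {
          future.completeExceptionally(new OperationNotAttemptedException(
            s"An ISR update is already in flight for $topicPartition"))
        }
        future // the test completes this future (and resets inFlight) itself
      }
    }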
