This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f2a782a4d7 MINOR: Rename `AlterIsrManager` to `AlterPartitionManager` 
(#12089)
f2a782a4d7 is described below

commit f2a782a4d7c71dabf79861f18c4bb14d2b2776ae
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Apr 26 09:34:18 2022 -0700

    MINOR: Rename `AlterIsrManager` to `AlterPartitionManager` (#12089)
    
    Since we have changed the `AlterIsr` API to `AlterPartition`, it makes 
sense to rename `AlterIsrManager` and some of its associated classes as well.
    
    Reviewers: dengziming <[email protected]>, David Jacot 
<[email protected]>
---
 .../apache/kafka/common/protocol/ApiKeysTest.java  |  2 +-
 .../server/builders/ReplicaManagerBuilder.java     | 12 ++--
 core/src/main/scala/kafka/cluster/Partition.scala  | 32 +++++------
 ...srManager.scala => AlterPartitionManager.scala} | 64 +++++++++++-----------
 .../src/main/scala/kafka/server/BrokerServer.scala |  6 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  8 +--
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 ...Manager.scala => ZkAlterPartitionManager.scala} |  6 +-
 .../admin/ReassignPartitionsIntegrationTest.scala  |  4 +-
 .../unit/kafka/cluster/AbstractPartitionTest.scala |  6 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |  4 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  6 +-
 ...rTest.scala => AlterPartitionManagerTest.scala} | 24 ++++----
 .../server/HighwatermarkPersistenceTest.scala      |  4 +-
 .../unit/kafka/server/IsrExpirationTest.scala      |  6 +-
 .../scala/unit/kafka/server/KafkaServerTest.scala  |  8 +--
 .../server/ReplicaManagerConcurrencyTest.scala     |  4 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 22 ++++----
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |  6 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 22 ++++----
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 12 ++--
 .../partition/PartitionMakeFollowerBenchmark.java  | 12 ++--
 .../UpdateFollowerFetchStateBenchmark.java         | 12 ++--
 .../apache/kafka/jmh/server/CheckpointBench.java   |  8 +--
 .../kafka/jmh/server/PartitionCreationBench.java   |  8 +--
 26 files changed, 153 insertions(+), 151 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java 
b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
index 7e0a6e438e..1aa420b36f 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -44,7 +44,7 @@ public class ApiKeysTest {
     }
 
     @Test
-    public void testAlterIsrIsClusterAction() {
+    public void testAlterPartitionIsClusterAction() {
         assertTrue(ApiKeys.ALTER_PARTITION.clusterAction);
     }
 
diff --git 
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java 
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index a0051784b4..a1339264bf 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -18,7 +18,7 @@
 package kafka.server.builders;
 
 import kafka.log.LogManager;
-import kafka.server.AlterIsrManager;
+import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerTopicStats;
 import kafka.server.DelayedDeleteRecords;
 import kafka.server.DelayedElectLeader;
@@ -50,7 +50,7 @@ public class ReplicaManagerBuilder {
     private QuotaManagers quotaManagers = null;
     private MetadataCache metadataCache = null;
     private LogDirFailureChannel logDirFailureChannel = null;
-    private AlterIsrManager alterIsrManager = null;
+    private AlterPartitionManager alterPartitionManager = null;
     private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
     private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
     private Optional<KafkaZkClient> zkClient = Optional.empty();
@@ -100,8 +100,8 @@ public class ReplicaManagerBuilder {
         return this;
     }
 
-    public ReplicaManagerBuilder setAlterIsrManager(AlterIsrManager 
alterIsrManager) {
-        this.alterIsrManager = alterIsrManager;
+    public ReplicaManagerBuilder 
setAlterPartitionManager(AlterPartitionManager alterPartitionManager) {
+        this.alterPartitionManager = alterPartitionManager;
         return this;
     }
 
@@ -151,7 +151,7 @@ public class ReplicaManagerBuilder {
         if (logManager == null) throw new RuntimeException("You must set 
logManager");
         if (metadataCache == null) throw new RuntimeException("You must set 
metadataCache");
         if (logDirFailureChannel == null) throw new RuntimeException("You must 
set logDirFailureChannel");
-        if (alterIsrManager == null) throw new RuntimeException("You must set 
alterIsrManager");
+        if (alterPartitionManager == null) throw new RuntimeException("You 
must set alterIsrManager");
         return new ReplicaManager(config,
                              metrics,
                              time,
@@ -160,7 +160,7 @@ public class ReplicaManagerBuilder {
                              quotaManagers,
                              metadataCache,
                              logDirFailureChannel,
-                             alterIsrManager,
+                             alterPartitionManager,
                              brokerTopicStats,
                              isShuttingDown,
                              OptionConverters.toScala(zkClient),
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index f27a9cb558..0d1e5de0cc 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -46,9 +46,9 @@ import org.apache.kafka.metadata.LeaderRecoveryState
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
-trait IsrChangeListener {
-  def markExpand(): Unit
-  def markShrink(): Unit
+trait AlterPartitionListener {
+  def markIsrExpand(): Unit
+  def markIsrShrink(): Unit
   def markFailed(): Unit
 }
 
@@ -72,12 +72,12 @@ object Partition extends KafkaMetricsGroup {
             time: Time,
             replicaManager: ReplicaManager): Partition = {
 
-    val isrChangeListener = new IsrChangeListener {
-      override def markExpand(): Unit = {
+    val isrChangeListener = new AlterPartitionListener {
+      override def markIsrExpand(): Unit = {
         replicaManager.isrExpandRate.mark()
       }
 
-      override def markShrink(): Unit = {
+      override def markIsrShrink(): Unit = {
         replicaManager.isrShrinkRate.mark()
       }
 
@@ -95,11 +95,11 @@ object Partition extends KafkaMetricsGroup {
       interBrokerProtocolVersion = 
replicaManager.config.interBrokerProtocolVersion,
       localBrokerId = replicaManager.config.brokerId,
       time = time,
-      isrChangeListener = isrChangeListener,
+      alterPartitionListener = isrChangeListener,
       delayedOperations = delayedOperations,
       metadataCache = replicaManager.metadataCache,
       logManager = replicaManager.logManager,
-      alterIsrManager = replicaManager.alterIsrManager)
+      alterIsrManager = replicaManager.alterPartitionManager)
   }
 
   def removeMetrics(topicPartition: TopicPartition): Unit = {
@@ -235,11 +235,11 @@ class Partition(val topicPartition: TopicPartition,
                 interBrokerProtocolVersion: ApiVersion,
                 localBrokerId: Int,
                 time: Time,
-                isrChangeListener: IsrChangeListener,
+                alterPartitionListener: AlterPartitionListener,
                 delayedOperations: DelayedOperations,
                 metadataCache: MetadataCache,
                 logManager: LogManager,
-                alterIsrManager: AlterIsrManager) extends Logging with 
KafkaMetricsGroup {
+                alterIsrManager: AlterPartitionManager) extends Logging with 
KafkaMetricsGroup {
 
   def topic: String = topicPartition.topic
   def partitionId: Int = topicPartition.partition
@@ -1417,14 +1417,14 @@ class Partition(val topicPartition: TopicPartition,
    * to the KRaft metadata log).
    *
    * @param proposedIsrState The ISR state change that was requested
-   * @param error The error returned from [[AlterIsrManager]]
+   * @param error The error returned from [[AlterPartitionManager]]
    * @return true if the `AlterPartition` request should be retried, false 
otherwise
    */
   private def handleAlterPartitionError(
     proposedIsrState: PendingPartitionChange,
     error: Errors
   ): Boolean = {
-    isrChangeListener.markFailed()
+    alterPartitionListener.markFailed()
     error match {
       case Errors.OPERATION_NOT_ATTEMPTED =>
         // Since the operation was not attempted, it is safe to reset back to 
the committed state.
@@ -1465,11 +1465,11 @@ class Partition(val topicPartition: TopicPartition,
     // Success from controller, still need to check a few things
     if (leaderAndIsr.leaderEpoch != leaderEpoch) {
       debug(s"Ignoring new ISR $leaderAndIsr since we have a stale leader 
epoch $leaderEpoch.")
-      isrChangeListener.markFailed()
+      alterPartitionListener.markFailed()
       false
     } else if (leaderAndIsr.partitionEpoch < partitionEpoch) {
       debug(s"Ignoring new ISR $leaderAndIsr since we have a newer version 
$partitionEpoch.")
-      isrChangeListener.markFailed()
+      alterPartitionListener.markFailed()
       false
     } else {
       // This is one of two states:
@@ -1482,8 +1482,8 @@ class Partition(val topicPartition: TopicPartition,
       info(s"ISR updated to ${partitionState.isr.mkString(",")} and version 
updated to $partitionEpoch")
 
       proposedIsrState match {
-        case PendingExpandIsr(_, _, _) => isrChangeListener.markExpand()
-        case PendingShrinkIsr(_, _, _) => isrChangeListener.markShrink()
+        case PendingExpandIsr(_, _, _) => 
alterPartitionListener.markIsrExpand()
+        case PendingShrinkIsr(_, _, _) => 
alterPartitionListener.markIsrShrink()
       }
 
       // we may need to increment high watermark since ISR could be down to 1
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala 
b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
similarity index 84%
rename from core/src/main/scala/kafka/server/AlterIsrManager.scala
rename to core/src/main/scala/kafka/server/AlterPartitionManager.scala
index ae9ef007b8..8f5e4438c7 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -49,7 +49,7 @@ import scala.jdk.CollectionConverters._
  * Note that ISR state changes can still be initiated by the controller and 
sent to the partitions via LeaderAndIsr
  * requests.
  */
-trait AlterIsrManager {
+trait AlterPartitionManager {
   def start(): Unit = {}
 
   def shutdown(): Unit = {}
@@ -61,12 +61,14 @@ trait AlterIsrManager {
   ): CompletableFuture[LeaderAndIsr]
 }
 
-case class AlterIsrItem(topicPartition: TopicPartition,
-                        leaderAndIsr: LeaderAndIsr,
-                        future: CompletableFuture[LeaderAndIsr],
-                        controllerEpoch: Int) // controllerEpoch needed for Zk 
impl
+case class AlterPartitionItem(
+  topicPartition: TopicPartition,
+  leaderAndIsr: LeaderAndIsr,
+  future: CompletableFuture[LeaderAndIsr],
+  controllerEpoch: Int // controllerEpoch needed for `ZkAlterPartitionManager`
+)
 
-object AlterIsrManager {
+object AlterPartitionManager {
 
   /**
    * Factory to AlterPartition based implementation, used when IBP >= 2.7-IV2
@@ -79,7 +81,7 @@ object AlterIsrManager {
     metrics: Metrics,
     threadNamePrefix: Option[String],
     brokerEpochSupplier: () => Long
-  ): AlterIsrManager = {
+  ): AlterPartitionManager = {
     val nodeProvider = MetadataCacheControllerNodeProvider(config, 
metadataCache)
 
     val channelManager = BrokerToControllerChannelManager(
@@ -91,7 +93,7 @@ object AlterIsrManager {
       threadNamePrefix = threadNamePrefix,
       retryTimeoutMs = Long.MaxValue
     )
-    new DefaultAlterIsrManager(
+    new DefaultAlterPartitionManager(
       controllerChannelManager = channelManager,
       scheduler = scheduler,
       time = time,
@@ -108,23 +110,23 @@ object AlterIsrManager {
     scheduler: Scheduler,
     time: Time,
     zkClient: KafkaZkClient
-  ): AlterIsrManager = {
-    new ZkIsrManager(scheduler, time, zkClient)
+  ): AlterPartitionManager = {
+    new ZkAlterPartitionManager(scheduler, time, zkClient)
   }
 
 }
 
-class DefaultAlterIsrManager(
+class DefaultAlterPartitionManager(
   val controllerChannelManager: BrokerToControllerChannelManager,
   val scheduler: Scheduler,
   val time: Time,
   val brokerId: Int,
   val brokerEpochSupplier: () => Long,
   ibpVersion: ApiVersion
-) extends AlterIsrManager with Logging with KafkaMetricsGroup {
+) extends AlterPartitionManager with Logging with KafkaMetricsGroup {
 
   // Used to allow only one pending ISR update per partition (visible for 
testing)
-  private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] 
= new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
+  private[server] val unsentIsrUpdates: util.Map[TopicPartition, 
AlterPartitionItem] = new ConcurrentHashMap[TopicPartition, 
AlterPartitionItem]()
 
   // Used to allow only one in-flight request at a time
   private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
@@ -143,8 +145,8 @@ class DefaultAlterIsrManager(
     controllerEpoch: Int
   ): CompletableFuture[LeaderAndIsr] = {
     val future = new CompletableFuture[LeaderAndIsr]()
-    val alterIsrItem = AlterIsrItem(topicPartition, leaderAndIsr, future, 
controllerEpoch)
-    val enqueued = unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, 
alterIsrItem) == null
+    val alterPartitionItem = AlterPartitionItem(topicPartition, leaderAndIsr, 
future, controllerEpoch)
+    val enqueued = 
unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicPartition, 
alterPartitionItem) == null
     if (enqueued) {
       maybePropagateIsrChanges()
     } else {
@@ -158,9 +160,9 @@ class DefaultAlterIsrManager(
     // Send all pending items if there is not already a request in-flight.
     if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, 
true)) {
       // Copy current unsent ISRs but don't remove from the map, they get 
cleared in the response handler
-      val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
-      unsentIsrUpdates.values.forEach(item => 
inflightAlterIsrItems.append(item))
-      sendRequest(inflightAlterIsrItems.toSeq)
+      val inflightAlterPartitionItems = new ListBuffer[AlterPartitionItem]()
+      unsentIsrUpdates.values.forEach(item => 
inflightAlterPartitionItems.append(item))
+      sendRequest(inflightAlterPartitionItems.toSeq)
     }
   }
 
@@ -170,8 +172,8 @@ class DefaultAlterIsrManager(
     }
   }
 
-  private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
-    val message = buildRequest(inflightAlterIsrItems)
+  private def sendRequest(inflightAlterPartitionItems: 
Seq[AlterPartitionItem]): Unit = {
+    val message = buildRequest(inflightAlterPartitionItems)
     debug(s"Sending AlterPartition to controller $message")
 
     // We will not timeout AlterPartition request, instead letting it retry 
indefinitely
@@ -192,7 +194,7 @@ class DefaultAlterIsrManager(
               Errors.UNSUPPORTED_VERSION
             } else {
               val body = 
response.responseBody().asInstanceOf[AlterPartitionResponse]
-              handleAlterPartitionResponse(body, message.brokerEpoch, 
inflightAlterIsrItems)
+              handleAlterPartitionResponse(body, message.brokerEpoch, 
inflightAlterPartitionItems)
             }
           } finally {
             // clear the flag so future requests can proceed
@@ -216,16 +218,16 @@ class DefaultAlterIsrManager(
       })
   }
 
-  private def buildRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): 
AlterPartitionRequestData = {
+  private def buildRequest(inflightAlterPartitionItems: 
Seq[AlterPartitionItem]): AlterPartitionRequestData = {
     val message = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
       .setBrokerEpoch(brokerEpochSupplier.apply())
 
-      inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach { case 
(topic, items) => 
+      inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { 
case (topic, items) =>
       val topicData = new AlterPartitionRequestData.TopicData()
         .setName(topic)
       message.topics.add(topicData)
-      items.foreach { item => 
+      items.foreach { item =>
         val partitionData = new AlterPartitionRequestData.PartitionData()
           .setPartitionIndex(item.topicPartition.partition)
           .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
@@ -245,7 +247,7 @@ class DefaultAlterIsrManager(
   def handleAlterPartitionResponse(
     alterPartitionResp: AlterPartitionResponse,
     sentBrokerEpoch: Long,
-    inflightAlterIsrItems: Seq[AlterIsrItem]
+    inflightAlterPartitionItems: Seq[AlterPartitionItem]
   ): Errors = {
     val data = alterPartitionResp.data
 
@@ -291,21 +293,21 @@ class DefaultAlterIsrManager(
         // Iterate across the items we sent rather than what we received to 
ensure we run the callback even if a
         // partition was somehow erroneously excluded from the response. Note 
that these callbacks are run from
         // the leaderIsrUpdateLock write lock in 
Partition#sendAlterPartitionRequest
-        inflightAlterIsrItems.foreach { inflightAlterIsr =>
-          partitionResponses.get(inflightAlterIsr.topicPartition) match {
+        inflightAlterPartitionItems.foreach { inflightAlterPartition =>
+          partitionResponses.get(inflightAlterPartition.topicPartition) match {
             case Some(leaderAndIsrOrError) =>
               try {
                 leaderAndIsrOrError match {
-                  case Left(error) => 
inflightAlterIsr.future.completeExceptionally(error.exception)
-                  case Right(leaderAndIsr) => 
inflightAlterIsr.future.complete(leaderAndIsr)
+                  case Left(error) => 
inflightAlterPartition.future.completeExceptionally(error.exception)
+                  case Right(leaderAndIsr) => 
inflightAlterPartition.future.complete(leaderAndIsr)
                 }
               } finally {
                 // Regardless of callback outcome, we need to clear from the 
unsent updates map to unblock further updates
-                unsentIsrUpdates.remove(inflightAlterIsr.topicPartition)
+                unsentIsrUpdates.remove(inflightAlterPartition.topicPartition)
               }
             case None =>
               // Don't remove this partition from the update map so it will 
get re-sent
-              warn(s"Partition ${inflightAlterIsr.topicPartition} was sent but 
not included in the response")
+              warn(s"Partition ${inflightAlterPartition.topicPartition} was 
sent but not included in the response")
           }
         }
 
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 8fdc59d945..5447e29863 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -129,7 +129,7 @@ class BrokerServer(
 
   @volatile var featureCache: FinalizedFeatureCache = null
 
-  var alterIsrManager: AlterIsrManager = null
+  var alterIsrManager: AlterPartitionManager = null
 
   var autoTopicCreationManager: AutoTopicCreationManager = null
 
@@ -250,7 +250,7 @@ class BrokerServer(
         threadNamePrefix,
         retryTimeoutMs = Long.MaxValue
       )
-      alterIsrManager = new DefaultAlterIsrManager(
+      alterIsrManager = new DefaultAlterPartitionManager(
         controllerChannelManager = alterIsrChannelManager,
         scheduler = kafkaScheduler,
         time = time,
@@ -269,7 +269,7 @@ class BrokerServer(
         quotaManagers = quotaManagers,
         metadataCache = metadataCache,
         logDirFailureChannel = logDirFailureChannel,
-        alterIsrManager = alterIsrManager,
+        alterPartitionManager = alterIsrManager,
         brokerTopicStats = brokerTopicStats,
         isShuttingDown = isShuttingDown,
         zkClient = None,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index d006165577..78ec415c3b 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -140,7 +140,7 @@ class KafkaServer(
 
   var clientToControllerChannelManager: BrokerToControllerChannelManager = null
 
-  var alterIsrManager: AlterIsrManager = null
+  var alterIsrManager: AlterPartitionManager = null
 
   var kafkaScheduler: KafkaScheduler = null
 
@@ -311,7 +311,7 @@ class KafkaServer(
 
         // Start alter partition manager based on the IBP version
         alterIsrManager = if 
(config.interBrokerProtocolVersion.isAlterIsrSupported) {
-          AlterIsrManager(
+          AlterPartitionManager(
             config = config,
             metadataCache = metadataCache,
             scheduler = kafkaScheduler,
@@ -321,7 +321,7 @@ class KafkaServer(
             brokerEpochSupplier = () => kafkaController.brokerEpoch
           )
         } else {
-          AlterIsrManager(kafkaScheduler, time, zkClient)
+          AlterPartitionManager(kafkaScheduler, time, zkClient)
         }
         alterIsrManager.start()
 
@@ -479,7 +479,7 @@ class KafkaServer(
       quotaManagers = quotaManagers,
       metadataCache = metadataCache,
       logDirFailureChannel = logDirFailureChannel,
-      alterIsrManager = alterIsrManager,
+      alterPartitionManager = alterIsrManager,
       brokerTopicStats = brokerTopicStats,
       isShuttingDown = isShuttingDown,
       zkClient = Some(zkClient),
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d72c7351f6..dac8313be3 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -190,7 +190,7 @@ class ReplicaManager(val config: KafkaConfig,
                      quotaManagers: QuotaManagers,
                      val metadataCache: MetadataCache,
                      logDirFailureChannel: LogDirFailureChannel,
-                     val alterIsrManager: AlterIsrManager,
+                     val alterPartitionManager: AlterPartitionManager,
                      val brokerTopicStats: BrokerTopicStats = new 
BrokerTopicStats(),
                      val isShuttingDown: AtomicBoolean = new 
AtomicBoolean(false),
                      val zkClient: Option[KafkaZkClient] = None,
diff --git a/core/src/main/scala/kafka/server/ZkIsrManager.scala 
b/core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala
similarity index 94%
rename from core/src/main/scala/kafka/server/ZkIsrManager.scala
rename to core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala
index 5549e49327..c906ad6a70 100644
--- a/core/src/main/scala/kafka/server/ZkIsrManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala
@@ -35,7 +35,7 @@ import scala.collection.mutable
  */
 case class IsrChangePropagationConfig(checkIntervalMs: Long, maxDelayMs: Long, 
lingerMs: Long)
 
-object ZkIsrManager {
+object ZkAlterPartitionManager {
   // This field is mutable to allow overriding change notification behavior in 
test cases
   @volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = 
IsrChangePropagationConfig(
     checkIntervalMs = 2500,
@@ -44,9 +44,9 @@ object ZkIsrManager {
   )
 }
 
-class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) 
extends AlterIsrManager with Logging {
+class ZkAlterPartitionManager(scheduler: Scheduler, time: Time, zkClient: 
KafkaZkClient) extends AlterPartitionManager with Logging {
 
-  private val isrChangeNotificationConfig = 
ZkIsrManager.DefaultIsrPropagationConfig
+  private val isrChangeNotificationConfig = 
ZkAlterPartitionManager.DefaultIsrPropagationConfig
   // Visible for testing
   private[server] val isrChangeSet: mutable.Set[TopicPartition] = new 
mutable.HashSet[TopicPartition]()
   private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
diff --git 
a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 518c8c4b91..5385b2faa1 100644
--- 
a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -21,7 +21,7 @@ import java.io.Closeable
 import java.util.{Collections, HashMap, List}
 import kafka.admin.ReassignPartitionsCommand._
 import kafka.api.KAFKA_2_7_IV1
-import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, 
ZkIsrManager}
+import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, 
ZkAlterPartitionManager}
 import kafka.utils.Implicits._
 import kafka.utils.TestUtils
 import kafka.server.QuorumTestHarness
@@ -83,7 +83,7 @@ class ReassignPartitionsIntegrationTest extends 
QuorumTestHarness {
 
     // Override change notification settings so that test is not delayed by ISR
     // change notification delay
-    ZkIsrManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig(
+    ZkAlterPartitionManager.DefaultIsrPropagationConfig = 
IsrChangePropagationConfig(
       checkIntervalMs = 500,
       lingerMs = 100,
       maxDelayMs = 500
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index d59e4ab387..52addbc82b 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -21,7 +21,7 @@ import kafka.log.{CleanerConfig, LogConfig, LogManager}
 import kafka.server.{Defaults, MetadataCache}
 import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.server.metadata.MockConfigRepository
-import kafka.utils.TestUtils.{MockAlterIsrManager, MockIsrChangeListener}
+import kafka.utils.TestUtils.{MockAlterPartitionManager, 
MockAlterPartitionListener}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import 
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@@ -49,8 +49,8 @@ class AbstractPartitionTest {
   var logDir1: File = _
   var logDir2: File = _
   var logManager: LogManager = _
-  var alterIsrManager: MockAlterIsrManager = _
-  var isrChangeListener: MockIsrChangeListener = _
+  var alterIsrManager: MockAlterPartitionManager = _
+  var isrChangeListener: MockAlterPartitionListener = _
   var logConfig: LogConfig = _
   var configRepository: MockConfigRepository = _
   val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index d6a8649b2a..50bbe18f4f 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -253,11 +253,11 @@ class PartitionLockTest extends Logging {
   private def setupPartitionWithMocks(logManager: LogManager): Partition = {
     val leaderEpoch = 1
     val brokerId = 0
-    val isrChangeListener: IsrChangeListener = mock(classOf[IsrChangeListener])
+    val isrChangeListener: AlterPartitionListener = 
mock(classOf[AlterPartitionListener])
     val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
     val metadataCache: MetadataCache = mock(classOf[MetadataCache])
     val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
-    val alterIsrManager: AlterIsrManager = mock(classOf[AlterIsrManager])
+    val alterIsrManager: AlterPartitionManager = 
mock(classOf[AlterPartitionManager])
 
     logManager.startup(Set.empty)
     val partition = new Partition(topicPartition,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index ec0ad044f7..dcf91ad2d2 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -1734,7 +1734,7 @@ class PartitionTest extends AbstractPartitionTest {
       .when(kafkaZkClient)
       .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any())
 
-    val zkIsrManager = AlterIsrManager(scheduler, time, kafkaZkClient)
+    val zkIsrManager = AlterPartitionManager(scheduler, time, kafkaZkClient)
     zkIsrManager.start()
 
     val partition = new Partition(topicPartition,
@@ -1963,8 +1963,8 @@ class PartitionTest extends AbstractPartitionTest {
     val topicPartition = new TopicPartition("test", 1)
     val partition = new Partition(
       topicPartition, 1000, ApiVersion.latestVersion, 0,
-      new SystemTime(), mock(classOf[IsrChangeListener]), 
mock(classOf[DelayedOperations]),
-      mock(classOf[MetadataCache]), mock(classOf[LogManager]), 
mock(classOf[AlterIsrManager]))
+      new SystemTime(), mock(classOf[AlterPartitionListener]), 
mock(classOf[DelayedOperations]),
+      mock(classOf[MetadataCache]), mock(classOf[LogManager]), 
mock(classOf[AlterPartitionManager]))
 
     val replicas = Seq(0, 1, 2, 3)
     val isr = Set(0, 1, 2, 3)
diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
similarity index 93%
rename from core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
rename to core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
index 483a5347e4..d4a1b35660 100644
--- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
@@ -45,7 +45,7 @@ import org.mockito.Mockito.{mock, reset, times, verify}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 import scala.jdk.CollectionConverters._
 
-class AlterIsrManagerTest {
+class AlterPartitionManagerTest {
 
   val topic = "test-topic"
   val time = new MockTime
@@ -67,7 +67,7 @@ class AlterIsrManagerTest {
   @MethodSource(Array("provideApiVersions"))
   def testBasic(apiVersion: ApiVersion): Unit = {
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2, apiVersion)
+    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, 
scheduler, time, brokerId, () => 2, apiVersion)
     alterIsrManager.start()
     alterIsrManager.submit(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 
LeaderRecoveryState.RECOVERED, 10), 0)
     verify(brokerToController).start()
@@ -83,7 +83,7 @@ class AlterIsrManagerTest {
     val requestCapture = 
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[AlterPartitionRequest]])
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2, apiVersion)
+    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, 
scheduler, time, brokerId, () => 2, apiVersion)
     alterIsrManager.start()
     alterIsrManager.submit(tp0, new LeaderAndIsr(1, 1, List(1), 
leaderRecoveryState, 10), 0)
     verify(brokerToController).start()
@@ -101,7 +101,7 @@ class AlterIsrManagerTest {
     val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2, apiVersion)
+    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, 
scheduler, time, brokerId, () => 2, apiVersion)
     alterIsrManager.start()
 
     // Only send one ISR update for a given topic+partition
@@ -139,7 +139,7 @@ class AlterIsrManagerTest {
     val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2, apiVersion)
+    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, 
scheduler, time, brokerId, () => 2, apiVersion)
     alterIsrManager.start()
 
     // First request will send batch of one
@@ -209,7 +209,7 @@ class AlterIsrManagerTest {
     val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2, KAFKA_3_2_IV0)
+    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, 
scheduler, time, brokerId, () => 2, KAFKA_3_2_IV0)
     alterIsrManager.start()
     alterIsrManager.submit(tp0, leaderAndIsr, 0)
 
@@ -264,12 +264,12 @@ class AlterIsrManagerTest {
     assertFalse(future.isDone)
   }
 
-  private def testPartitionError(tp: TopicPartition, error: Errors): 
AlterIsrManager = {
+  private def testPartitionError(tp: TopicPartition, error: Errors): 
AlterPartitionManager = {
     val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
     reset(brokerToController)
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2, KAFKA_3_2_IV0)
+    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, 
scheduler, time, brokerId, () => 2, KAFKA_3_2_IV0)
     alterIsrManager.start()
 
     val future = alterIsrManager.submit(tp, LeaderAndIsr(1, 1, List(1,2,3), 
LeaderRecoveryState.RECOVERED, 10), 0)
@@ -293,7 +293,7 @@ class AlterIsrManagerTest {
     val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2, apiVersion)
+    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, 
scheduler, time, brokerId, () => 2, apiVersion)
     alterIsrManager.start()
 
     // First submit will send the request
@@ -322,7 +322,7 @@ class AlterIsrManagerTest {
 
     val brokerEpoch = 2
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => brokerEpoch, apiVersion)
+    val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, 
scheduler, time, brokerId, () => brokerEpoch, apiVersion)
     alterIsrManager.start()
 
     def matchesAlterIsr(topicPartitions: Set[TopicPartition]): 
AbstractRequest.Builder[_ <: AbstractRequest] = {
@@ -395,7 +395,7 @@ class AlterIsrManagerTest {
       .when(kafkaZkClient)
       .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(3), any())
 
-    val zkIsrManager = new ZkIsrManager(scheduler, time, kafkaZkClient)
+    val zkIsrManager = new ZkAlterPartitionManager(scheduler, time, 
kafkaZkClient)
     zkIsrManager.start()
 
     // Correct ZK version
@@ -421,7 +421,7 @@ class AlterIsrManagerTest {
   }
 }
 
-object AlterIsrManagerTest {
+object AlterPartitionManagerTest {
   def provideApiVersions(): JStream[ApiVersion] = {
     JStream.of(
       // Supports KIP-704: unclean leader recovery
diff --git 
a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index e7c5372d6a..6a862bd2fb 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -72,7 +72,7 @@ class HighwatermarkPersistenceTest {
       quotaManagers = quotaManager,
       metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId),
       logDirFailureChannel = logDirFailureChannels.head,
-      alterIsrManager = alterIsrManager)
+      alterPartitionManager = alterIsrManager)
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -129,7 +129,7 @@ class HighwatermarkPersistenceTest {
       quotaManagers = quotaManager,
       metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId),
       logDirFailureChannel = logDirFailureChannels.head,
-      alterIsrManager = alterIsrManager)
+      alterPartitionManager = alterIsrManager)
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala 
b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 6a68217f94..15ae7a7e8e 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import kafka.cluster.Partition
 import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.utils.TestUtils.MockAlterIsrManager
+import kafka.utils.TestUtils.MockAlterPartitionManager
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
@@ -55,7 +55,7 @@ class IsrExpirationTest {
   var quotaManager: QuotaManagers = null
   var replicaManager: ReplicaManager = null
 
-  var alterIsrManager: MockAlterIsrManager = _
+  var alterIsrManager: MockAlterPartitionManager = _
 
   @BeforeEach
   def setUp(): Unit = {
@@ -73,7 +73,7 @@ class IsrExpirationTest {
       quotaManagers = quotaManager,
       metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId),
       logDirFailureChannel = new 
LogDirFailureChannel(configs.head.logDirs.size),
-      alterIsrManager = alterIsrManager)
+      alterPartitionManager = alterIsrManager)
   }
 
   @AfterEach
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
index 79bfc241fd..89cbd04fea 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -116,8 +116,8 @@ class KafkaServerTest extends QuorumTestHarness {
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "2.7-IV1")
 
     val server = TestUtils.createServer(KafkaConfig.fromProps(props))
-    server.replicaManager.alterIsrManager match {
-      case _: ZkIsrManager =>
+    server.replicaManager.alterPartitionManager match {
+      case _: ZkAlterPartitionManager =>
       case _ => fail("Should use ZK for ISR manager in versions before 
2.7-IV2")
     }
     server.shutdown()
@@ -129,8 +129,8 @@ class KafkaServerTest extends QuorumTestHarness {
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
ApiVersion.latestVersion.toString)
 
     val server = TestUtils.createServer(KafkaConfig.fromProps(props))
-    server.replicaManager.alterIsrManager match {
-      case _: DefaultAlterIsrManager =>
+    server.replicaManager.alterPartitionManager match {
+      case _: DefaultAlterPartitionManager =>
       case _ => fail("Should use AlterIsr for ISR manager in versions after 
2.7-IV2")
     }
     server.shutdown()
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 34e87958f5..d281788ab8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -170,7 +170,7 @@ class ReplicaManagerConcurrencyTest {
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, ""),
       metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterIsrManager = new MockAlterIsrManager(channel)
+      alterPartitionManager = new MockAlterPartitionManager(channel)
     ) {
       override def createReplicaFetcherManager(
         metrics: Metrics,
@@ -427,7 +427,7 @@ class ReplicaManagerConcurrencyTest {
     }
   }
 
-  private class MockAlterIsrManager(channel: ControllerChannel) extends 
AlterIsrManager {
+  private class MockAlterPartitionManager(channel: ControllerChannel) extends 
AlterPartitionManager {
     override def submit(
       topicPartition: TopicPartition,
       leaderAndIsr: LeaderAndIsr,
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 658575aca6..d7b69bc61d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -308,7 +308,7 @@ class ReplicaManagerQuotasTest {
     when(logManager.getLog(any[TopicPartition], 
anyBoolean)).thenReturn(Some(log))
     when(logManager.liveLogDirs).thenReturn(Array.empty[File])
 
-    val alterIsrManager: AlterIsrManager = mock(classOf[AlterIsrManager])
+    val alterIsrManager: AlterPartitionManager = 
mock(classOf[AlterPartitionManager])
 
     val leaderBrokerId = configs.head.brokerId
     quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
@@ -321,7 +321,7 @@ class ReplicaManagerQuotasTest {
       quotaManagers = quotaManager,
       metadataCache = MetadataCache.zkMetadataCache(leaderBrokerId),
       logDirFailureChannel = new 
LogDirFailureChannel(configs.head.logDirs.size),
-      alterIsrManager = alterIsrManager)
+      alterPartitionManager = alterIsrManager)
 
     //create the two replicas
     for ((p, _) <- fetchInfo) {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6d01f59259..6c17503a61 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -76,7 +76,7 @@ class ReplicaManagerTest {
   val time = new MockTime
   val scheduler = new MockScheduler(time)
   val metrics = new Metrics
-  var alterIsrManager: AlterIsrManager = _
+  var alterPartitionManager: AlterPartitionManager = _
   var config: KafkaConfig = _
   var quotaManager: QuotaManagers = _
 
@@ -90,7 +90,7 @@ class ReplicaManagerTest {
   def setUp(): Unit = {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     config = KafkaConfig.fromProps(props)
-    alterIsrManager = mock(classOf[AlterIsrManager])
+    alterPartitionManager = mock(classOf[AlterPartitionManager])
     quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
   }
 
@@ -113,7 +113,7 @@ class ReplicaManagerTest {
       quotaManagers = quotaManager,
       metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterIsrManager = alterIsrManager)
+      alterPartitionManager = alterPartitionManager)
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
@@ -140,7 +140,7 @@ class ReplicaManagerTest {
       quotaManagers = quotaManager,
       metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterIsrManager = alterIsrManager)
+      alterPartitionManager = alterPartitionManager)
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
@@ -164,7 +164,7 @@ class ReplicaManagerTest {
       quotaManagers = quotaManager,
       metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterIsrManager = alterIsrManager,
+      alterPartitionManager = alterPartitionManager,
       threadNamePrefix = Option(this.getClass.getName))
     try {
       def callback(responseStatus: Map[TopicPartition, PartitionResponse]): 
Unit = {
@@ -219,7 +219,7 @@ class ReplicaManagerTest {
       quotaManagers = quotaManager,
       metadataCache = metadataCache,
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterIsrManager = alterIsrManager)
+      alterPartitionManager = alterPartitionManager)
 
     try {
       val brokerList = Seq[Integer](0, 1).asJava
@@ -2021,7 +2021,7 @@ class ReplicaManagerTest {
       brokerTopicStats = mockBrokerTopicStats,
       metadataCache = metadataCache,
       logDirFailureChannel = mockLogDirFailureChannel,
-      alterIsrManager = alterIsrManager,
+      alterPartitionManager = alterPartitionManager,
       delayedProducePurgatoryParam = Some(mockProducePurgatory),
       delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
       delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
@@ -2216,7 +2216,7 @@ class ReplicaManagerTest {
       quotaManagers = quotaManager,
       metadataCache = metadataCache,
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterIsrManager = alterIsrManager,
+      alterPartitionManager = alterPartitionManager,
       delayedProducePurgatoryParam = Some(mockProducePurgatory),
       delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
       delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
@@ -2461,7 +2461,7 @@ class ReplicaManagerTest {
       brokerTopicStats = brokerTopicStats1,
       metadataCache = metadataCache0,
       logDirFailureChannel = new LogDirFailureChannel(config0.logDirs.size),
-      alterIsrManager = alterIsrManager)
+      alterPartitionManager = alterPartitionManager)
     val rm1 = new ReplicaManager(
       metrics = metrics,
       config = config1,
@@ -2472,7 +2472,7 @@ class ReplicaManagerTest {
       brokerTopicStats = brokerTopicStats2,
       metadataCache = metadataCache1,
       logDirFailureChannel = new LogDirFailureChannel(config1.logDirs.size),
-      alterIsrManager = alterIsrManager)
+      alterPartitionManager = alterPartitionManager)
 
     (rm0, rm1)
   }
@@ -2722,7 +2722,7 @@ class ReplicaManagerTest {
         quotaManagers = quotaManager,
         metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-        alterIsrManager = alterIsrManager) {
+        alterPartitionManager = alterPartitionManager) {
         override def getPartitionOrException(topicPartition: TopicPartition): 
Partition = {
           throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
         }
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 44e17ebb4c..f4e9520745 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -72,7 +72,7 @@ class OffsetsForLeaderEpochTest {
       quotaManagers = quotaManager,
       metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterIsrManager = alterIsrManager)
+      alterPartitionManager = alterIsrManager)
     val partition = replicaManager.createPartition(tp)
     partition.setLog(mockLog, isFutureLog = false)
     partition.leaderReplicaIdOpt = Some(config.brokerId)
@@ -101,7 +101,7 @@ class OffsetsForLeaderEpochTest {
       quotaManagers = quotaManager,
       metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterIsrManager = alterIsrManager)
+      alterPartitionManager = alterIsrManager)
     replicaManager.createPartition(tp)
 
     //Given
@@ -132,7 +132,7 @@ class OffsetsForLeaderEpochTest {
       quotaManagers = quotaManager,
       metadataCache = MetadataCache.zkMetadataCache(config.brokerId),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterIsrManager = alterIsrManager)
+      alterPartitionManager = alterIsrManager)
 
     //Given
     val epochRequested: Integer = 5
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index eecdb10d2b..877b3b2a23 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -32,7 +32,7 @@ import java.util.{Arrays, Collections, Optional, Properties}
 import com.yammer.metrics.core.{Gauge, Meter}
 import javax.net.ssl.X509TrustManager
 import kafka.api._
-import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
+import kafka.cluster.{Broker, EndPoint, AlterPartitionListener}
 import kafka.controller.{ControllerEventManager, LeaderIsrAndControllerEpoch}
 import kafka.log._
 import kafka.network.RequestChannel
@@ -1254,8 +1254,8 @@ object TestUtils extends Logging {
                    interBrokerProtocolVersion = interBrokerProtocolVersion)
   }
 
-  class MockAlterIsrManager extends AlterIsrManager {
-    val isrUpdates: mutable.Queue[AlterIsrItem] = new 
mutable.Queue[AlterIsrItem]()
+  class MockAlterPartitionManager extends AlterPartitionManager {
+    val isrUpdates: mutable.Queue[AlterPartitionItem] = new 
mutable.Queue[AlterPartitionItem]()
     val inFlight: AtomicBoolean = new AtomicBoolean(false)
 
 
@@ -1266,7 +1266,7 @@ object TestUtils extends Logging {
     ): CompletableFuture[LeaderAndIsr]= {
       val future = new CompletableFuture[LeaderAndIsr]()
       if (inFlight.compareAndSet(false, true)) {
-        isrUpdates += AlterIsrItem(topicPartition, leaderAndIsr, future, 
controllerEpoch)
+        isrUpdates += AlterPartitionItem(topicPartition, leaderAndIsr, future, 
controllerEpoch)
       } else {
         future.completeExceptionally(new OperationNotAttemptedException(
           s"Failed to enqueue AlterIsr request for $topicPartition since there 
is already an inflight request"))
@@ -1293,18 +1293,18 @@ object TestUtils extends Logging {
     }
   }
 
-  def createAlterIsrManager(): MockAlterIsrManager = {
-    new MockAlterIsrManager()
+  def createAlterIsrManager(): MockAlterPartitionManager = {
+    new MockAlterPartitionManager()
   }
 
-  class MockIsrChangeListener extends IsrChangeListener {
+  class MockAlterPartitionListener extends AlterPartitionListener {
     val expands: AtomicInteger = new AtomicInteger(0)
     val shrinks: AtomicInteger = new AtomicInteger(0)
     val failures: AtomicInteger = new AtomicInteger(0)
 
-    override def markExpand(): Unit = expands.incrementAndGet()
+    override def markIsrExpand(): Unit = expands.incrementAndGet()
 
-    override def markShrink(): Unit = shrinks.incrementAndGet()
+    override def markIsrShrink(): Unit = shrinks.incrementAndGet()
 
     override def markFailed(): Unit = failures.incrementAndGet()
 
@@ -1315,8 +1315,8 @@ object TestUtils extends Logging {
     }
   }
 
-  def createIsrChangeListener(): MockIsrChangeListener = {
-    new MockIsrChangeListener()
+  def createIsrChangeListener(): MockAlterPartitionListener = {
+    new MockAlterPartitionListener()
   }
 
   def produceMessages[B <: KafkaBroker](
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index a857b1164c..1dec000f3a 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -21,14 +21,14 @@ import kafka.api.ApiVersion;
 import kafka.api.ApiVersion$;
 import kafka.cluster.BrokerEndPoint;
 import kafka.cluster.DelayedOperations;
-import kafka.cluster.IsrChangeListener;
+import kafka.cluster.AlterPartitionListener;
 import kafka.cluster.Partition;
 import kafka.log.CleanerConfig;
 import kafka.log.Defaults;
 import kafka.log.LogAppendInfo;
 import kafka.log.LogConfig;
 import kafka.log.LogManager;
-import kafka.server.AlterIsrManager;
+import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerTopicStats;
 import kafka.server.FailedPartitions;
 import kafka.server.InitialFetchState;
@@ -170,12 +170,12 @@ public class ReplicaFetcherThreadBenchmark {
                     .setReplicas(replicas)
                     .setIsNew(true);
 
-            IsrChangeListener isrChangeListener = 
Mockito.mock(IsrChangeListener.class);
+            AlterPartitionListener alterPartitionListener = 
Mockito.mock(AlterPartitionListener.class);
             OffsetCheckpoints offsetCheckpoints = 
Mockito.mock(OffsetCheckpoints.class);
             Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), 
tp)).thenReturn(Option.apply(0L));
-            AlterIsrManager isrChannelManager = 
Mockito.mock(AlterIsrManager.class);
+            AlterPartitionManager isrChannelManager = 
Mockito.mock(AlterPartitionManager.class);
             Partition partition = new Partition(tp, 100, 
ApiVersion$.MODULE$.latestVersion(),
-                    0, Time.SYSTEM, isrChangeListener, new 
DelayedOperationsMock(tp),
+                    0, Time.SYSTEM, alterPartitionListener, new 
DelayedOperationsMock(tp),
                     Mockito.mock(MetadataCache.class), logManager, 
isrChannelManager);
 
             partition.makeFollower(partitionState, offsetCheckpoints, topicId);
@@ -227,7 +227,7 @@ public class ReplicaFetcherThreadBenchmark {
             setBrokerTopicStats(brokerTopicStats).
             setMetadataCache(metadataCache).
             setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())).
-            setAlterIsrManager(TestUtils.createAlterIsrManager()).
+            setAlterPartitionManager(TestUtils.createAlterIsrManager()).
             build();
         fetcher = new ReplicaFetcherBenchThread(config, replicaManager, pool);
         fetcher.addPartitions(initialFetchStates);
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index e883020936..1bc695ecb1 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -20,13 +20,13 @@ package org.apache.kafka.jmh.partition;
 import kafka.api.ApiVersion;
 import kafka.api.ApiVersion$;
 import kafka.cluster.DelayedOperations;
-import kafka.cluster.IsrChangeListener;
+import kafka.cluster.AlterPartitionListener;
 import kafka.cluster.Partition;
 import kafka.log.CleanerConfig;
 import kafka.log.Defaults;
 import kafka.log.LogConfig;
 import kafka.log.LogManager;
-import kafka.server.AlterIsrManager;
+import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerTopicStats;
 import kafka.server.LogDirFailureChannel;
 import kafka.server.MetadataCache;
@@ -122,12 +122,12 @@ public class PartitionMakeFollowerBenchmark {
         topicId = OptionConverters.toScala(Optional.of(Uuid.randomUuid()));
 
         Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), 
tp)).thenReturn(Option.apply(0L));
-        IsrChangeListener isrChangeListener = 
Mockito.mock(IsrChangeListener.class);
-        AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
+        AlterPartitionListener alterPartitionListener = 
Mockito.mock(AlterPartitionListener.class);
+        AlterPartitionManager alterPartitionManager = 
Mockito.mock(AlterPartitionManager.class);
         partition = new Partition(tp, 100,
             ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
-            isrChangeListener, delayedOperations,
-            Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
+            alterPartitionListener, delayedOperations,
+            Mockito.mock(MetadataCache.class), logManager, 
alterPartitionManager);
         partition.createLogIfNotExists(true, false, offsetCheckpoints, 
topicId);
         executorService.submit((Runnable) () -> {
             SimpleRecord[] simpleRecords = new SimpleRecord[] {
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 6470125b12..cf7201d4c8 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -20,13 +20,13 @@ package org.apache.kafka.jmh.partition;
 import kafka.api.ApiVersion;
 import kafka.api.ApiVersion$;
 import kafka.cluster.DelayedOperations;
-import kafka.cluster.IsrChangeListener;
+import kafka.cluster.AlterPartitionListener;
 import kafka.cluster.Partition;
 import kafka.log.CleanerConfig;
 import kafka.log.Defaults;
 import kafka.log.LogConfig;
 import kafka.log.LogManager;
-import kafka.server.AlterIsrManager;
+import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerTopicStats;
 import kafka.server.LogDirFailureChannel;
 import kafka.server.LogOffsetMetadata;
@@ -121,12 +121,12 @@ public class UpdateFollowerFetchStateBenchmark {
             .setPartitionEpoch(1)
             .setReplicas(replicas)
             .setIsNew(true);
-        IsrChangeListener isrChangeListener = 
Mockito.mock(IsrChangeListener.class);
-        AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
+        AlterPartitionListener alterPartitionListener = 
Mockito.mock(AlterPartitionListener.class);
+        AlterPartitionManager alterPartitionManager = 
Mockito.mock(AlterPartitionManager.class);
         partition = new Partition(topicPartition, 100,
                 ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
-                isrChangeListener, delayedOperations,
-                Mockito.mock(MetadataCache.class), logManager, 
alterIsrManager);
+                alterPartitionListener, delayedOperations,
+                Mockito.mock(MetadataCache.class), logManager, 
alterPartitionManager);
         partition.makeLeader(partitionState, offsetCheckpoints, topicId);
     }
 
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 21a8086e4d..919179ac3b 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -21,7 +21,7 @@ import kafka.cluster.Partition;
 import kafka.log.CleanerConfig;
 import kafka.log.LogConfig;
 import kafka.log.LogManager;
-import kafka.server.AlterIsrManager;
+import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerTopicStats;
 import kafka.server.KafkaConfig;
 import kafka.server.LogDirFailureChannel;
@@ -88,7 +88,7 @@ public class CheckpointBench {
     private QuotaFactory.QuotaManagers quotaManagers;
     private LogDirFailureChannel failureChannel;
     private LogManager logManager;
-    private AlterIsrManager alterIsrManager;
+    private AlterPartitionManager alterPartitionManager;
 
 
     @SuppressWarnings("deprecation")
@@ -117,7 +117,7 @@ public class CheckpointBench {
                         this.metrics,
                         this.time, "");
 
-        this.alterIsrManager = TestUtils.createAlterIsrManager();
+        this.alterPartitionManager = TestUtils.createAlterIsrManager();
         this.replicaManager = new ReplicaManagerBuilder().
             setConfig(brokerProperties).
             setMetrics(metrics).
@@ -128,7 +128,7 @@ public class CheckpointBench {
             setBrokerTopicStats(brokerTopicStats).
             setMetadataCache(metadataCache).
             setLogDirFailureChannel(failureChannel).
-            setAlterIsrManager(alterIsrManager).
+            setAlterPartitionManager(alterPartitionManager).
             build();
         replicaManager.startup();
 
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index e344f7d7ae..ac9a7f4c54 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -22,7 +22,7 @@ import kafka.log.CleanerConfig;
 import kafka.log.Defaults;
 import kafka.log.LogConfig;
 import kafka.log.LogManager;
-import kafka.server.AlterIsrManager;
+import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerTopicStats;
 import kafka.server.KafkaConfig;
 import kafka.server.LogDirFailureChannel;
@@ -94,7 +94,7 @@ public class PartitionCreationBench {
     private KafkaZkClient zkClient;
     private LogDirFailureChannel failureChannel;
     private LogManager logManager;
-    private AlterIsrManager alterIsrManager;
+    private AlterPartitionManager alterPartitionManager;
     private List<TopicPartition> topicPartitions;
 
     @SuppressWarnings("deprecation")
@@ -149,7 +149,7 @@ public class PartitionCreationBench {
                 return new Properties();
             }
         };
-        this.alterIsrManager = TestUtils.createAlterIsrManager();
+        this.alterPartitionManager = TestUtils.createAlterIsrManager();
         this.replicaManager = new ReplicaManagerBuilder().
             setConfig(brokerProperties).
             setMetrics(metrics).
@@ -161,7 +161,7 @@ public class PartitionCreationBench {
             setBrokerTopicStats(brokerTopicStats).
             setMetadataCache(new 
ZkMetadataCache(this.brokerProperties.brokerId())).
             setLogDirFailureChannel(failureChannel).
-            setAlterIsrManager(alterIsrManager).
+            setAlterPartitionManager(alterPartitionManager).
             build();
         replicaManager.startup();
         replicaManager.checkpointHighWatermarks();

Reply via email to