This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d3f19e4  KAFKA-10825 ZooKeeper ISR manager (#9713)
d3f19e4 is described below

commit d3f19e4bb047338840d380d2253dc92cf56b0a0f
Author: David Arthur <[email protected]>
AuthorDate: Mon Dec 21 14:44:02 2020 -0500

    KAFKA-10825 ZooKeeper ISR manager (#9713)
    
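    ISR-related cleanup in ReplicaManager and Partition. Removes the ISR change
    tracking and propagation logic from ReplicaManager and moves it into a new
    ZkIsrManager class, which implements the existing AlterIsrManager trait (its
    enqueue method is renamed to submit). Unifies all of the ISR logic in
    Partition so there are no longer separate code paths for ZK and AlterIsr.
    Also removes PartitionStateStore, replacing its topic-config lookup with a
    new TopicConfigFetcher trait.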
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 138 +++++----------------
 .../main/scala/kafka/server/AlterIsrManager.scala  |  48 +++++--
 core/src/main/scala/kafka/server/KafkaServer.scala |   8 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  61 +--------
 .../src/main/scala/kafka/server/ZkIsrManager.scala | 109 ++++++++++++++++
 .../admin/ReassignPartitionsIntegrationTest.scala  |   5 +-
 .../unit/kafka/cluster/AbstractPartitionTest.scala |   6 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |  13 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 121 ++++++++++++++----
 .../unit/kafka/server/AlterIsrManagerTest.scala    |  93 +++++++++-----
 .../scala/unit/kafka/server/KafkaServerTest.scala  |  31 ++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   7 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  47 +++++--
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |   5 +-
 .../partition/PartitionMakeFollowerBenchmark.java  |   5 +-
 .../UpdateFollowerFetchStateBenchmark.java         |   5 +-
 .../apache/kafka/jmh/server/CheckpointBench.java   |   4 -
 17 files changed, 427 insertions(+), 279 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index c0a7d76..d045c4f 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -28,7 +28,7 @@ import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
-import kafka.zk.{AdminZkClient, KafkaZkClient}
+import kafka.zk.AdminZkClient
 import kafka.zookeeper.ZooKeeperClientException
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.message.FetchResponseData
@@ -51,40 +51,8 @@ trait IsrChangeListener {
   def markFailed(): Unit
 }
 
-trait PartitionStateStore {
-  def fetchTopicConfig(): Properties
-  def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int]
-  def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int]
-}
-
-class ZkPartitionStateStore(topicPartition: TopicPartition,
-                            zkClient: KafkaZkClient) extends 
PartitionStateStore {
-
-  override def fetchTopicConfig(): Properties = {
-    val adminZkClient = new AdminZkClient(zkClient)
-    adminZkClient.fetchEntityConfig(ConfigType.Topic, topicPartition.topic)
-  }
-
-  override def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): 
Option[Int] = {
-    val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr)
-    newVersionOpt
-  }
-
-  override def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): 
Option[Int] = {
-    val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr)
-    newVersionOpt
-  }
-
-  private def updateIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): 
Option[Int] = {
-    val (updateSucceeded, newVersion) = 
ReplicationUtils.updateLeaderAndIsr(zkClient, topicPartition,
-      leaderAndIsr, controllerEpoch)
-
-    if (updateSucceeded) {
-      Some(newVersion)
-    } else {
-      None
-    }
-  }
+trait TopicConfigFetcher {
+  def fetch(): Properties
 }
 
 class DelayedOperations(topicPartition: TopicPartition,
@@ -109,21 +77,22 @@ object Partition extends KafkaMetricsGroup {
 
     val isrChangeListener = new IsrChangeListener {
       override def markExpand(): Unit = {
-        replicaManager.recordIsrChange(topicPartition)
         replicaManager.isrExpandRate.mark()
       }
 
       override def markShrink(): Unit = {
-        replicaManager.recordIsrChange(topicPartition)
         replicaManager.isrShrinkRate.mark()
       }
 
       override def markFailed(): Unit = 
replicaManager.failedIsrUpdatesRate.mark()
     }
 
-    val zkIsrBackingStore = new ZkPartitionStateStore(
-      topicPartition,
-      replicaManager.zkClient)
+    val configProvider = new TopicConfigFetcher {
+      override def fetch(): Properties = {
+        val adminZkClient = new AdminZkClient(replicaManager.zkClient)
+        adminZkClient.fetchEntityConfig(ConfigType.Topic, topicPartition.topic)
+      }
+    }
 
     val delayedOperations = new DelayedOperations(
       topicPartition,
@@ -136,7 +105,7 @@ object Partition extends KafkaMetricsGroup {
       interBrokerProtocolVersion = 
replicaManager.config.interBrokerProtocolVersion,
       localBrokerId = replicaManager.config.brokerId,
       time = time,
-      stateStore = zkIsrBackingStore,
+      topicConfigProvider = configProvider,
       isrChangeListener = isrChangeListener,
       delayedOperations = delayedOperations,
       metadataCache = replicaManager.metadataCache,
@@ -259,7 +228,7 @@ class Partition(val topicPartition: TopicPartition,
                 interBrokerProtocolVersion: ApiVersion,
                 localBrokerId: Int,
                 time: Time,
-                stateStore: PartitionStateStore,
+                topicConfigProvider: TopicConfigFetcher,
                 isrChangeListener: IsrChangeListener,
                 delayedOperations: DelayedOperations,
                 metadataCache: MetadataCache,
@@ -285,8 +254,6 @@ class Partition(val topicPartition: TopicPartition,
   @volatile private[cluster] var isrState: IsrState = CommittedIsr(Set.empty)
   @volatile var assignmentState: AssignmentState = 
SimpleAssignmentState(Seq.empty)
 
-  private val useAlterIsr: Boolean = 
interBrokerProtocolVersion.isAlterIsrSupported
-
   // Logs belonging to this partition. Majority of time it will be only one 
log, but if log directory
   // is getting changed (as a result of ReplicaAlterLogDirs command), we may 
have two logs until copy
   // completes and a switch to new location is performed.
@@ -375,7 +342,7 @@ class Partition(val topicPartition: TopicPartition,
   // Visible for testing
   private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints): Log = {
     def fetchLogConfig: LogConfig = {
-      val props = stateStore.fetchTopicConfig()
+      val props = topicConfigProvider.fetch()
       LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
     }
 
@@ -1325,14 +1292,6 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
-    if (useAlterIsr) {
-      expandIsrWithAlterIsr(newInSyncReplica)
-    } else {
-      expandIsrWithZk(newInSyncReplica)
-    }
-  }
-
-  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
     // This is called from maybeExpandIsr which holds the ISR write lock
     if (!isrState.isInflight) {
       // When expanding the ISR, we can safely assume the new replica will 
make it into the ISR since this puts us in
@@ -1343,26 +1302,7 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def expandIsrWithZk(newInSyncReplica: Int): Unit = {
-    val newInSyncReplicaIds = isrState.isr + newInSyncReplica
-    info(s"Expanding ISR from ${isrState.isr.mkString(",")} to 
${newInSyncReplicaIds.mkString(",")}")
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newInSyncReplicaIds.toList, zkVersion)
-    val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
-    if (zkVersionOpt.isDefined) {
-      isrChangeListener.markExpand()
-    }
-    maybeUpdateIsrAndVersionWithZk(newInSyncReplicaIds, zkVersionOpt)
-  }
-
   private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
-    if (useAlterIsr) {
-      shrinkIsrWithAlterIsr(outOfSyncReplicas)
-    } else {
-      shrinkIsrWithZk(isrState.isr -- outOfSyncReplicas)
-    }
-  }
-
-  private def shrinkIsrWithAlterIsr(outOfSyncReplicas: Set[Int]): Unit = {
     // This is called from maybeShrinkIsr which holds the ISR write lock
     if (!isrState.isInflight) {
       // When shrinking the ISR, we cannot assume that the update will succeed 
as this could erroneously advance the HW
@@ -1374,47 +1314,29 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def shrinkIsrWithZk(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-    val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
-    if (zkVersionOpt.isDefined) {
-      isrChangeListener.markShrink()
-    }
-    maybeUpdateIsrAndVersionWithZk(newIsr, zkVersionOpt)
-  }
-
-  private def maybeUpdateIsrAndVersionWithZk(isr: Set[Int], zkVersionOpt: 
Option[Int]): Unit = {
-    zkVersionOpt match {
-      case Some(newVersion) =>
-        isrState = CommittedIsr(isr)
-        zkVersion = newVersion
-        info("ISR updated to [%s] and zkVersion updated to 
[%d]".format(isr.mkString(","), zkVersion))
-
-      case None =>
-        info(s"Cached zkVersion $zkVersion not equal to that in zookeeper, 
skip updating ISR")
-        isrChangeListener.markFailed()
-    }
-  }
-
   private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
     val isrToSend: Set[Int] = proposedIsrState match {
       case PendingExpandIsr(isr, newInSyncReplicaId) => isr + 
newInSyncReplicaId
       case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- 
outOfSyncReplicaIds
       case state =>
-        throw new IllegalStateException(s"Invalid state $state for `AlterIsr` 
request for partition $topicPartition")
+        isrChangeListener.markFailed()
+        throw new IllegalStateException(s"Invalid state $state for ISR change 
for partition $topicPartition")
     }
 
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.toList, zkVersion)
-    val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, 
handleAlterIsrResponse(proposedIsrState))
+    val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, 
handleAlterIsrResponse(proposedIsrState), controllerEpoch)
 
-    if (!alterIsrManager.enqueue(alterIsrItem)) {
+    val oldState = isrState
+    isrState = proposedIsrState
+
+    if (!alterIsrManager.submit(alterIsrItem)) {
+      // If the ISR manager did not accept our update, we need to revert back 
to previous state
+      isrState = oldState
       isrChangeListener.markFailed()
-      throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request 
with state " +
-        s"$newLeaderAndIsr for partition $topicPartition")
+      throw new IllegalStateException(s"Failed to enqueue ISR change state 
$newLeaderAndIsr for partition $topicPartition")
     }
 
-    isrState = proposedIsrState
-    debug(s"Sent `AlterIsr` request to change state to $newLeaderAndIsr after 
transition to $proposedIsrState")
+    debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to 
$proposedIsrState")
   }
 
   /**
@@ -1438,27 +1360,27 @@ class Partition(val topicPartition: TopicPartition,
           isrChangeListener.markFailed()
           error match {
             case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
-              debug(s"Controller failed to update ISR to $proposedIsrState 
since it doesn't know about this topic or partition. Giving up.")
+              debug(s"Failed to update ISR to $proposedIsrState since it 
doesn't know about this topic or partition. Giving up.")
             case Errors.FENCED_LEADER_EPOCH =>
-              debug(s"Controller failed to update ISR to $proposedIsrState 
since we sent an old leader epoch. Giving up.")
+              debug(s"Failed to update ISR to $proposedIsrState since we sent 
an old leader epoch. Giving up.")
             case Errors.INVALID_UPDATE_VERSION =>
-              debug(s"Controller failed to update ISR to $proposedIsrState due 
to invalid zk version. Giving up.")
+              debug(s"Failed to update ISR to $proposedIsrState due to invalid 
zk version. Giving up.")
             case _ =>
-              warn(s"Controller failed to update ISR to $proposedIsrState due 
to unexpected $error. Retrying.")
+              warn(s"Failed to update ISR to $proposedIsrState due to 
unexpected $error. Retrying.")
               sendAlterIsrRequest(proposedIsrState)
           }
         case Right(leaderAndIsr: LeaderAndIsr) =>
           // Success from controller, still need to check a few things
           if (leaderAndIsr.leaderEpoch != leaderEpoch) {
-            debug(s"Ignoring ISR from AlterIsr with ${leaderAndIsr} since we 
have a stale leader epoch $leaderEpoch.")
+            debug(s"Ignoring new ISR ${leaderAndIsr} since we have a stale 
leader epoch $leaderEpoch.")
             isrChangeListener.markFailed()
           } else if (leaderAndIsr.zkVersion <= zkVersion) {
-            debug(s"Ignoring ISR from AlterIsr with ${leaderAndIsr} since we 
have a newer version $zkVersion.")
+            debug(s"Ignoring new ISR ${leaderAndIsr} since we have a newer 
version $zkVersion.")
             isrChangeListener.markFailed()
           } else {
             isrState = CommittedIsr(leaderAndIsr.isr.toSet)
             zkVersion = leaderAndIsr.zkVersion
-            info(s"ISR updated from AlterIsr to ${isrState.isr.mkString(",")} 
and version updated to [$zkVersion]")
+            info(s"ISR updated to ${isrState.isr.mkString(",")} and version 
updated to [$zkVersion]")
             proposedIsrState match {
               case PendingExpandIsr(_, _) => isrChangeListener.markExpand()
               case PendingShrinkIsr(_, _) => isrChangeListener.markShrink()
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala 
b/core/src/main/scala/kafka/server/AlterIsrManager.scala
index 16fe620..fa4616b 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import kafka.api.LeaderAndIsr
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
@@ -35,24 +36,51 @@ import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
 /**
- * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
- * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ * Handles updating the ISR by sending AlterIsr requests to the controller (as 
of 2.7) or by updating ZK directly
+ * (prior to 2.7). Updating the ISR is an asynchronous operation, so 
partitions will learn about the result of their
+ * request through a callback.
+ *
+ * Note that ISR state changes can still be initiated by the controller and 
sent to the partitions via LeaderAndIsr
+ * requests.
  */
 trait AlterIsrManager {
   def start(): Unit
 
-  def enqueue(alterIsrItem: AlterIsrItem): Boolean
+  def submit(alterIsrItem: AlterIsrItem): Boolean
 
   def clearPending(topicPartition: TopicPartition): Unit
 }
 
-case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit)
+case class AlterIsrItem(topicPartition: TopicPartition,
+                        leaderAndIsr: LeaderAndIsr,
+                        callback: Either[Errors, LeaderAndIsr] => Unit,
+                        controllerEpoch: Int) // controllerEpoch needed for Zk 
impl
+
+object AlterIsrManager {
+  /**
+   * Factory to AlterIsr based implementation, used when IBP >= 2.7-IV2
+   */
+  def apply(controllerChannelManager: BrokerToControllerChannelManager,
+            scheduler: Scheduler,
+            time: Time,
+            brokerId: Int,
+            brokerEpochSupplier: () => Long): AlterIsrManager = {
+    new DefaultAlterIsrManager(controllerChannelManager, scheduler, time, 
brokerId, brokerEpochSupplier)
+  }
+
+  /**
+   * Factory for ZK based implementation, used when IBP < 2.7-IV2
+   */
+  def apply(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient): 
AlterIsrManager = {
+    new ZkIsrManager(scheduler, time, zkClient)
+  }
+}
 
-class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
-                          val scheduler: Scheduler,
-                          val time: Time,
-                          val brokerId: Int,
-                          val brokerEpochSupplier: () => Long) extends 
AlterIsrManager with Logging with KafkaMetricsGroup {
+class DefaultAlterIsrManager(val controllerChannelManager: 
BrokerToControllerChannelManager,
+                             val scheduler: Scheduler,
+                             val time: Time,
+                             val brokerId: Int,
+                             val brokerEpochSupplier: () => Long) extends 
AlterIsrManager with Logging with KafkaMetricsGroup {
 
   // Used to allow only one pending ISR update per partition
   private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new 
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
@@ -66,7 +94,7 @@ class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChanne
     scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)
   }
 
-  override def enqueue(alterIsrItem: AlterIsrItem): Boolean = {
+  override def submit(alterIsrItem: AlterIsrItem): Boolean = {
     unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == 
null
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 3723eae..1e7bb19 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -444,8 +444,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
   }
 
   protected def createReplicaManager(isShuttingDown: AtomicBoolean): 
ReplicaManager = {
-    val alterIsrManager = new AlterIsrManagerImpl(alterIsrChannelManager, 
kafkaScheduler,
-      time, config.brokerId, () => kafkaController.brokerEpoch)
+    val alterIsrManager: AlterIsrManager = if 
(config.interBrokerProtocolVersion.isAlterIsrSupported) {
+      AlterIsrManager(alterIsrChannelManager, kafkaScheduler,
+        time, config.brokerId, () => kafkaController.brokerEpoch)
+    } else {
+      AlterIsrManager(kafkaScheduler, time, zkClient)
+    }
     new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, 
logManager, isShuttingDown, quotaManagers,
       brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager)
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index a3934a5..5da3b69 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,7 +19,7 @@ package kafka.server
 import java.io.File
 import java.util.Optional
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.atomic.{AtomicBoolean}
 import java.util.concurrent.locks.Lock
 
 import com.yammer.metrics.core.Meter
@@ -166,26 +166,8 @@ object HostedPartition {
   final object Offline extends HostedPartition
 }
 
-case class IsrChangePropagationConfig(
-  // How often to check for ISR
-  checkIntervalMs: Long,
-
-  // Maximum time that an ISR change may be delayed before sending the 
notification
-  maxDelayMs: Long,
-
-  // Maximum time to await additional changes before sending the notification
-  lingerMs: Long
-)
-
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
-
-  // This field is mutable to allow overriding change notification behavior in 
test cases
-  @volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = 
IsrChangePropagationConfig(
-    checkIntervalMs = 2500,
-    lingerMs = 5000,
-    maxDelayMs = 60000,
-  )
 }
 
 class ReplicaManager(val config: KafkaConfig,
@@ -251,11 +233,6 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
   private val stateChangeLogger = new StateChangeLogger(localBrokerId, 
inControllerContext = false, None)
 
-  private val isrChangeNotificationConfig = 
ReplicaManager.DefaultIsrPropagationConfig
-  private val isrChangeSet: mutable.Set[TopicPartition] = new 
mutable.HashSet[TopicPartition]()
-  private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
-  private val lastIsrPropagationMs = new AtomicLong(time.milliseconds())
-
   private var logDirFailureHandler: LogDirFailureHandler = null
 
   private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: 
Boolean) extends ShutdownableThread(name) {
@@ -294,34 +271,6 @@ class ReplicaManager(val config: KafkaConfig,
       scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks 
_, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = 
TimeUnit.MILLISECONDS)
   }
 
-  def recordIsrChange(topicPartition: TopicPartition): Unit = {
-    if (!config.interBrokerProtocolVersion.isAlterIsrSupported) {
-      isrChangeSet synchronized {
-        isrChangeSet += topicPartition
-        lastIsrChangeMs.set(time.milliseconds())
-      }
-    }
-  }
-  /**
-   * This function periodically runs to see if ISR needs to be propagated. It 
propagates ISR when:
-   * 1. There is ISR change not propagated yet.
-   * 2. There is no ISR Change in the last five seconds, or it has been more 
than 60 seconds since the last ISR propagation.
-   * This allows an occasional ISR change to be propagated within a few 
seconds, and avoids overwhelming controller and
-   * other brokers when large amount of ISR change occurs.
-   */
-  def maybePropagateIsrChanges(): Unit = {
-    val now = time.milliseconds()
-    isrChangeSet synchronized {
-      if (isrChangeSet.nonEmpty &&
-        (lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
-          lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs 
< now)) {
-        zkClient.propagateIsrChanges(isrChangeSet)
-        isrChangeSet.clear()
-        lastIsrPropagationMs.set(now)
-      }
-    }
-  }
-
   // When ReplicaAlterDirThread finishes replacing a current replica with a 
future replica, it will
   // remove the partition from the partition state map. But it will not close 
itself even if the
   // partition state map is empty. Thus we need to call 
shutdownIdleReplicaAlterDirThread() periodically
@@ -343,14 +292,8 @@ class ReplicaManager(val config: KafkaConfig,
     // start ISR expiration thread
     // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 
1.5 before it is removed from ISR
     scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = 
config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
-    // If using AlterIsr, we don't need the znode ISR propagation
-    if (!config.interBrokerProtocolVersion.isAlterIsrSupported) {
-      scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
-        period = isrChangeNotificationConfig.checkIntervalMs, unit = 
TimeUnit.MILLISECONDS)
-    } else {
-      alterIsrManager.start()
-    }
     scheduler.schedule("shutdown-idle-replica-alter-log-dirs-thread", 
shutdownIdleReplicaAlterLogDirsThread _, period = 10000L, unit = 
TimeUnit.MILLISECONDS)
+    alterIsrManager.start()
 
     // If inter-broker protocol (IBP) < 1.0, the controller will send 
LeaderAndIsrRequest V0 which does not include isNew field.
     // In this case, the broker receiving the request cannot determine whether 
it is safe to create a partition if a log directory has failed.
diff --git a/core/src/main/scala/kafka/server/ZkIsrManager.scala 
b/core/src/main/scala/kafka/server/ZkIsrManager.scala
new file mode 100644
index 0000000..2d88aac
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ZkIsrManager.scala
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.{Logging, ReplicationUtils, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Time
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+import scala.collection.mutable
+
+/**
+ * @param checkIntervalMs How often to check for ISR
+ * @param maxDelayMs  Maximum time that an ISR change may be delayed before 
sending the notification
+ * @param lingerMs  Maximum time to await additional changes before sending 
the notification
+ */
+case class IsrChangePropagationConfig(checkIntervalMs: Long, maxDelayMs: Long, 
lingerMs: Long)
+
+object ZkIsrManager {
+  // This field is mutable to allow overriding change notification behavior in 
test cases
+  @volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = 
IsrChangePropagationConfig(
+    checkIntervalMs = 2500,
+    lingerMs = 5000,
+    maxDelayMs = 60000,
+  )
+}
+
+class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) 
extends AlterIsrManager with Logging {
+
+  private val isrChangeNotificationConfig = 
ZkIsrManager.DefaultIsrPropagationConfig
+  // Visible for testing
+  private[server] val isrChangeSet: mutable.Set[TopicPartition] = new 
mutable.HashSet[TopicPartition]()
+  private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
+  private val lastIsrPropagationMs = new AtomicLong(time.milliseconds())
+
+  override def start(): Unit = {
+    scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
+      period = isrChangeNotificationConfig.checkIntervalMs, unit = 
TimeUnit.MILLISECONDS)
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    // Since we always immediately process ZK updates and never actually 
enqueue anything, there is nothing to
+    // clear here so this is a no-op. Even if there are changes that have not 
been propagated, the write to ZK
+    // has already happened, so we may as well send the notification to the 
controller.
+  }
+
+  override def submit(alterIsrItem: AlterIsrItem): Boolean = {
+    debug(s"Writing new ISR ${alterIsrItem.leaderAndIsr.isr} to ZooKeeper with 
version " +
+      s"${alterIsrItem.leaderAndIsr.zkVersion} for partition 
${alterIsrItem.topicPartition}")
+
+    val (updateSucceeded, newVersion) = 
ReplicationUtils.updateLeaderAndIsr(zkClient, alterIsrItem.topicPartition,
+      alterIsrItem.leaderAndIsr, alterIsrItem.controllerEpoch)
+
+    if (updateSucceeded) {
+      // Track which partitions need to be propagated to the controller
+      isrChangeSet synchronized {
+        isrChangeSet += alterIsrItem.topicPartition
+        lastIsrChangeMs.set(time.milliseconds())
+      }
+
+      // We rely on Partition#isrState being properly set to the pending ISR 
at this point since we are synchronously
+      // applying the callback
+      
alterIsrItem.callback.apply(Right(alterIsrItem.leaderAndIsr.withZkVersion(newVersion)))
+    } else {
+      alterIsrItem.callback.apply(Left(Errors.INVALID_UPDATE_VERSION))
+    }
+
+    // Return true since we unconditionally accept the AlterIsrItem. The 
result of the operation is indicated by the
+    // callback, not the return value of this method
+    true
+  }
+
+  /**
+   * This function periodically runs to see if ISR needs to be propagated. It 
propagates ISR when:
+   * 1. There is ISR change not propagated yet.
+   * 2. There is no ISR Change in the last five seconds, or it has been more 
than 60 seconds since the last ISR propagation.
+   * This allows an occasional ISR change to be propagated within a few 
seconds, and avoids overwhelming controller and
+   * other brokers when large amount of ISR change occurs.
+   */
+  private[server] def maybePropagateIsrChanges(): Unit = {
+    val now = time.milliseconds()
+    isrChangeSet synchronized {
+      if (isrChangeSet.nonEmpty &&
+        (lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
+          lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs 
< now)) {
+        zkClient.propagateIsrChanges(isrChangeSet)
+        isrChangeSet.clear()
+        lastIsrPropagationMs.set(now)
+      }
+    }
+  }
+}
diff --git 
a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 6d5d02d..0b108d2 100644
--- 
a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -19,10 +19,9 @@ package kafka.admin
 
 import java.io.Closeable
 import java.util.{Collections, HashMap, List}
-
 import kafka.admin.ReassignPartitionsCommand._
 import kafka.api.KAFKA_2_7_IV1
-import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, 
ReplicaManager}
+import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, 
ZkIsrManager}
 import kafka.utils.Implicits._
 import kafka.utils.TestUtils
 import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
@@ -88,7 +87,7 @@ class ReassignPartitionsIntegrationTest extends 
ZooKeeperTestHarness {
 
     // Override change notification settings so that test is not delayed by ISR
     // change notification delay
-    ReplicaManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig(
+    ZkIsrManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig(
       checkIntervalMs = 500,
       lingerMs = 100,
       maxDelayMs = 500
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 603598e..99f5d54 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -43,7 +43,7 @@ class AbstractPartitionTest {
   var alterIsrManager: MockAlterIsrManager = _
   var isrChangeListener: MockIsrChangeListener = _
   var logConfig: LogConfig = _
-  val stateStore: PartitionStateStore = mock(classOf[PartitionStateStore])
+  var topicConfigProvider: TopicConfigFetcher = _
   val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
   val metadataCache: MetadataCache = mock(classOf[MetadataCache])
   val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
@@ -55,6 +55,7 @@ class AbstractPartitionTest {
 
     val logProps = createLogProperties(Map.empty)
     logConfig = LogConfig(logProps)
+    topicConfigProvider = TestUtils.createTopicConfigProvider(logProps)
 
     tmpDir = TestUtils.tempDir()
     logDir1 = TestUtils.randomPartitionLogDir(tmpDir)
@@ -70,14 +71,13 @@ class AbstractPartitionTest {
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
-      stateStore,
+      topicConfigProvider,
       isrChangeListener,
       delayedOperations,
       metadataCache,
       logManager,
       alterIsrManager)
 
-    
when(stateStore.fetchTopicConfig()).thenReturn(createLogProperties(Map.empty))
     when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, 
ArgumentMatchers.eq(topicPartition)))
       .thenReturn(None)
   }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index e5dbec7..ab4cd8a 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent._
 
-import kafka.api.{ApiVersion, LeaderAndIsr}
+import kafka.api.ApiVersion
 import kafka.log._
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpoints
@@ -248,7 +248,7 @@ class PartitionLockTest extends Logging {
     val leaderEpoch = 1
     val brokerId = 0
     val topicPartition = new TopicPartition("test-topic", 0)
-    val stateStore: PartitionStateStore = mock(classOf[PartitionStateStore])
+    val topicConfigProvider = 
TestUtils.createTopicConfigProvider(createLogProperties(Map.empty))
     val isrChangeListener: IsrChangeListener = mock(classOf[IsrChangeListener])
     val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
     val metadataCache: MetadataCache = mock(classOf[MetadataCache])
@@ -261,7 +261,7 @@ class PartitionLockTest extends Logging {
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       mockTime,
-      stateStore,
+      topicConfigProvider,
       isrChangeListener,
       delayedOperations,
       metadataCache,
@@ -282,14 +282,9 @@ class PartitionLockTest extends Logging {
         new SlowLog(log, mockTime, appendSemaphore)
       }
     }
-    
when(stateStore.fetchTopicConfig()).thenReturn(createLogProperties(Map.empty))
     when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, 
ArgumentMatchers.eq(topicPartition)))
       .thenReturn(None)
-    when(stateStore.shrinkIsr(ArgumentMatchers.anyInt, 
ArgumentMatchers.any[LeaderAndIsr]))
-      .thenReturn(Some(2))
-    when(stateStore.expandIsr(ArgumentMatchers.anyInt, 
ArgumentMatchers.any[LeaderAndIsr]))
-      .thenReturn(Some(2))
-    when(alterIsrManager.enqueue(ArgumentMatchers.any[AlterIsrItem]))
+    when(alterIsrManager.submit(ArgumentMatchers.any[AlterIsrItem]))
       .thenReturn(true)
 
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index ccf5c9a..bad66f2 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -20,13 +20,14 @@ import java.nio.ByteBuffer
 import java.util.Optional
 import java.util.concurrent.{CountDownLatch, Semaphore}
 import com.yammer.metrics.core.Metric
-import kafka.api.{ApiVersion, LeaderAndIsr}
+import kafka.api.{ApiVersion, KAFKA_2_6_IV0}
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{Defaults => _, _}
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.utils._
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.errors.{ApiException, 
NotLeaderOrFollowerException, OffsetNotAvailableException, 
OffsetOutOfRangeException}
 import org.apache.kafka.common.message.FetchResponseData
 import 
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@@ -40,6 +41,7 @@ import org.apache.kafka.common.{IsolationLevel, 
TopicPartition}
 import org.junit.Assert._
 import org.junit.Test
 import org.mockito.ArgumentMatchers
+import org.mockito.ArgumentMatchers.{any, anyString}
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.scalatest.Assertions.assertThrows
@@ -227,7 +229,7 @@ class PartitionTest extends AbstractPartitionTest {
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
-      stateStore,
+      topicConfigProvider,
       isrChangeListener,
       delayedOperations,
       metadataCache,
@@ -606,16 +608,15 @@ class PartitionTest extends AbstractPartitionTest {
       }
     }
 
-    when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, 
leaderEpoch,
-      List(leader, follower2, follower1), 1)))
-      .thenReturn(Some(2))
-
     updateFollowerFetchState(follower1, LogOffsetMetadata(0))
     updateFollowerFetchState(follower1, LogOffsetMetadata(2))
 
     updateFollowerFetchState(follower2, LogOffsetMetadata(0))
     updateFollowerFetchState(follower2, LogOffsetMetadata(2))
 
+    // Simulate successful ISR update
+    alterIsrManager.completeIsrUpdate(2)
+
     // At this point, the leader has gotten 5 writes, but followers have only 
fetched two
     assertEquals(2, partition.localLogOrException.highWatermark)
 
@@ -699,14 +700,13 @@ class PartitionTest extends AbstractPartitionTest {
       case Left(e: ApiException) => fail(s"Should have seen 
OffsetNotAvailableException, saw $e")
     }
 
-    when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, 
leaderEpoch + 2,
-      List(leader, follower2, follower1), 5)))
-      .thenReturn(Some(2))
-
     // Next fetch from replicas, HW is moved up to 5 (ahead of the LEO)
     updateFollowerFetchState(follower1, LogOffsetMetadata(5))
     updateFollowerFetchState(follower2, LogOffsetMetadata(5))
 
+    // Simulate successful ISR update
+    alterIsrManager.completeIsrUpdate(6)
+
     // Error goes away
     fetchOffsetsForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, 
Some(IsolationLevel.READ_UNCOMMITTED)) match {
       case Right(Some(offsetAndTimestamp)) => assertEquals(5, 
offsetAndTimestamp.offset)
@@ -1006,7 +1006,7 @@ class PartitionTest extends AbstractPartitionTest {
     // Expansion does not affect the ISR
     assertEquals("ISR", Set[Integer](leader, follower2), 
partition.isrState.isr)
     assertEquals("ISR", Set[Integer](leader, follower1, follower2), 
partition.isrState.maximalIsr)
-    assertEquals("AlterIsr", 
alterIsrManager.isrUpdates.dequeue().leaderAndIsr.isr.toSet,
+    assertEquals("AlterIsr", 
alterIsrManager.isrUpdates.head.leaderAndIsr.isr.toSet,
       Set(leader, follower1, follower2))
   }
 
@@ -1165,7 +1165,7 @@ class PartitionTest extends AbstractPartitionTest {
       leaderEndOffset = 6L)
 
     assertEquals(alterIsrManager.isrUpdates.size, 1)
-    val isrItem = alterIsrManager.isrUpdates.dequeue()
+    val isrItem = alterIsrManager.isrUpdates.head
     assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId))
     assertEquals(Set(brokerId), partition.isrState.isr)
     assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.maximalIsr)
@@ -1173,7 +1173,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(0L, remoteReplica.logStartOffset)
 
     // Complete the ISR expansion
-    isrItem.callback.apply(Right(new LeaderAndIsr(brokerId, leaderEpoch, 
List(brokerId, remoteBrokerId), 2)))
+    alterIsrManager.completeIsrUpdate(2)
     assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.isr)
 
     assertEquals(isrChangeListener.expands.get, 1)
@@ -1224,8 +1224,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(0L, remoteReplica.logStartOffset)
 
     // Simulate failure callback
-    val alterIsrItem = alterIsrManager.isrUpdates.dequeue()
-    alterIsrItem.callback.apply(Left(Errors.INVALID_UPDATE_VERSION))
+    alterIsrManager.failIsrUpdate(Errors.INVALID_UPDATE_VERSION)
 
     // Still no ISR change
     assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@@ -1281,7 +1280,7 @@ class PartitionTest extends AbstractPartitionTest {
     // Shrink the ISR
     partition.maybeShrinkIsr()
     assertEquals(alterIsrManager.isrUpdates.size, 1)
-    assertEquals(alterIsrManager.isrUpdates.dequeue().leaderAndIsr.isr, 
List(brokerId))
+    assertEquals(alterIsrManager.isrUpdates.head.leaderAndIsr.isr, 
List(brokerId))
     assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.isr)
     assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.maximalIsr)
     assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1450,8 +1449,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(0L, partition.localLogOrException.highWatermark)
 
     // Simulate failure callback
-    val alterIsrItem = alterIsrManager.isrUpdates.dequeue()
-    alterIsrItem.callback.apply(Left(Errors.INVALID_UPDATE_VERSION))
+    alterIsrManager.failIsrUpdate(Errors.INVALID_UPDATE_VERSION)
 
     // Ensure ISR hasn't changed
     assertEquals(partition.isrState.getClass, classOf[PendingShrinkIsr])
@@ -1534,7 +1532,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(0L, remoteReplica.logStartOffset)
 
     // Failure
-    alterIsrManager.isrUpdates.dequeue().callback(Left(error))
+    alterIsrManager.failIsrUpdate(error)
     callback(brokerId, remoteBrokerId, partition)
   }
 
@@ -1582,6 +1580,74 @@ class PartitionTest extends AbstractPartitionTest {
   }
 
   @Test
+  def testZkIsrManagerAsyncCallback(): Unit = {
+    // We need a real scheduler here so that the ISR write lock works properly
+    val scheduler = new KafkaScheduler(1, "zk-isr-test")
+    scheduler.startup()
+    val kafkaZkClient = mock(classOf[KafkaZkClient])
+
+    doAnswer(_ => (true, 2))
+      .when(kafkaZkClient)
+      .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any())
+
+    val zkIsrManager = AlterIsrManager(scheduler, time, kafkaZkClient)
+    zkIsrManager.start()
+
+    val partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = KAFKA_2_6_IV0, // shouldn't matter, but set 
this to a ZK isr version
+      localBrokerId = brokerId,
+      time,
+      topicConfigProvider,
+      isrChangeListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      zkIsrManager)
+
+    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val follower3 = brokerId + 3
+    val replicas = List[Integer](brokerId, follower1, follower2, 
follower3).asJava
+    val isr = List[Integer](brokerId, follower1, follower2).asJava
+
+    doNothing().when(delayedOperations).checkAndCompleteAll()
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints)
+    assertTrue("Expected become leader transition to succeed",
+      partition.makeLeader(
+        new LeaderAndIsrPartitionState()
+          .setControllerEpoch(controllerEpoch)
+          .setLeader(brokerId)
+          .setLeaderEpoch(leaderEpoch)
+          .setIsr(isr)
+          .setZkVersion(1)
+          .setReplicas(replicas)
+          .setIsNew(true),
+        offsetCheckpoints))
+    assertEquals(Set(brokerId, follower1, follower2), partition.isrState.isr)
+    assertEquals(0L, partition.localLogOrException.highWatermark)
+
+    // Expand ISR
+    partition.expandIsr(follower3)
+
+    // Try avoiding a race
+    TestUtils.waitUntilTrue(() => !partition.isrState.isInflight, "Expected 
ISR state to be committed", 100)
+
+    partition.isrState match {
+      case committed: CommittedIsr => assertEquals(Set(brokerId, follower1, 
follower2, follower3), committed.isr)
+      case _ => fail("Expected a committed ISR following Zk expansion")
+    }
+
+    scheduler.shutdown()
+  }
+
+  @Test
   def testUseCheckpointToInitializeHighWatermark(): Unit = {
     val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
     seedLogData(log, numRecords = 6, leaderEpoch = 5)
@@ -1653,7 +1719,7 @@ class PartitionTest extends AbstractPartitionTest {
     val topicPartition = new TopicPartition("test", 1)
     val partition = new Partition(
       topicPartition, 1000, ApiVersion.latestVersion, 0,
-      new SystemTime(), mock(classOf[PartitionStateStore]), 
mock(classOf[IsrChangeListener]), mock(classOf[DelayedOperations]),
+      new SystemTime(), topicConfigProvider, mock(classOf[IsrChangeListener]), 
mock(classOf[DelayedOperations]),
       mock(classOf[MetadataCache]), mock(classOf[LogManager]), 
mock(classOf[AlterIsrManager]))
 
     val replicas = Seq(0, 1, 2, 3)
@@ -1690,12 +1756,13 @@ class PartitionTest extends AbstractPartitionTest {
   @Test
   def testLogConfigNotDirty(): Unit = {
     val spyLogManager = spy(logManager)
+    val spyConfigProvider = spy(topicConfigProvider)
     val partition = new Partition(topicPartition,
       replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
-      stateStore,
+      spyConfigProvider,
       isrChangeListener,
       delayedOperations,
       metadataCache,
@@ -1711,7 +1778,7 @@ class PartitionTest extends AbstractPartitionTest {
       ArgumentMatchers.any()) // This doesn't get evaluated, but needed to 
satisfy compilation
 
     // We should get config from ZK only once
-    verify(stateStore).fetchTopicConfig()
+    verify(spyConfigProvider, times(1)).fetch()
   }
 
   /**
@@ -1720,6 +1787,7 @@ class PartitionTest extends AbstractPartitionTest {
    */
   @Test
   def testLogConfigDirtyAsTopicUpdated(): Unit = {
+    val spyConfigProvider = spy(topicConfigProvider)
     val spyLogManager = spy(logManager)
     doAnswer((invocation: InvocationOnMock) => {
       logManager.initializingLog(topicPartition)
@@ -1731,7 +1799,7 @@ class PartitionTest extends AbstractPartitionTest {
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
-      stateStore,
+      spyConfigProvider,
       isrChangeListener,
       delayedOperations,
       metadataCache,
@@ -1748,7 +1816,7 @@ class PartitionTest extends AbstractPartitionTest {
 
     // We should get config from ZK twice, once before log is created, and 
second time once
     // we find log config is dirty and refresh it.
-    verify(stateStore, times(2)).fetchTopicConfig()
+    verify(spyConfigProvider, times(2)).fetch()
   }
 
   /**
@@ -1757,6 +1825,7 @@ class PartitionTest extends AbstractPartitionTest {
    */
   @Test
   def testLogConfigDirtyAsBrokerUpdated(): Unit = {
+    val spyConfigProvider = spy(topicConfigProvider)
     val spyLogManager = spy(logManager)
     doAnswer((invocation: InvocationOnMock) => {
       logManager.initializingLog(topicPartition)
@@ -1768,7 +1837,7 @@ class PartitionTest extends AbstractPartitionTest {
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
-      stateStore,
+      spyConfigProvider,
       isrChangeListener,
       delayedOperations,
       metadataCache,
@@ -1785,7 +1854,7 @@ class PartitionTest extends AbstractPartitionTest {
 
     // We should get config from ZK twice, once before log is created, and 
second time once
     // we find log config is dirty and refresh it.
-    verify(stateStore, times(2)).fetchTopicConfig()
+    verify(spyConfigProvider, times(2)).fetch()
   }
 
   private def seedLogData(log: Log, numRecords: Int, leaderEpoch: Int): Unit = 
{
diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
index 1f451fe..a2e269b 100644
--- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
@@ -19,10 +19,10 @@ package unit.kafka.server
 
 import java.util.Collections
 import java.util.concurrent.atomic.AtomicInteger
-
 import kafka.api.LeaderAndIsr
-import kafka.server.{AlterIsrItem, AlterIsrManager, AlterIsrManagerImpl, 
BrokerToControllerChannelManager, ControllerRequestCompletionHandler}
+import kafka.server.{AlterIsrItem, AlterIsrManager, 
BrokerToControllerChannelManager, ControllerRequestCompletionHandler, 
DefaultAlterIsrManager, ZkIsrManager}
 import kafka.utils.{MockScheduler, MockTime}
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.AlterIsrResponseData
@@ -32,6 +32,8 @@ import org.apache.kafka.common.requests.{AbstractRequest, 
AlterIsrRequest, Alter
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{Before, Test}
+import org.mockito.ArgumentMatchers.{any, anyString}
+import org.mockito.{ArgumentMatchers, Mockito}
 
 
 class AlterIsrManagerTest {
@@ -59,9 +61,9 @@ class AlterIsrManagerTest {
     EasyMock.replay(brokerToController)
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new AlterIsrManagerImpl(brokerToController, 
scheduler, time, brokerId, () => 2)
+    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2)
     alterIsrManager.start()
-    alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), _ => {}))
+    alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), _ => {}, 0))
     time.sleep(50)
     scheduler.tick()
 
@@ -75,12 +77,12 @@ class AlterIsrManagerTest {
     EasyMock.replay(brokerToController)
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new AlterIsrManagerImpl(brokerToController, 
scheduler, time, brokerId, () => 2)
+    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2)
     alterIsrManager.start()
 
     // Only send one ISR update for a given topic+partition
-    assertTrue(alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 
1, List(1,2,3), 10), _ => {})))
-    assertFalse(alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 
1, List(1,2), 10), _ => {})))
+    assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), _ => {}, 0)))
+    assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 
1, List(1,2), 10), _ => {}, 0)))
 
     time.sleep(50)
     scheduler.tick()
@@ -99,12 +101,12 @@ class AlterIsrManagerTest {
     EasyMock.replay(brokerToController)
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new AlterIsrManagerImpl(brokerToController, 
scheduler, time, brokerId, () => 2)
+    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2)
     alterIsrManager.start()
 
     for (i <- 0 to 9) {
-      alterIsrManager.enqueue(AlterIsrItem(new TopicPartition(topic, i),
-        new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}))
+      alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, i),
+        new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
       time.sleep(1)
     }
 
@@ -112,8 +114,8 @@ class AlterIsrManagerTest {
     scheduler.tick()
 
     // This should not be included in the batch
-    alterIsrManager.enqueue(AlterIsrItem(new TopicPartition(topic, 10),
-      new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}))
+    alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, 10),
+      new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
 
     EasyMock.verify(brokerToController)
 
@@ -124,26 +126,26 @@ class AlterIsrManagerTest {
 
   @Test
   def testAuthorizationFailed(): Unit = {
-    val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), 
_ => { }))
+    val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), 
_ => { }, 0))
     val manager = testTopLevelError(isrs, Errors.CLUSTER_AUTHORIZATION_FAILED)
     // On authz error, we log the exception and keep retrying
-    assertFalse(manager.enqueue(AlterIsrItem(tp0, null, _ => { })))
+    assertFalse(manager.submit(AlterIsrItem(tp0, null, _ => { }, 0)))
   }
 
   @Test
   def testStaleBrokerEpoch(): Unit = {
-    val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), 
_ => { }))
+    val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), 
_ => { }, 0))
     val manager = testTopLevelError(isrs, Errors.STALE_BROKER_EPOCH)
     // On stale broker epoch, we want to retry, so we don't clear items from 
the pending map
-    assertFalse(manager.enqueue(AlterIsrItem(tp0, null, _ => { })))
+    assertFalse(manager.submit(AlterIsrItem(tp0, null, _ => { }, 0)))
   }
 
   @Test
   def testOtherErrors(): Unit = {
-    val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), 
_ => { }))
+    val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), 
_ => { }, 0))
     val manager = testTopLevelError(isrs, Errors.UNKNOWN_SERVER_ERROR)
     // On other unexpected errors, we also want to retry
-    assertFalse(manager.enqueue(AlterIsrItem(tp0, null, _ => { })))
+    assertFalse(manager.submit(AlterIsrItem(tp0, null, _ => { }, 0)))
   }
 
   def testTopLevelError(isrs: Seq[AlterIsrItem], error: Errors): 
AlterIsrManager = {
@@ -153,9 +155,9 @@ class AlterIsrManagerTest {
     EasyMock.replay(brokerToController)
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new AlterIsrManagerImpl(brokerToController, 
scheduler, time, brokerId, () => 2)
+    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2)
     alterIsrManager.start()
-    isrs.foreach(alterIsrManager.enqueue)
+    isrs.foreach(alterIsrManager.submit)
 
     time.sleep(100)
     scheduler.tick()
@@ -175,7 +177,7 @@ class AlterIsrManagerTest {
     errors.foreach(error => {
       val alterIsrManager = testPartitionError(tp0, error)
       // Any partition-level error should clear the item from the pending 
queue allowing for future updates
-      assertTrue(alterIsrManager.enqueue(AlterIsrItem(tp0, null, _ => { })))
+      assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, null, _ => { }, 0)))
     })
   }
 
@@ -186,7 +188,7 @@ class AlterIsrManagerTest {
     EasyMock.replay(brokerToController)
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new AlterIsrManagerImpl(brokerToController, 
scheduler, time, brokerId, () => 2)
+    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2)
     alterIsrManager.start()
 
     var capturedError: Option[Errors] = None
@@ -197,7 +199,7 @@ class AlterIsrManagerTest {
       }
     }
 
-    alterIsrManager.enqueue(AlterIsrItem(tp, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), callback))
+    alterIsrManager.submit(AlterIsrItem(tp, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), callback, 0))
 
     time.sleep(100)
     scheduler.tick()
@@ -228,16 +230,16 @@ class AlterIsrManagerTest {
     EasyMock.replay(brokerToController)
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new AlterIsrManagerImpl(brokerToController, 
scheduler, time, brokerId, () => 2)
+    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2)
     alterIsrManager.start()
-    alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), _ => {}))
+    alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), _ => {}, 0))
 
     time.sleep(100)
     scheduler.tick() // Triggers a request
 
     // Enqueue more updates
-    alterIsrManager.enqueue(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), _ => {}))
-    alterIsrManager.enqueue(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), _ => {}))
+    alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), _ => {}, 0))
+    alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), _ => {}, 0))
 
     time.sleep(100)
     scheduler.tick() // Trigger the schedule again, but no request this time
@@ -267,7 +269,7 @@ class AlterIsrManagerTest {
     EasyMock.replay(brokerToController)
 
     val scheduler = new MockScheduler(time)
-    val alterIsrManager = new AlterIsrManagerImpl(brokerToController, 
scheduler, time, brokerId, () => 2)
+    val alterIsrManager = new DefaultAlterIsrManager(brokerToController, 
scheduler, time, brokerId, () => 2)
     alterIsrManager.start()
 
     val count = new AtomicInteger(0)
@@ -275,9 +277,9 @@ class AlterIsrManagerTest {
       count.incrementAndGet()
       return
     }
-    alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), callback))
-    alterIsrManager.enqueue(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), callback))
-    alterIsrManager.enqueue(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), callback))
+    alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), callback, 0))
+    alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), callback, 0))
+    alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, 
List(1,2,3), 10), callback, 0))
 
 
     time.sleep(100)
@@ -300,4 +302,33 @@ class AlterIsrManagerTest {
 
     assertEquals("Expected all callbacks to run", count.get, 3)
   }
+
+  @Test
+  def testZkBasic(): Unit = {
+    val scheduler = new MockScheduler(time)
+    scheduler.startup()
+
+    val kafkaZkClient = Mockito.mock(classOf[KafkaZkClient])
+    Mockito.doAnswer(_ => (true, 2))
+      .when(kafkaZkClient)
+      .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any())
+    Mockito.doAnswer(_ => (false, 2))
+      .when(kafkaZkClient)
+      .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(3), any())
+
+    val zkIsrManager = new ZkIsrManager(scheduler, time, kafkaZkClient)
+    zkIsrManager.start()
+
+    def expectMatch(expect: Either[Errors, LeaderAndIsr])(result: 
Either[Errors, LeaderAndIsr]): Unit = {
+      assertEquals(expect, result)
+    }
+
+    // Correct ZK version
+    assertTrue(zkIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, 
List(1,2,3), 1),
+      expectMatch(Right(new LeaderAndIsr(1, 1, List(1,2,3), 2))), 0)))
+
+    // Wrong ZK version
+    assertTrue(zkIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, 
List(1,2,3), 3),
+      expectMatch(Left(Errors.INVALID_UPDATE_VERSION)), 0)))
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
index 7342be6..8cce64d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -17,13 +17,14 @@
 
 package kafka.server
 
-import java.util.Properties
+import kafka.api.ApiVersion
 
+import java.util.Properties
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.zookeeper.client.ZKClientConfig
 import org.junit.Test
-import org.junit.Assert.assertEquals
+import org.junit.Assert.{assertEquals, fail}
 import org.scalatest.Assertions.intercept
 
 class KafkaServerTest extends ZooKeeperTestHarness {
@@ -102,6 +103,32 @@ class KafkaServerTest extends ZooKeeperTestHarness {
       assertEquals(zkClientValueToExpect(kafkaProp), 
zkClientConfig.get.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
   }
 
+  @Test
+  def testZkIsrManager(): Unit = {
+    val props = TestUtils.createBrokerConfigs(1, zkConnect).head
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "2.7-IV1")
+
+    val server = TestUtils.createServer(KafkaConfig.fromProps(props))
+    server.replicaManager.alterIsrManager match {
+      case _: ZkIsrManager =>
+      case _ => fail("Should use ZK for ISR manager in versions before 
2.7-IV2")
+    }
+    server.shutdown()
+  }
+
+  @Test
+  def testAlterIsrManager(): Unit = {
+    val props = TestUtils.createBrokerConfigs(1, zkConnect).head
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
ApiVersion.latestVersion.toString)
+
+    val server = TestUtils.createServer(KafkaConfig.fromProps(props))
+    server.replicaManager.alterIsrManager match {
+      case _: DefaultAlterIsrManager =>
+      case _ => fail("Should use AlterIsr for ISR manager in versions after 
2.7-IV2")
+    }
+    server.shutdown()
+  }
+
   def createServer(nodeId: Int, hostName: String, port: Int): KafkaServer = {
     val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
     props.put(KafkaConfig.AdvertisedListenersProp, 
s"PLAINTEXT://$hostName:$port")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 80fdc60..c2d7c7aa 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -64,6 +64,7 @@ class ReplicaManagerTest {
 
   val topic = "test-topic"
   val time = new MockTime
+  val scheduler = new MockScheduler(time)
   val metrics = new Metrics
   var kafkaZkClient: KafkaZkClient = _
   var alterIsrManager: AlterIsrManager = _
@@ -1708,9 +1709,11 @@ class ReplicaManagerTest {
     result
   }
 
-  private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, 
aliveBrokerIds: Seq[Int] = Seq(0, 1)): ReplicaManager = {
+  private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, 
aliveBrokerIds: Seq[Int] = Seq(0, 1),
+                                                       propsModifier: 
Properties => Unit = _ => {}): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + 
"," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
+    propsModifier.apply(props)
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), LogConfig(logProps))
@@ -1733,7 +1736,7 @@ class ReplicaManagerTest {
     val mockDelayedElectLeaderPurgatory = new 
DelayedOperationPurgatory[DelayedElectLeader](
       purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false)
 
-    new ReplicaManager(config, metrics, time, kafkaZkClient, new 
MockScheduler(time), mockLogMgr,
+    new ReplicaManager(config, metrics, time, kafkaZkClient, scheduler, 
mockLogMgr,
       new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
       metadataCache, new LogDirFailureChannel(config.logDirs.size), 
mockProducePurgatory, mockFetchPurgatory,
       mockDeleteRecordsPurgatory, mockDelayedElectLeaderPurgatory, 
Option(this.getClass.getName), alterIsrManager)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5368c20..462e802 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,13 +23,12 @@ import java.nio.charset.{Charset, StandardCharsets}
 import java.nio.file.{Files, StandardOpenOption}
 import java.security.cert.X509Certificate
 import java.time.Duration
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.{Arrays, Collections, Properties}
 import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
-
 import javax.net.ssl.X509TrustManager
 import kafka.api._
-import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
+import kafka.cluster.{Broker, EndPoint, IsrChangeListener, TopicConfigFetcher}
 import kafka.log._
 import kafka.security.auth.{Acl, Resource, Authorizer => LegacyAuthorizer}
 import kafka.server._
@@ -50,6 +49,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, UnknownTopicOrPart
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.resource.ResourcePattern
@@ -1067,17 +1067,40 @@ object TestUtils extends Logging {
 
   class MockAlterIsrManager extends AlterIsrManager {
     val isrUpdates: mutable.Queue[AlterIsrItem] = new mutable.Queue[AlterIsrItem]()
-
-    override def enqueue(alterIsrItem: AlterIsrItem): Boolean = {
-      isrUpdates += alterIsrItem
-      true
+    val inFlight: AtomicBoolean = new AtomicBoolean(false)
+
+    override def submit(alterIsrItem: AlterIsrItem): Boolean = {
+      if (inFlight.compareAndSet(false, true)) {
+        isrUpdates += alterIsrItem
+        true
+      } else {
+        false
+      }
     }
 
     override def clearPending(topicPartition: TopicPartition): Unit = {
-      isrUpdates.clear()
+      inFlight.set(false);
     }
 
     override def start(): Unit = { }
+
+    def completeIsrUpdate(newZkVersion: Int): Unit = {
+      if (inFlight.compareAndSet(true, false)) {
+        val item = isrUpdates.head
+        item.callback.apply(Right(item.leaderAndIsr.withZkVersion(newZkVersion)))
+      } else {
+        fail("Expected an in-flight ISR update, but there was none")
+      }
+    }
+
+    def failIsrUpdate(error: Errors): Unit = {
+      if (inFlight.compareAndSet(true, false)) {
+        val item = isrUpdates.dequeue()
+        item.callback.apply(Left(error))
+      } else {
+        fail("Expected an in-flight ISR update, but there was none")
+      }
+    }
   }
 
   def createAlterIsrManager(): MockAlterIsrManager = {
@@ -1106,6 +1129,14 @@ object TestUtils extends Logging {
     new MockIsrChangeListener()
   }
 
+  class MockTopicConfigFetcher(var props: Properties) extends TopicConfigFetcher {
+    override def fetch(): Properties = props
+  }
+
+  def createTopicConfigProvider(props: Properties): MockTopicConfigFetcher = {
+    new MockTopicConfigFetcher(props)
+  }
+
   def produceMessages(servers: Seq[KafkaServer],
                       records: Seq[ProducerRecord[Array[Byte], Array[Byte]]],
                       acks: Int = -1): Unit = {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index a0a3387..26fb960 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -22,7 +22,6 @@ import kafka.cluster.BrokerEndPoint;
 import kafka.cluster.DelayedOperations;
 import kafka.cluster.IsrChangeListener;
 import kafka.cluster.Partition;
-import kafka.cluster.PartitionStateStore;
 import kafka.log.CleanerConfig;
 import kafka.log.Defaults;
 import kafka.log.LogAppendInfo;
@@ -153,14 +152,12 @@ public class ReplicaFetcherThreadBenchmark {
                     .setReplicas(replicas)
                     .setIsNew(true);
 
-            PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
-            Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
             IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);
             OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
             Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
             AlterIsrManager isrChannelManager = Mockito.mock(AlterIsrManager.class);
             Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(),
-                    0, Time.SYSTEM, partitionStateStore, isrChangeListener, new DelayedOperationsMock(tp),
+                    0, Time.SYSTEM, Properties::new, isrChangeListener, new DelayedOperationsMock(tp),
                     Mockito.mock(MetadataCache.class), logManager, isrChannelManager);
 
             partition.makeFollower(partitionState, offsetCheckpoints);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 4323a2c..9598390 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -21,7 +21,6 @@ import kafka.api.ApiVersion$;
 import kafka.cluster.DelayedOperations;
 import kafka.cluster.IsrChangeListener;
 import kafka.cluster.Partition;
-import kafka.cluster.PartitionStateStore;
 import kafka.log.CleanerConfig;
 import kafka.log.Defaults;
 import kafka.log.LogConfig;
@@ -116,14 +115,12 @@ public class PartitionMakeFollowerBenchmark {
 
         TopicPartition tp = new TopicPartition("topic", 0);
 
-        PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
-        Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
         Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
         IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);
         AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
         partition = new Partition(tp, 100,
             ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
-            partitionStateStore, isrChangeListener, delayedOperations,
+            Properties::new, isrChangeListener, delayedOperations,
             Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
         partition.createLogIfNotExists(true, false, offsetCheckpoints);
         executorService.submit((Runnable) () -> {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 54e5f48..a58bd7d 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -21,7 +21,6 @@ import kafka.api.ApiVersion$;
 import kafka.cluster.DelayedOperations;
 import kafka.cluster.IsrChangeListener;
 import kafka.cluster.Partition;
-import kafka.cluster.PartitionStateStore;
 import kafka.log.CleanerConfig;
 import kafka.log.Defaults;
 import kafka.log.LogConfig;
@@ -115,13 +114,11 @@ public class UpdateFollowerFetchStateBenchmark {
             .setZkVersion(1)
             .setReplicas(replicas)
             .setIsNew(true);
-        PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
-        Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
         IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);
         AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
         partition = new Partition(topicPartition, 100,
                 ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
-                partitionStateStore, isrChangeListener, delayedOperations,
+                Properties::new, isrChangeListener, delayedOperations,
                 Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
         partition.makeLeader(partitionState, offsetCheckpoints);
     }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 7c9c6de..27afe59 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -18,7 +18,6 @@ package org.apache.kafka.jmh.server;
 
 import java.util.Properties;
 import kafka.cluster.Partition;
-import kafka.cluster.PartitionStateStore;
 import kafka.log.CleanerConfig;
 import kafka.log.LogConfig;
 import kafka.log.LogManager;
@@ -39,7 +38,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Level;
@@ -150,8 +148,6 @@ public class CheckpointBench {
             }
         }
 
-        PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
-        Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
         OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
         for (TopicPartition topicPartition : topicPartitions) {
             final Partition partition = this.replicaManager.createPartition(topicPartition);

Reply via email to