This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d3f19e4 KAFKA-10825 ZooKeeper ISR manager (#9713)
d3f19e4 is described below
commit d3f19e4bb047338840d380d2253dc92cf56b0a0f
Author: David Arthur <[email protected]>
AuthorDate: Mon Dec 21 14:44:02 2020 -0500
KAFKA-10825 ZooKeeper ISR manager (#9713)
ISR-related cleanup in ReplicaManager and Partition. Removes the ISR change
logic from ReplicaManager and adds a new ZkIsrManager class that implements a
new AlterIsrManager trait. Unifies all of the ISR logic in Partition so we no
longer have separate code paths for ZK vs AlterIsr. Also removes
PartitionStateStore.
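
For reference, the shape of the new abstraction, condensed from the diff
below (signatures only):

    trait AlterIsrManager {
      def start(): Unit
      def submit(alterIsrItem: AlterIsrItem): Boolean
      def clearPending(topicPartition: TopicPartition): Unit
    }

KafkaServer.createReplicaManager picks the implementation from the IBP:
DefaultAlterIsrManager (AlterIsr requests to the controller) when IBP >=
2.7-IV2, ZkIsrManager (direct ZK writes plus change notifications) otherwise.
Partition drives both paths through this trait and learns the outcome of each
update via the AlterIsrItem callback.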
---
core/src/main/scala/kafka/cluster/Partition.scala | 138 +++++----------------
.../main/scala/kafka/server/AlterIsrManager.scala | 48 +++++--
core/src/main/scala/kafka/server/KafkaServer.scala | 8 +-
.../main/scala/kafka/server/ReplicaManager.scala | 61 +--------
.../src/main/scala/kafka/server/ZkIsrManager.scala | 109 ++++++++++++++++
.../admin/ReassignPartitionsIntegrationTest.scala | 5 +-
.../unit/kafka/cluster/AbstractPartitionTest.scala | 6 +-
.../unit/kafka/cluster/PartitionLockTest.scala | 13 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 121 ++++++++++++++----
.../unit/kafka/server/AlterIsrManagerTest.scala | 93 +++++++++-----
.../scala/unit/kafka/server/KafkaServerTest.scala | 31 ++++-
.../unit/kafka/server/ReplicaManagerTest.scala | 7 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 47 +++++--
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 5 +-
.../partition/PartitionMakeFollowerBenchmark.java | 5 +-
.../UpdateFollowerFetchStateBenchmark.java | 5 +-
.../apache/kafka/jmh/server/CheckpointBench.java | 4 -
17 files changed, 427 insertions(+), 279 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c0a7d76..d045c4f 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -28,7 +28,7 @@ import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
-import kafka.zk.{AdminZkClient, KafkaZkClient}
+import kafka.zk.AdminZkClient
import kafka.zookeeper.ZooKeeperClientException
import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.FetchResponseData
@@ -51,40 +51,8 @@ trait IsrChangeListener {
def markFailed(): Unit
}
-trait PartitionStateStore {
- def fetchTopicConfig(): Properties
- def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int]
- def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int]
-}
-
-class ZkPartitionStateStore(topicPartition: TopicPartition,
- zkClient: KafkaZkClient) extends PartitionStateStore {
-
- override def fetchTopicConfig(): Properties = {
- val adminZkClient = new AdminZkClient(zkClient)
- adminZkClient.fetchEntityConfig(ConfigType.Topic, topicPartition.topic)
- }
-
- override def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
- val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr)
- newVersionOpt
- }
-
- override def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
- val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr)
- newVersionOpt
- }
-
- private def updateIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
- val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicPartition,
- leaderAndIsr, controllerEpoch)
-
- if (updateSucceeded) {
- Some(newVersion)
- } else {
- None
- }
- }
+trait TopicConfigFetcher {
+ def fetch(): Properties
}
class DelayedOperations(topicPartition: TopicPartition,
@@ -109,21 +77,22 @@ object Partition extends KafkaMetricsGroup {
val isrChangeListener = new IsrChangeListener {
override def markExpand(): Unit = {
- replicaManager.recordIsrChange(topicPartition)
replicaManager.isrExpandRate.mark()
}
override def markShrink(): Unit = {
- replicaManager.recordIsrChange(topicPartition)
replicaManager.isrShrinkRate.mark()
}
override def markFailed(): Unit =
replicaManager.failedIsrUpdatesRate.mark()
}
- val zkIsrBackingStore = new ZkPartitionStateStore(
- topicPartition,
- replicaManager.zkClient)
+ val configProvider = new TopicConfigFetcher {
+ override def fetch(): Properties = {
+ val adminZkClient = new AdminZkClient(replicaManager.zkClient)
+ adminZkClient.fetchEntityConfig(ConfigType.Topic, topicPartition.topic)
+ }
+ }
val delayedOperations = new DelayedOperations(
topicPartition,
@@ -136,7 +105,7 @@ object Partition extends KafkaMetricsGroup {
interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
localBrokerId = replicaManager.config.brokerId,
time = time,
- stateStore = zkIsrBackingStore,
+ topicConfigProvider = configProvider,
isrChangeListener = isrChangeListener,
delayedOperations = delayedOperations,
metadataCache = replicaManager.metadataCache,
@@ -259,7 +228,7 @@ class Partition(val topicPartition: TopicPartition,
interBrokerProtocolVersion: ApiVersion,
localBrokerId: Int,
time: Time,
- stateStore: PartitionStateStore,
+ topicConfigProvider: TopicConfigFetcher,
isrChangeListener: IsrChangeListener,
delayedOperations: DelayedOperations,
metadataCache: MetadataCache,
@@ -285,8 +254,6 @@ class Partition(val topicPartition: TopicPartition,
@volatile private[cluster] var isrState: IsrState = CommittedIsr(Set.empty)
@volatile var assignmentState: AssignmentState = SimpleAssignmentState(Seq.empty)
- private val useAlterIsr: Boolean = interBrokerProtocolVersion.isAlterIsrSupported
-
// Logs belonging to this partition. Majority of time it will be only one log, but if log directory
// is getting changed (as a result of ReplicaAlterLogDirs command), we may have two logs until copy
// completes and a switch to new location is performed.
@@ -375,7 +342,7 @@ class Partition(val topicPartition: TopicPartition,
// Visible for testing
private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
def fetchLogConfig: LogConfig = {
- val props = stateStore.fetchTopicConfig()
+ val props = topicConfigProvider.fetch()
LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
}
@@ -1325,14 +1292,6 @@ class Partition(val topicPartition: TopicPartition,
}
private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
- if (useAlterIsr) {
- expandIsrWithAlterIsr(newInSyncReplica)
- } else {
- expandIsrWithZk(newInSyncReplica)
- }
- }
-
- private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
// This is called from maybeExpandIsr which holds the ISR write lock
if (!isrState.isInflight) {
// When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
@@ -1343,26 +1302,7 @@ class Partition(val topicPartition: TopicPartition,
}
}
- private def expandIsrWithZk(newInSyncReplica: Int): Unit = {
- val newInSyncReplicaIds = isrState.isr + newInSyncReplica
- info(s"Expanding ISR from ${isrState.isr.mkString(",")} to ${newInSyncReplicaIds.mkString(",")}")
- val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newInSyncReplicaIds.toList, zkVersion)
- val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
- if (zkVersionOpt.isDefined) {
- isrChangeListener.markExpand()
- }
- maybeUpdateIsrAndVersionWithZk(newInSyncReplicaIds, zkVersionOpt)
- }
-
private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
- if (useAlterIsr) {
- shrinkIsrWithAlterIsr(outOfSyncReplicas)
- } else {
- shrinkIsrWithZk(isrState.isr -- outOfSyncReplicas)
- }
- }
-
- private def shrinkIsrWithAlterIsr(outOfSyncReplicas: Set[Int]): Unit = {
// This is called from maybeShrinkIsr which holds the ISR write lock
if (!isrState.isInflight) {
// When shrinking the ISR, we cannot assume that the update will succeed as this could erroneously advance the HW
@@ -1374,47 +1314,29 @@ class Partition(val topicPartition: TopicPartition,
}
}
- private def shrinkIsrWithZk(newIsr: Set[Int]): Unit = {
- val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
- val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
- if (zkVersionOpt.isDefined) {
- isrChangeListener.markShrink()
- }
- maybeUpdateIsrAndVersionWithZk(newIsr, zkVersionOpt)
- }
-
- private def maybeUpdateIsrAndVersionWithZk(isr: Set[Int], zkVersionOpt: Option[Int]): Unit = {
- zkVersionOpt match {
- case Some(newVersion) =>
- isrState = CommittedIsr(isr)
- zkVersion = newVersion
- info("ISR updated to [%s] and zkVersion updated to [%d]".format(isr.mkString(","), zkVersion))
-
- case None =>
- info(s"Cached zkVersion $zkVersion not equal to that in zookeeper, skip updating ISR")
- isrChangeListener.markFailed()
- }
- }
-
private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
val isrToSend: Set[Int] = proposedIsrState match {
case PendingExpandIsr(isr, newInSyncReplicaId) => isr + newInSyncReplicaId
case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- outOfSyncReplicaIds
case state =>
- throw new IllegalStateException(s"Invalid state $state for `AlterIsr` request for partition $topicPartition")
+ isrChangeListener.markFailed()
+ throw new IllegalStateException(s"Invalid state $state for ISR change for partition $topicPartition")
}
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
- val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState))
+ val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState), controllerEpoch)
- if (!alterIsrManager.enqueue(alterIsrItem)) {
+ val oldState = isrState
+ isrState = proposedIsrState
+
+ if (!alterIsrManager.submit(alterIsrItem)) {
+ // If the ISR manager did not accept our update, we need to revert back to previous state
+ isrState = oldState
isrChangeListener.markFailed()
- throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request with state " +
- s"$newLeaderAndIsr for partition $topicPartition")
+ throw new IllegalStateException(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
}
- isrState = proposedIsrState
- debug(s"Sent `AlterIsr` request to change state to $newLeaderAndIsr after transition to $proposedIsrState")
+ debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
}
/**
@@ -1438,27 +1360,27 @@ class Partition(val topicPartition: TopicPartition,
isrChangeListener.markFailed()
error match {
case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
- debug(s"Controller failed to update ISR to $proposedIsrState since it doesn't know about this topic or partition. Giving up.")
+ debug(s"Failed to update ISR to $proposedIsrState since it doesn't know about this topic or partition. Giving up.")
case Errors.FENCED_LEADER_EPOCH =>
- debug(s"Controller failed to update ISR to $proposedIsrState since we sent an old leader epoch. Giving up.")
+ debug(s"Failed to update ISR to $proposedIsrState since we sent an old leader epoch. Giving up.")
case Errors.INVALID_UPDATE_VERSION =>
- debug(s"Controller failed to update ISR to $proposedIsrState due to invalid zk version. Giving up.")
+ debug(s"Failed to update ISR to $proposedIsrState due to invalid zk version. Giving up.")
case _ =>
- warn(s"Controller failed to update ISR to $proposedIsrState due to unexpected $error. Retrying.")
+ warn(s"Failed to update ISR to $proposedIsrState due to unexpected $error. Retrying.")
sendAlterIsrRequest(proposedIsrState)
}
case Right(leaderAndIsr: LeaderAndIsr) =>
// Success from controller, still need to check a few things
if (leaderAndIsr.leaderEpoch != leaderEpoch) {
- debug(s"Ignoring ISR from AlterIsr with ${leaderAndIsr} since we have a stale leader epoch $leaderEpoch.")
+ debug(s"Ignoring new ISR ${leaderAndIsr} since we have a stale leader epoch $leaderEpoch.")
isrChangeListener.markFailed()
} else if (leaderAndIsr.zkVersion <= zkVersion) {
- debug(s"Ignoring ISR from AlterIsr with ${leaderAndIsr} since we have a newer version $zkVersion.")
+ debug(s"Ignoring new ISR ${leaderAndIsr} since we have a newer version $zkVersion.")
isrChangeListener.markFailed()
} else {
isrState = CommittedIsr(leaderAndIsr.isr.toSet)
zkVersion = leaderAndIsr.zkVersion
- info(s"ISR updated from AlterIsr to ${isrState.isr.mkString(",")} and version updated to [$zkVersion]")
+ info(s"ISR updated to ${isrState.isr.mkString(",")} and version updated to [$zkVersion]")
proposedIsrState match {
case PendingExpandIsr(_, _) => isrChangeListener.markExpand()
case PendingShrinkIsr(_, _) => isrChangeListener.markShrink()
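
A note on the new submit flow in sendAlterIsrRequest above: the partition
optimistically installs the proposed (in-flight) state before handing the item
to the manager, and rolls back if the manager rejects it. A minimal,
self-contained sketch of that pattern (hypothetical stand-in types, not the
actual Partition class):

    object SubmitRevertSketch extends App {
      sealed trait IsrState
      case class CommittedIsr(isr: Set[Int]) extends IsrState
      case class PendingExpandIsr(isr: Set[Int], newReplica: Int) extends IsrState

      var isrState: IsrState = CommittedIsr(Set(1, 2))

      // Stand-in for AlterIsrManager.submit: false means the manager refused
      // the item (e.g. an update for this partition is already pending).
      def submit(proposed: IsrState): Boolean = false

      def sendIsrUpdate(proposed: IsrState): Unit = {
        val oldState = isrState
        isrState = proposed   // optimistically mark the update in flight
        if (!submit(proposed)) {
          isrState = oldState // revert to the previous state on rejection
          throw new IllegalStateException(s"Failed to enqueue ISR change $proposed")
        }
      }

      try sendIsrUpdate(PendingExpandIsr(Set(1, 2), newReplica = 3))
      catch { case _: IllegalStateException => println(s"reverted to $isrState") }
    }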
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
index 16fe620..fa4616b 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import kafka.api.LeaderAndIsr
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
@@ -35,24 +36,51 @@ import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
/**
- * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
- * so partitions will learn about updates through LeaderAndIsr messages sent from the controller
+ * Handles updating the ISR by sending AlterIsr requests to the controller (as of 2.7) or by updating ZK directly
+ * (prior to 2.7). Updating the ISR is an asynchronous operation, so partitions will learn about the result of their
+ * request through a callback.
+ *
+ * Note that ISR state changes can still be initiated by the controller and sent to the partitions via LeaderAndIsr
+ * requests.
*/
trait AlterIsrManager {
def start(): Unit
- def enqueue(alterIsrItem: AlterIsrItem): Boolean
+ def submit(alterIsrItem: AlterIsrItem): Boolean
def clearPending(topicPartition: TopicPartition): Unit
}
-case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit)
+case class AlterIsrItem(topicPartition: TopicPartition,
+ leaderAndIsr: LeaderAndIsr,
+ callback: Either[Errors, LeaderAndIsr] => Unit,
+ controllerEpoch: Int) // controllerEpoch needed for Zk impl
+
+object AlterIsrManager {
+ /**
+ * Factory for the AlterIsr-based implementation, used when IBP >= 2.7-IV2
+ */
+ def apply(controllerChannelManager: BrokerToControllerChannelManager,
+ scheduler: Scheduler,
+ time: Time,
+ brokerId: Int,
+ brokerEpochSupplier: () => Long): AlterIsrManager = {
+ new DefaultAlterIsrManager(controllerChannelManager, scheduler, time, brokerId, brokerEpochSupplier)
+ }
+
+ /**
+ * Factory for ZK based implementation, used when IBP < 2.7-IV2
+ */
+ def apply(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient): AlterIsrManager = {
+ new ZkIsrManager(scheduler, time, zkClient)
+ }
+}
-class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
- val scheduler: Scheduler,
- val time: Time,
- val brokerId: Int,
- val brokerEpochSupplier: () => Long) extends AlterIsrManager with Logging with KafkaMetricsGroup {
+class DefaultAlterIsrManager(val controllerChannelManager: BrokerToControllerChannelManager,
+ val scheduler: Scheduler,
+ val time: Time,
+ val brokerId: Int,
+ val brokerEpochSupplier: () => Long) extends AlterIsrManager with Logging with KafkaMetricsGroup {
// Used to allow only one pending ISR update per partition
private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
@@ -66,7 +94,7 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
}
- override def enqueue(alterIsrItem: AlterIsrItem): Boolean = {
+ override def submit(alterIsrItem: AlterIsrItem): Boolean = {
unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
}
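
Caller-side usage, for illustration only (a hypothetical helper; the real call
sites are Partition.sendAlterIsrRequest and the tests further down — the topic
name, ISR contents, and epochs here are made up):

    import kafka.api.LeaderAndIsr
    import kafka.server.{AlterIsrItem, AlterIsrManager}
    import org.apache.kafka.common.TopicPartition

    object AlterIsrSubmitSketch {
      def proposeIsrChange(manager: AlterIsrManager): Unit = {
        val item = AlterIsrItem(
          new TopicPartition("my-topic", 0),         // hypothetical topic
          new LeaderAndIsr(1, 1, List(1, 2, 3), 10), // leader, epoch, ISR, zkVersion
          callback = {
            case Right(committed) => println(s"ISR committed: ${committed.isr}")
            case Left(error)      => println(s"ISR update failed: $error")
          },
          controllerEpoch = 0)                       // only consulted by the ZK impl

        // submit returns false when an update for this partition is already in flight
        if (!manager.submit(item))
          println("rejected; retry once the pending item completes")
      }
    }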
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 3723eae..1e7bb19 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -444,8 +444,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
- val alterIsrManager = new AlterIsrManagerImpl(alterIsrChannelManager, kafkaScheduler,
- time, config.brokerId, () => kafkaController.brokerEpoch)
+ val alterIsrManager: AlterIsrManager = if (config.interBrokerProtocolVersion.isAlterIsrSupported) {
+ AlterIsrManager(alterIsrChannelManager, kafkaScheduler,
+ time, config.brokerId, () => kafkaController.brokerEpoch)
+ } else {
+ AlterIsrManager(kafkaScheduler, time, zkClient)
+ }
new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager)
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index a3934a5..5da3b69 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,7 +19,7 @@ package kafka.server
import java.io.File
import java.util.Optional
import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.atomic.{AtomicBoolean}
import java.util.concurrent.locks.Lock
import com.yammer.metrics.core.Meter
@@ -166,26 +166,8 @@ object HostedPartition {
final object Offline extends HostedPartition
}
-case class IsrChangePropagationConfig(
- // How often to check for ISR
- checkIntervalMs: Long,
-
- // Maximum time that an ISR change may be delayed before sending the notification
- maxDelayMs: Long,
-
- // Maximum time to await additional changes before sending the notification
- lingerMs: Long
-)
-
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
-
- // This field is mutable to allow overriding change notification behavior in test cases
- @volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = IsrChangePropagationConfig(
- checkIntervalMs = 2500,
- lingerMs = 5000,
- maxDelayMs = 60000,
- )
}
class ReplicaManager(val config: KafkaConfig,
@@ -251,11 +233,6 @@ class ReplicaManager(val config: KafkaConfig,
this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
- private val isrChangeNotificationConfig = ReplicaManager.DefaultIsrPropagationConfig
- private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
- private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
- private val lastIsrPropagationMs = new AtomicLong(time.milliseconds())
-
private var logDirFailureHandler: LogDirFailureHandler = null
private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
@@ -294,34 +271,6 @@ class ReplicaManager(val config: KafkaConfig,
scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks _, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
}
- def recordIsrChange(topicPartition: TopicPartition): Unit = {
- if (!config.interBrokerProtocolVersion.isAlterIsrSupported) {
- isrChangeSet synchronized {
- isrChangeSet += topicPartition
- lastIsrChangeMs.set(time.milliseconds())
- }
- }
- }
- /**
- * This function periodically runs to see if ISR needs to be propagated. It propagates ISR when:
- * 1. There is ISR change not propagated yet.
- * 2. There is no ISR Change in the last five seconds, or it has been more than 60 seconds since the last ISR propagation.
- * This allows an occasional ISR change to be propagated within a few seconds, and avoids overwhelming controller and
- * other brokers when large amount of ISR change occurs.
- */
- def maybePropagateIsrChanges(): Unit = {
- val now = time.milliseconds()
- isrChangeSet synchronized {
- if (isrChangeSet.nonEmpty &&
- (lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
- lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now)) {
- zkClient.propagateIsrChanges(isrChangeSet)
- isrChangeSet.clear()
- lastIsrPropagationMs.set(now)
- }
- }
- }
-
// When ReplicaAlterDirThread finishes replacing a current replica with a future replica, it will
// remove the partition from the partition state map. But it will not close itself even if the
// partition state map is empty. Thus we need to call shutdownIdleReplicaAlterDirThread() periodically
@@ -343,14 +292,8 @@ class ReplicaManager(val config: KafkaConfig,
// start ISR expiration thread
// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
- // If using AlterIsr, we don't need the znode ISR propagation
- if (!config.interBrokerProtocolVersion.isAlterIsrSupported) {
- scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
- period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
- } else {
- alterIsrManager.start()
- }
scheduler.schedule("shutdown-idle-replica-alter-log-dirs-thread", shutdownIdleReplicaAlterLogDirsThread _, period = 10000L, unit = TimeUnit.MILLISECONDS)
+ alterIsrManager.start()
// If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0 which does not include isNew field.
// In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.
diff --git a/core/src/main/scala/kafka/server/ZkIsrManager.scala b/core/src/main/scala/kafka/server/ZkIsrManager.scala
new file mode 100644
index 0000000..2d88aac
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ZkIsrManager.scala
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.{Logging, ReplicationUtils, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Time
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+import scala.collection.mutable
+
+/**
+ * @param checkIntervalMs How often to check for ISR
+ * @param maxDelayMs Maximum time that an ISR change may be delayed before sending the notification
+ * @param lingerMs Maximum time to await additional changes before sending the notification
+ */
+case class IsrChangePropagationConfig(checkIntervalMs: Long, maxDelayMs: Long, lingerMs: Long)
+
+object ZkIsrManager {
+ // This field is mutable to allow overriding change notification behavior in test cases
+ @volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = IsrChangePropagationConfig(
+ checkIntervalMs = 2500,
+ lingerMs = 5000,
+ maxDelayMs = 60000,
+ )
+}
+
+class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) extends AlterIsrManager with Logging {
+
+ private val isrChangeNotificationConfig = ZkIsrManager.DefaultIsrPropagationConfig
+ // Visible for testing
+ private[server] val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
+ private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
+ private val lastIsrPropagationMs = new AtomicLong(time.milliseconds())
+
+ override def start(): Unit = {
+ scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
+ period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
+ }
+
+ override def clearPending(topicPartition: TopicPartition): Unit = {
+ // Since we always immediately process ZK updates and never actually enqueue anything, there is nothing to
+ // clear here so this is a no-op. Even if there are changes that have not been propagated, the write to ZK
+ // has already happened, so we may as well send the notification to the controller.
+ }
+
+ override def submit(alterIsrItem: AlterIsrItem): Boolean = {
+ debug(s"Writing new ISR ${alterIsrItem.leaderAndIsr.isr} to ZooKeeper with version " +
+ s"${alterIsrItem.leaderAndIsr.zkVersion} for partition ${alterIsrItem.topicPartition}")
+
+ val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, alterIsrItem.topicPartition,
+ alterIsrItem.leaderAndIsr, alterIsrItem.controllerEpoch)
+
+ if (updateSucceeded) {
+ // Track which partitions need to be propagated to the controller
+ isrChangeSet synchronized {
+ isrChangeSet += alterIsrItem.topicPartition
+ lastIsrChangeMs.set(time.milliseconds())
+ }
+
+ // We rely on Partition#isrState being properly set to the pending ISR at this point since we are synchronously
+ // applying the callback
+ alterIsrItem.callback.apply(Right(alterIsrItem.leaderAndIsr.withZkVersion(newVersion)))
+ } else {
+ alterIsrItem.callback.apply(Left(Errors.INVALID_UPDATE_VERSION))
+ }
+
+ // Return true since we unconditionally accept the AlterIsrItem. The result of the operation is indicated by the
+ // callback, not the return value of this method
+ true
+ }
+
+ /**
+ * This function periodically runs to see if ISR needs to be propagated. It propagates ISR when:
+ * 1. There is ISR change not propagated yet.
+ * 2. There is no ISR Change in the last five seconds, or it has been more than 60 seconds since the last ISR propagation.
+ * This allows an occasional ISR change to be propagated within a few seconds, and avoids overwhelming controller and
+ * other brokers when large amount of ISR change occurs.
+ */
+ private[server] def maybePropagateIsrChanges(): Unit = {
+ val now = time.milliseconds()
+ isrChangeSet synchronized {
+ if (isrChangeSet.nonEmpty &&
+ (lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
+ lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now)) {
+ zkClient.propagateIsrChanges(isrChangeSet)
+ isrChangeSet.clear()
+ lastIsrPropagationMs.set(now)
+ }
+ }
+ }
+}
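
To make the propagation predicate above concrete: with the default config
(checkIntervalMs = 2500, lingerMs = 5000, maxDelayMs = 60000), a single ISR
change recorded at t=0 is propagated at the t=7500 check, the first tick where
lastIsrChangeMs + lingerMs < now; under continuous churn the linger clause
never fires and the maxDelayMs clause bounds the delay at roughly one minute.
A runnable walk-through of just that predicate (a sketch, not code from the
patch):

    object PropagationTimingSketch extends App {
      val (lingerMs, maxDelayMs) = (5000L, 60000L)
      val lastIsrChangeMs = 0L      // one ISR change at t=0, none after
      val lastIsrPropagationMs = 0L // no propagation yet

      for (now <- Seq(2500L, 5000L, 7500L)) {
        val propagate = lastIsrChangeMs + lingerMs < now ||
          lastIsrPropagationMs + maxDelayMs < now
        println(s"t=$now -> propagate=$propagate") // false, false, true
      }
    }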
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 6d5d02d..0b108d2 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -19,10 +19,9 @@ package kafka.admin
import java.io.Closeable
import java.util.{Collections, HashMap, List}
-
import kafka.admin.ReassignPartitionsCommand._
import kafka.api.KAFKA_2_7_IV1
-import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ReplicaManager}
+import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ZkIsrManager}
import kafka.utils.Implicits._
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
@@ -88,7 +87,7 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
// Override change notification settings so that test is not delayed by ISR
// change notification delay
- ReplicaManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig(
+ ZkIsrManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig(
checkIntervalMs = 500,
lingerMs = 100,
maxDelayMs = 500
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 603598e..99f5d54 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -43,7 +43,7 @@ class AbstractPartitionTest {
var alterIsrManager: MockAlterIsrManager = _
var isrChangeListener: MockIsrChangeListener = _
var logConfig: LogConfig = _
- val stateStore: PartitionStateStore = mock(classOf[PartitionStateStore])
+ var topicConfigProvider: TopicConfigFetcher = _
val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
@@ -55,6 +55,7 @@ class AbstractPartitionTest {
val logProps = createLogProperties(Map.empty)
logConfig = LogConfig(logProps)
+ topicConfigProvider = TestUtils.createTopicConfigProvider(logProps)
tmpDir = TestUtils.tempDir()
logDir1 = TestUtils.randomPartitionLogDir(tmpDir)
@@ -70,14 +71,13 @@ class AbstractPartitionTest {
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
- stateStore,
+ topicConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
logManager,
alterIsrManager)
- when(stateStore.fetchTopicConfig()).thenReturn(createLogProperties(Map.empty))
when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
.thenReturn(None)
}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index e5dbec7..ab4cd8a 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -21,7 +21,7 @@ import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent._
-import kafka.api.{ApiVersion, LeaderAndIsr}
+import kafka.api.ApiVersion
import kafka.log._
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
@@ -248,7 +248,7 @@ class PartitionLockTest extends Logging {
val leaderEpoch = 1
val brokerId = 0
val topicPartition = new TopicPartition("test-topic", 0)
- val stateStore: PartitionStateStore = mock(classOf[PartitionStateStore])
+ val topicConfigProvider = TestUtils.createTopicConfigProvider(createLogProperties(Map.empty))
val isrChangeListener: IsrChangeListener = mock(classOf[IsrChangeListener])
val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
@@ -261,7 +261,7 @@ class PartitionLockTest extends Logging {
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
mockTime,
- stateStore,
+ topicConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
@@ -282,14 +282,9 @@ class PartitionLockTest extends Logging {
new SlowLog(log, mockTime, appendSemaphore)
}
}
- when(stateStore.fetchTopicConfig()).thenReturn(createLogProperties(Map.empty))
when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
.thenReturn(None)
- when(stateStore.shrinkIsr(ArgumentMatchers.anyInt, ArgumentMatchers.any[LeaderAndIsr]))
- .thenReturn(Some(2))
- when(stateStore.expandIsr(ArgumentMatchers.anyInt, ArgumentMatchers.any[LeaderAndIsr]))
- .thenReturn(Some(2))
- when(alterIsrManager.enqueue(ArgumentMatchers.any[AlterIsrItem]))
+ when(alterIsrManager.submit(ArgumentMatchers.any[AlterIsrItem]))
.thenReturn(true)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index ccf5c9a..bad66f2 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -20,13 +20,14 @@ import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{CountDownLatch, Semaphore}
import com.yammer.metrics.core.Metric
-import kafka.api.{ApiVersion, LeaderAndIsr}
+import kafka.api.{ApiVersion, KAFKA_2_6_IV0}
import kafka.common.UnexpectedAppendOffsetException
import kafka.log.{Defaults => _, _}
import kafka.metrics.KafkaYammerMetrics
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors.{ApiException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException}
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@@ -40,6 +41,7 @@ import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.junit.Assert._
import org.junit.Test
import org.mockito.ArgumentMatchers
+import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.scalatest.Assertions.assertThrows
@@ -227,7 +229,7 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
- stateStore,
+ topicConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
@@ -606,16 +608,15 @@ class PartitionTest extends AbstractPartitionTest {
}
}
- when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch,
- List(leader, follower2, follower1), 1)))
- .thenReturn(Some(2))
-
updateFollowerFetchState(follower1, LogOffsetMetadata(0))
updateFollowerFetchState(follower1, LogOffsetMetadata(2))
updateFollowerFetchState(follower2, LogOffsetMetadata(0))
updateFollowerFetchState(follower2, LogOffsetMetadata(2))
+ // Simulate successful ISR update
+ alterIsrManager.completeIsrUpdate(2)
+
// At this point, the leader has gotten 5 writes, but followers have only fetched two
assertEquals(2, partition.localLogOrException.highWatermark)
@@ -699,14 +700,13 @@ class PartitionTest extends AbstractPartitionTest {
case Left(e: ApiException) => fail(s"Should have seen OffsetNotAvailableException, saw $e")
}
- when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch + 2,
- List(leader, follower2, follower1), 5)))
- .thenReturn(Some(2))
-
// Next fetch from replicas, HW is moved up to 5 (ahead of the LEO)
updateFollowerFetchState(follower1, LogOffsetMetadata(5))
updateFollowerFetchState(follower2, LogOffsetMetadata(5))
+ // Simulate successful ISR update
+ alterIsrManager.completeIsrUpdate(6)
+
// Error goes away
fetchOffsetsForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
case Right(Some(offsetAndTimestamp)) => assertEquals(5, offsetAndTimestamp.offset)
@@ -1006,7 +1006,7 @@ class PartitionTest extends AbstractPartitionTest {
// Expansion does not affect the ISR
assertEquals("ISR", Set[Integer](leader, follower2), partition.isrState.isr)
assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.isrState.maximalIsr)
- assertEquals("AlterIsr", alterIsrManager.isrUpdates.dequeue().leaderAndIsr.isr.toSet,
+ assertEquals("AlterIsr", alterIsrManager.isrUpdates.head.leaderAndIsr.isr.toSet,
Set(leader, follower1, follower2))
}
@@ -1165,7 +1165,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEndOffset = 6L)
assertEquals(alterIsrManager.isrUpdates.size, 1)
- val isrItem = alterIsrManager.isrUpdates.dequeue()
+ val isrItem = alterIsrManager.isrUpdates.head
assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId))
assertEquals(Set(brokerId), partition.isrState.isr)
assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.maximalIsr)
@@ -1173,7 +1173,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(0L, remoteReplica.logStartOffset)
// Complete the ISR expansion
- isrItem.callback.apply(Right(new LeaderAndIsr(brokerId, leaderEpoch, List(brokerId, remoteBrokerId), 2)))
+ alterIsrManager.completeIsrUpdate(2)
assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.isr)
assertEquals(isrChangeListener.expands.get, 1)
@@ -1224,8 +1224,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(0L, remoteReplica.logStartOffset)
// Simulate failure callback
- val alterIsrItem = alterIsrManager.isrUpdates.dequeue()
- alterIsrItem.callback.apply(Left(Errors.INVALID_UPDATE_VERSION))
+ alterIsrManager.failIsrUpdate(Errors.INVALID_UPDATE_VERSION)
// Still no ISR change
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@@ -1281,7 +1280,7 @@ class PartitionTest extends AbstractPartitionTest {
// Shrink the ISR
partition.maybeShrinkIsr()
assertEquals(alterIsrManager.isrUpdates.size, 1)
- assertEquals(alterIsrManager.isrUpdates.dequeue().leaderAndIsr.isr, List(brokerId))
+ assertEquals(alterIsrManager.isrUpdates.head.leaderAndIsr.isr, List(brokerId))
assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.isr)
assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.maximalIsr)
assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1450,8 +1449,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(0L, partition.localLogOrException.highWatermark)
// Simulate failure callback
- val alterIsrItem = alterIsrManager.isrUpdates.dequeue()
- alterIsrItem.callback.apply(Left(Errors.INVALID_UPDATE_VERSION))
+ alterIsrManager.failIsrUpdate(Errors.INVALID_UPDATE_VERSION)
// Ensure ISR hasn't changed
assertEquals(partition.isrState.getClass, classOf[PendingShrinkIsr])
@@ -1534,7 +1532,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(0L, remoteReplica.logStartOffset)
// Failure
- alterIsrManager.isrUpdates.dequeue().callback(Left(error))
+ alterIsrManager.failIsrUpdate(error)
callback(brokerId, remoteBrokerId, partition)
}
@@ -1582,6 +1580,74 @@ class PartitionTest extends AbstractPartitionTest {
}
@Test
+ def testZkIsrManagerAsyncCallback(): Unit = {
+ // We need a real scheduler here so that the ISR write lock works properly
+ val scheduler = new KafkaScheduler(1, "zk-isr-test")
+ scheduler.startup()
+ val kafkaZkClient = mock(classOf[KafkaZkClient])
+
+ doAnswer(_ => (true, 2))
+ .when(kafkaZkClient)
+ .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any())
+
+ val zkIsrManager = AlterIsrManager(scheduler, time, kafkaZkClient)
+ zkIsrManager.start()
+
+ val partition = new Partition(topicPartition,
+ replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+ interBrokerProtocolVersion = KAFKA_2_6_IV0, // shouldn't matter, but set this to a ZK isr version
+ localBrokerId = brokerId,
+ time,
+ topicConfigProvider,
+ isrChangeListener,
+ delayedOperations,
+ metadataCache,
+ logManager,
+ zkIsrManager)
+
+ val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+ seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+ val controllerEpoch = 0
+ val leaderEpoch = 5
+ val follower1 = brokerId + 1
+ val follower2 = brokerId + 2
+ val follower3 = brokerId + 3
+ val replicas = List[Integer](brokerId, follower1, follower2, follower3).asJava
+ val isr = List[Integer](brokerId, follower1, follower2).asJava
+
+ doNothing().when(delayedOperations).checkAndCompleteAll()
+
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
+ assertTrue("Expected become leader transition to succeed",
+ partition.makeLeader(
+ new LeaderAndIsrPartitionState()
+ .setControllerEpoch(controllerEpoch)
+ .setLeader(brokerId)
+ .setLeaderEpoch(leaderEpoch)
+ .setIsr(isr)
+ .setZkVersion(1)
+ .setReplicas(replicas)
+ .setIsNew(true),
+ offsetCheckpoints))
+ assertEquals(Set(brokerId, follower1, follower2), partition.isrState.isr)
+ assertEquals(0L, partition.localLogOrException.highWatermark)
+
+ // Expand ISR
+ partition.expandIsr(follower3)
+
+ // Try avoiding a race
+ TestUtils.waitUntilTrue(() => !partition.isrState.isInflight, "Expected ISR state to be committed", 100)
+
+ partition.isrState match {
+ case committed: CommittedIsr => assertEquals(Set(brokerId, follower1, follower2, follower3), committed.isr)
+ case _ => fail("Expected a committed ISR following Zk expansion")
+ }
+
+ scheduler.shutdown()
+ }
+
+ @Test
def testUseCheckpointToInitializeHighWatermark(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
seedLogData(log, numRecords = 6, leaderEpoch = 5)
@@ -1653,7 +1719,7 @@ class PartitionTest extends AbstractPartitionTest {
val topicPartition = new TopicPartition("test", 1)
val partition = new Partition(
topicPartition, 1000, ApiVersion.latestVersion, 0,
- new SystemTime(), mock(classOf[PartitionStateStore]), mock(classOf[IsrChangeListener]), mock(classOf[DelayedOperations]),
+ new SystemTime(), topicConfigProvider, mock(classOf[IsrChangeListener]), mock(classOf[DelayedOperations]),
mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterIsrManager]))
val replicas = Seq(0, 1, 2, 3)
@@ -1690,12 +1756,13 @@ class PartitionTest extends AbstractPartitionTest {
@Test
def testLogConfigNotDirty(): Unit = {
val spyLogManager = spy(logManager)
+ val spyConfigProvider = spy(topicConfigProvider)
val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
- stateStore,
+ spyConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
@@ -1711,7 +1778,7 @@ class PartitionTest extends AbstractPartitionTest {
ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation
// We should get config from ZK only once
- verify(stateStore).fetchTopicConfig()
+ verify(spyConfigProvider, times(1)).fetch()
}
/**
@@ -1720,6 +1787,7 @@ class PartitionTest extends AbstractPartitionTest {
*/
@Test
def testLogConfigDirtyAsTopicUpdated(): Unit = {
+ val spyConfigProvider = spy(topicConfigProvider)
val spyLogManager = spy(logManager)
doAnswer((invocation: InvocationOnMock) => {
logManager.initializingLog(topicPartition)
@@ -1731,7 +1799,7 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
- stateStore,
+ spyConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
@@ -1748,7 +1816,7 @@ class PartitionTest extends AbstractPartitionTest {
// We should get config from ZK twice, once before log is created, and second time once
// we find log config is dirty and refresh it.
- verify(stateStore, times(2)).fetchTopicConfig()
+ verify(spyConfigProvider, times(2)).fetch()
}
/**
@@ -1757,6 +1825,7 @@ class PartitionTest extends AbstractPartitionTest {
*/
@Test
def testLogConfigDirtyAsBrokerUpdated(): Unit = {
+ val spyConfigProvider = spy(topicConfigProvider)
val spyLogManager = spy(logManager)
doAnswer((invocation: InvocationOnMock) => {
logManager.initializingLog(topicPartition)
@@ -1768,7 +1837,7 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
- stateStore,
+ spyConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
@@ -1785,7 +1854,7 @@ class PartitionTest extends AbstractPartitionTest {
// We should get config from ZK twice, once before log is created, and second time once
// we find log config is dirty and refresh it.
- verify(stateStore, times(2)).fetchTopicConfig()
+ verify(spyConfigProvider, times(2)).fetch()
}
private def seedLogData(log: Log, numRecords: Int, leaderEpoch: Int): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
index 1f451fe..a2e269b 100644
--- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
@@ -19,10 +19,10 @@ package unit.kafka.server
import java.util.Collections
import java.util.concurrent.atomic.AtomicInteger
-
import kafka.api.LeaderAndIsr
-import kafka.server.{AlterIsrItem, AlterIsrManager, AlterIsrManagerImpl, BrokerToControllerChannelManager, ControllerRequestCompletionHandler}
+import kafka.server.{AlterIsrItem, AlterIsrManager, BrokerToControllerChannelManager, ControllerRequestCompletionHandler, DefaultAlterIsrManager, ZkIsrManager}
import kafka.utils.{MockScheduler, MockTime}
+import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.AlterIsrResponseData
@@ -32,6 +32,8 @@ import org.apache.kafka.common.requests.{AbstractRequest, AlterIsrRequest, Alter
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{Before, Test}
+import org.mockito.ArgumentMatchers.{any, anyString}
+import org.mockito.{ArgumentMatchers, Mockito}
class AlterIsrManagerTest {
@@ -59,9 +61,9 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
- val alterIsrManager = new AlterIsrManagerImpl(brokerToController, scheduler, time, brokerId, () => 2)
+ val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2)
alterIsrManager.start()
- alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}))
+ alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
time.sleep(50)
scheduler.tick()
@@ -75,12 +77,12 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
- val alterIsrManager = new AlterIsrManagerImpl(brokerToController,
scheduler, time, brokerId, () => 2)
+ val alterIsrManager = new DefaultAlterIsrManager(brokerToController,
scheduler, time, brokerId, () => 2)
alterIsrManager.start()
// Only send one ISR update for a given topic+partition
- assertTrue(alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1,
1, List(1,2,3), 10), _ => {})))
- assertFalse(alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1,
1, List(1,2), 10), _ => {})))
+ assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1,
List(1,2,3), 10), _ => {}, 0)))
+ assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1,
1, List(1,2), 10), _ => {}, 0)))
time.sleep(50)
scheduler.tick()
@@ -99,12 +101,12 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
- val alterIsrManager = new AlterIsrManagerImpl(brokerToController,
scheduler, time, brokerId, () => 2)
+ val alterIsrManager = new DefaultAlterIsrManager(brokerToController,
scheduler, time, brokerId, () => 2)
alterIsrManager.start()
for (i <- 0 to 9) {
- alterIsrManager.enqueue(AlterIsrItem(new TopicPartition(topic, i),
- new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}))
+ alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, i),
+ new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
time.sleep(1)
}
@@ -112,8 +114,8 @@ class AlterIsrManagerTest {
scheduler.tick()
// This should not be included in the batch
- alterIsrManager.enqueue(AlterIsrItem(new TopicPartition(topic, 10),
- new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}))
+ alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, 10),
+ new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
EasyMock.verify(brokerToController)
@@ -124,26 +126,26 @@ class AlterIsrManagerTest {
@Test
def testAuthorizationFailed(): Unit = {
- val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10),
_ => { }))
+ val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10),
_ => { }, 0))
val manager = testTopLevelError(isrs, Errors.CLUSTER_AUTHORIZATION_FAILED)
// On authz error, we log the exception and keep retrying
- assertFalse(manager.enqueue(AlterIsrItem(tp0, null, _ => { })))
+ assertFalse(manager.submit(AlterIsrItem(tp0, null, _ => { }, 0)))
}
@Test
def testStaleBrokerEpoch(): Unit = {
- val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10),
_ => { }))
+ val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10),
_ => { }, 0))
val manager = testTopLevelError(isrs, Errors.STALE_BROKER_EPOCH)
// On stale broker epoch, we want to retry, so we don't clear items from
the pending map
- assertFalse(manager.enqueue(AlterIsrItem(tp0, null, _ => { })))
+ assertFalse(manager.submit(AlterIsrItem(tp0, null, _ => { }, 0)))
}
@Test
def testOtherErrors(): Unit = {
- val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10),
_ => { }))
+ val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10),
_ => { }, 0))
val manager = testTopLevelError(isrs, Errors.UNKNOWN_SERVER_ERROR)
// On other unexpected errors, we also want to retry
- assertFalse(manager.enqueue(AlterIsrItem(tp0, null, _ => { })))
+ assertFalse(manager.submit(AlterIsrItem(tp0, null, _ => { }, 0)))
}
def testTopLevelError(isrs: Seq[AlterIsrItem], error: Errors):
AlterIsrManager = {
@@ -153,9 +155,9 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
- val alterIsrManager = new AlterIsrManagerImpl(brokerToController,
scheduler, time, brokerId, () => 2)
+ val alterIsrManager = new DefaultAlterIsrManager(brokerToController,
scheduler, time, brokerId, () => 2)
alterIsrManager.start()
- isrs.foreach(alterIsrManager.enqueue)
+ isrs.foreach(alterIsrManager.submit)
time.sleep(100)
scheduler.tick()
@@ -175,7 +177,7 @@ class AlterIsrManagerTest {
errors.foreach(error => {
val alterIsrManager = testPartitionError(tp0, error)
// Any partition-level error should clear the item from the pending
queue allowing for future updates
- assertTrue(alterIsrManager.enqueue(AlterIsrItem(tp0, null, _ => { })))
+ assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, null, _ => { }, 0)))
})
}
@@ -186,7 +188,7 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
- val alterIsrManager = new AlterIsrManagerImpl(brokerToController,
scheduler, time, brokerId, () => 2)
+ val alterIsrManager = new DefaultAlterIsrManager(brokerToController,
scheduler, time, brokerId, () => 2)
alterIsrManager.start()
var capturedError: Option[Errors] = None
@@ -197,7 +199,7 @@ class AlterIsrManagerTest {
}
}
- alterIsrManager.enqueue(AlterIsrItem(tp, new LeaderAndIsr(1, 1,
List(1,2,3), 10), callback))
+ alterIsrManager.submit(AlterIsrItem(tp, new LeaderAndIsr(1, 1,
List(1,2,3), 10), callback, 0))
time.sleep(100)
scheduler.tick()
@@ -228,16 +230,16 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
- val alterIsrManager = new AlterIsrManagerImpl(brokerToController,
scheduler, time, brokerId, () => 2)
+ val alterIsrManager = new DefaultAlterIsrManager(brokerToController,
scheduler, time, brokerId, () => 2)
alterIsrManager.start()
- alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 1,
List(1,2,3), 10), _ => {}))
+ alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1,
List(1,2,3), 10), _ => {}, 0))
time.sleep(100)
scheduler.tick() // Triggers a request
// Enqueue more updates
- alterIsrManager.enqueue(AlterIsrItem(tp1, new LeaderAndIsr(1, 1,
List(1,2,3), 10), _ => {}))
- alterIsrManager.enqueue(AlterIsrItem(tp2, new LeaderAndIsr(1, 1,
List(1,2,3), 10), _ => {}))
+ alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1,
List(1,2,3), 10), _ => {}, 0))
+ alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1,
List(1,2,3), 10), _ => {}, 0))
time.sleep(100)
scheduler.tick() // Trigger the schedule again, but no request this time
@@ -267,7 +269,7 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
- val alterIsrManager = new AlterIsrManagerImpl(brokerToController,
scheduler, time, brokerId, () => 2)
+ val alterIsrManager = new DefaultAlterIsrManager(brokerToController,
scheduler, time, brokerId, () => 2)
alterIsrManager.start()
val count = new AtomicInteger(0)
@@ -275,9 +277,9 @@ class AlterIsrManagerTest {
count.incrementAndGet()
return
}
- alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 1,
List(1,2,3), 10), callback))
- alterIsrManager.enqueue(AlterIsrItem(tp1, new LeaderAndIsr(1, 1,
List(1,2,3), 10), callback))
- alterIsrManager.enqueue(AlterIsrItem(tp2, new LeaderAndIsr(1, 1,
List(1,2,3), 10), callback))
+ alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1,
List(1,2,3), 10), callback, 0))
+ alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1,
List(1,2,3), 10), callback, 0))
+ alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1,
List(1,2,3), 10), callback, 0))
time.sleep(100)
@@ -300,4 +302,33 @@ class AlterIsrManagerTest {
assertEquals("Expected all callbacks to run", count.get, 3)
}
+
+ @Test
+ def testZkBasic(): Unit = {
+ val scheduler = new MockScheduler(time)
+ scheduler.startup()
+
+ val kafkaZkClient = Mockito.mock(classOf[KafkaZkClient])
+ Mockito.doAnswer(_ => (true, 2))
+ .when(kafkaZkClient)
+ .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any())
+ Mockito.doAnswer(_ => (false, 2))
+ .when(kafkaZkClient)
+ .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(3), any())
+
+ val zkIsrManager = new ZkIsrManager(scheduler, time, kafkaZkClient)
+ zkIsrManager.start()
+
+ def expectMatch(expect: Either[Errors, LeaderAndIsr])(result: Either[Errors, LeaderAndIsr]): Unit = {
+ assertEquals(expect, result)
+ }
+
+ // Correct ZK version
+ assertTrue(zkIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 1),
+ expectMatch(Right(new LeaderAndIsr(1, 1, List(1,2,3), 2))), 0)))
+
+ // Wrong ZK version
+ assertTrue(zkIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 3),
+ expectMatch(Left(Errors.INVALID_UPDATE_VERSION)), 0)))
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
index 7342be6..8cce64d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -17,13 +17,14 @@
package kafka.server
-import java.util.Properties
+import kafka.api.ApiVersion
+import java.util.Properties
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.zookeeper.client.ZKClientConfig
import org.junit.Test
-import org.junit.Assert.assertEquals
+import org.junit.Assert.{assertEquals, fail}
import org.scalatest.Assertions.intercept
class KafkaServerTest extends ZooKeeperTestHarness {
@@ -102,6 +103,32 @@ class KafkaServerTest extends ZooKeeperTestHarness {
assertEquals(zkClientValueToExpect(kafkaProp),
zkClientConfig.get.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
}
+ @Test
+ def testZkIsrManager(): Unit = {
+ val props = TestUtils.createBrokerConfigs(1, zkConnect).head
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp, "2.7-IV1")
+
+ val server = TestUtils.createServer(KafkaConfig.fromProps(props))
+ server.replicaManager.alterIsrManager match {
+ case _: ZkIsrManager =>
+ case _ => fail("Should use ZK for ISR manager in versions before 2.7-IV2")
+ }
+ server.shutdown()
+ }
+
+ @Test
+ def testAlterIsrManager(): Unit = {
+ val props = TestUtils.createBrokerConfigs(1, zkConnect).head
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp, ApiVersion.latestVersion.toString)
+
+ val server = TestUtils.createServer(KafkaConfig.fromProps(props))
+ server.replicaManager.alterIsrManager match {
+ case _: DefaultAlterIsrManager =>
+ case _ => fail("Should use AlterIsr for ISR manager in versions after 2.7-IV2")
+ }
+ server.shutdown()
+ }
+
def createServer(nodeId: Int, hostName: String, port: Int): KafkaServer = {
val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$hostName:$port")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 80fdc60..c2d7c7aa 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -64,6 +64,7 @@ class ReplicaManagerTest {
val topic = "test-topic"
val time = new MockTime
+ val scheduler = new MockScheduler(time)
val metrics = new Metrics
var kafkaZkClient: KafkaZkClient = _
var alterIsrManager: AlterIsrManager = _
@@ -1708,9 +1709,11 @@ class ReplicaManagerTest {
result
}
- private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, aliveBrokerIds: Seq[Int] = Seq(0, 1)): ReplicaManager = {
+ private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, aliveBrokerIds: Seq[Int] = Seq(0, 1),
+ propsModifier: Properties => Unit = _ => {}): ReplicaManager = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath +
"," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
+ propsModifier.apply(props)
val config = KafkaConfig.fromProps(props)
val logProps = new Properties()
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
@@ -1733,7 +1736,7 @@ class ReplicaManagerTest {
val mockDelayedElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false)
- new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
+ new ReplicaManager(config, metrics, time, kafkaZkClient, scheduler, mockLogMgr,
new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
mockDeleteRecordsPurgatory, mockDelayedElectLeaderPurgatory, Option(this.getClass.getName), alterIsrManager)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5368c20..462e802 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,13 +23,12 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate
import java.time.Duration
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.{Arrays, Collections, Properties}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
-
import javax.net.ssl.X509TrustManager
import kafka.api._
-import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
+import kafka.cluster.{Broker, EndPoint, IsrChangeListener, TopicConfigFetcher}
import kafka.log._
import kafka.security.auth.{Acl, Resource, Authorizer => LegacyAuthorizer}
import kafka.server._
@@ -50,6 +49,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, UnknownTopicOrPart
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.record._
import org.apache.kafka.common.resource.ResourcePattern
@@ -1067,17 +1067,40 @@ object TestUtils extends Logging {
class MockAlterIsrManager extends AlterIsrManager {
val isrUpdates: mutable.Queue[AlterIsrItem] = new mutable.Queue[AlterIsrItem]()
-
- override def enqueue(alterIsrItem: AlterIsrItem): Boolean = {
- isrUpdates += alterIsrItem
- true
+ val inFlight: AtomicBoolean = new AtomicBoolean(false)
+
+ override def submit(alterIsrItem: AlterIsrItem): Boolean = {
+ if (inFlight.compareAndSet(false, true)) {
+ isrUpdates += alterIsrItem
+ true
+ } else {
+ false
+ }
}
override def clearPending(topicPartition: TopicPartition): Unit = {
- isrUpdates.clear()
+ inFlight.set(false)
}
override def start(): Unit = { }
+
+ def completeIsrUpdate(newZkVersion: Int): Unit = {
+ if (inFlight.compareAndSet(true, false)) {
+ val item = isrUpdates.head
+ item.callback.apply(Right(item.leaderAndIsr.withZkVersion(newZkVersion)))
+ } else {
+ fail("Expected an in-flight ISR update, but there was none")
+ }
+ }
+
+ def failIsrUpdate(error: Errors): Unit = {
+ if (inFlight.compareAndSet(true, false)) {
+ val item = isrUpdates.dequeue()
+ item.callback.apply(Left(error))
+ } else {
+ fail("Expected an in-flight ISR update, but there was none")
+ }
+ }
}
def createAlterIsrManager(): MockAlterIsrManager = {
@@ -1106,6 +1129,14 @@ object TestUtils extends Logging {
new MockIsrChangeListener()
}
+ class MockTopicConfigFetcher(var props: Properties) extends TopicConfigFetcher {
+ override def fetch(): Properties = props
+ }
+
+ def createTopicConfigProvider(props: Properties): MockTopicConfigFetcher = {
+ new MockTopicConfigFetcher(props)
+ }
+
def produceMessages(servers: Seq[KafkaServer],
records: Seq[ProducerRecord[Array[Byte], Array[Byte]]],
acks: Int = -1): Unit = {
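One note on the reworked MockAlterIsrManager above: it now models the single
in-flight update a Partition is allowed to have outstanding (submit returns
false while one is pending), and tests complete or fail that update
explicitly. A rough usage sketch, with the partition setup elided:

    // Sketch of intended use in a Partition test; triggering the ISR change is elided.
    val alterIsrManager = TestUtils.createAlterIsrManager()
    // ...make the partition under test expand or shrink its ISR...
    assertEquals(1, alterIsrManager.isrUpdates.size)    // one update was submitted
    alterIsrManager.completeIsrUpdate(newZkVersion = 2) // callback sees Right(...)
    // a second submit before completion would have returned false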
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index a0a3387..26fb960 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -22,7 +22,6 @@ import kafka.cluster.BrokerEndPoint;
import kafka.cluster.DelayedOperations;
import kafka.cluster.IsrChangeListener;
import kafka.cluster.Partition;
-import kafka.cluster.PartitionStateStore;
import kafka.log.CleanerConfig;
import kafka.log.Defaults;
import kafka.log.LogAppendInfo;
@@ -153,14 +152,12 @@ public class ReplicaFetcherThreadBenchmark {
.setReplicas(replicas)
.setIsNew(true);
- PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
- Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);
OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
AlterIsrManager isrChannelManager = Mockito.mock(AlterIsrManager.class);
Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(),
- 0, Time.SYSTEM, partitionStateStore, isrChangeListener, new DelayedOperationsMock(tp),
+ 0, Time.SYSTEM, Properties::new, isrChangeListener, new DelayedOperationsMock(tp),
Mockito.mock(MetadataCache.class), logManager, isrChannelManager);
partition.makeFollower(partitionState, offsetCheckpoints);
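Why Properties::new type-checks here: Partition now takes a topic-config
fetcher instead of a PartitionStateStore, and TopicConfigFetcher (see the
MockTopicConfigFetcher added to TestUtils above) exposes a single abstract
fetch(): Properties. Assuming it is the plain single-method trait its usage
implies, both a Java method reference and a Scala lambda SAM-convert to it; a
Scala equivalent of what the benchmarks pass:

    import java.util.Properties
    import kafka.cluster.TopicConfigFetcher

    // SAM-conversion sketch: an always-empty topic config, matching Properties::new.
    val emptyConfigFetcher: TopicConfigFetcher = () => new Properties()
    assert(emptyConfigFetcher.fetch().isEmpty) // benchmarks run with no topic overrides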
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 4323a2c..9598390 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -21,7 +21,6 @@ import kafka.api.ApiVersion$;
import kafka.cluster.DelayedOperations;
import kafka.cluster.IsrChangeListener;
import kafka.cluster.Partition;
-import kafka.cluster.PartitionStateStore;
import kafka.log.CleanerConfig;
import kafka.log.Defaults;
import kafka.log.LogConfig;
@@ -116,14 +115,12 @@ public class PartitionMakeFollowerBenchmark {
TopicPartition tp = new TopicPartition("topic", 0);
- PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
- Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);
AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
- partitionStateStore, isrChangeListener, delayedOperations,
+ Properties::new, isrChangeListener, delayedOperations,
Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
partition.createLogIfNotExists(true, false, offsetCheckpoints);
executorService.submit((Runnable) () -> {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 54e5f48..a58bd7d 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -21,7 +21,6 @@ import kafka.api.ApiVersion$;
import kafka.cluster.DelayedOperations;
import kafka.cluster.IsrChangeListener;
import kafka.cluster.Partition;
-import kafka.cluster.PartitionStateStore;
import kafka.log.CleanerConfig;
import kafka.log.Defaults;
import kafka.log.LogConfig;
@@ -115,13 +114,11 @@ public class UpdateFollowerFetchStateBenchmark {
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(true);
- PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
- Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);
AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
partition = new Partition(topicPartition, 100, ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
- partitionStateStore, isrChangeListener, delayedOperations,
+ Properties::new, isrChangeListener, delayedOperations,
Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
partition.makeLeader(partitionState, offsetCheckpoints);
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 7c9c6de..27afe59 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -18,7 +18,6 @@ package org.apache.kafka.jmh.server;
import java.util.Properties;
import kafka.cluster.Partition;
-import kafka.cluster.PartitionStateStore;
import kafka.log.CleanerConfig;
import kafka.log.LogConfig;
import kafka.log.LogManager;
@@ -39,7 +38,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
@@ -150,8 +148,6 @@ public class CheckpointBench {
}
}
- PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
- Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
for (TopicPartition topicPartition : topicPartitions) {
final Partition partition = this.replicaManager.createPartition(topicPartition);