This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 254335d24ab KAFKA-15799 Handle full metadata updates on ZK brokers
(#14719)
254335d24ab is described below
commit 254335d24ab6b6d13142dcdb53fec3856c16de9e
Author: David Arthur <[email protected]>
AuthorDate: Thu Nov 16 17:38:44 2023 -0500
KAFKA-15799 Handle full metadata updates on ZK brokers (#14719)
This patch adds the concept of a "Full" UpdateMetadataRequest, similar to
what is used in LeaderAndIsr. A new tagged field is added to
UpdateMetadataRequest (UMR) at version 8 which allows the KRaft controller
to indicate whether a UMR contains all the metadata or not. Since UMR is
implicitly treated as incremental by the ZK broker, we needed a way to
detect topic deletions when the KRaft controller sends a metadata snapshot
to the ZK broker. By sending a "Full" flag, the broker can now compare
existing topic IDs to incoming topic IDs and calculate which topics should
be removed from the MetadataCache.
This patch only removes deleted topics from the MetadataCache.
Partition/log management was implemented in KAFKA-15605.
Reviewers: Colin P. McCabe <[email protected]>
---
.../common/requests/AbstractControlRequest.java | 28 +++
.../kafka/common/requests/LeaderAndIsrRequest.java | 24 ---
.../common/requests/UpdateMetadataRequest.java | 18 +-
.../common/message/UpdateMetadataRequest.json | 3 +
.../controller/ControllerChannelManager.scala | 21 +-
.../kafka/migration/MigrationPropagator.scala | 5 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 3 +-
.../main/scala/kafka/server/MetadataCache.scala | 5 +-
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../kafka/server/metadata/ZkMetadataCache.scala | 89 +++++++--
.../kafka/zk/ZkMigrationIntegrationTest.scala | 63 ++++--
.../unit/kafka/server/MetadataCacheTest.scala | 214 ++++++++++++++++++++-
.../unit/kafka/server/ReplicaManagerTest.scala | 4 +-
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 2 +-
.../jmh/metadata/MetadataRequestBenchmark.java | 2 +-
.../apache/kafka/jmh/server/CheckpointBench.java | 2 +-
.../kafka/jmh/server/PartitionCreationBench.java | 2 +-
17 files changed, 406 insertions(+), 81 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java
index 76342dd124a..6516de3f9ac 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java
@@ -21,6 +21,34 @@ import org.apache.kafka.common.protocol.ApiKeys;
// Abstract class for all control requests including UpdateMetadataRequest,
LeaderAndIsrRequest and StopReplicaRequest
public abstract class AbstractControlRequest extends AbstractRequest {
+ /**
+ * Indicates if a controller request is incremental, full, or unknown.
+ * Used by LeaderAndIsrRequest.Type and UpdateMetadataRequest.Type fields.
+ */
+ public enum Type {
+ UNKNOWN(0),
+ INCREMENTAL(1),
+ FULL(2);
+
+ private final byte type;
+ private Type(int type) {
+ this.type = (byte) type;
+ }
+
+ public byte toByte() {
+ return type;
+ }
+
+ public static Type fromByte(byte type) {
+ for (Type t : Type.values()) {
+ if (t.type == type) {
+ return t;
+ }
+ }
+ return UNKNOWN;
+ }
+ }
+
public static final long UNKNOWN_BROKER_EPOCH = -1L;
public static abstract class Builder<T extends AbstractRequest> extends
AbstractRequest.Builder<T> {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index dbd59b5b692..9a07a88a35d 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -42,30 +42,6 @@ import java.util.stream.Collectors;
public class LeaderAndIsrRequest extends AbstractControlRequest {
- public enum Type {
- UNKNOWN(0),
- INCREMENTAL(1),
- FULL(2);
-
- private final byte type;
- private Type(int type) {
- this.type = (byte) type;
- }
-
- public byte toByte() {
- return type;
- }
-
- public static Type fromByte(byte type) {
- for (Type t : Type.values()) {
- if (t.type == type) {
- return t;
- }
- }
- return UNKNOWN;
- }
- }
-
public static class Builder extends
AbstractControlRequest.Builder<LeaderAndIsrRequest> {
private final List<LeaderAndIsrPartitionState> partitionStates;
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index c0fd3000cc5..ea9ae814198 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -47,21 +47,28 @@ public class UpdateMetadataRequest extends
AbstractControlRequest {
private final List<UpdateMetadataPartitionState> partitionStates;
private final List<UpdateMetadataBroker> liveBrokers;
private final Map<String, Uuid> topicIds;
+ private final Type updateType;
public Builder(short version, int controllerId, int controllerEpoch,
long brokerEpoch,
List<UpdateMetadataPartitionState> partitionStates,
List<UpdateMetadataBroker> liveBrokers,
Map<String, Uuid> topicIds) {
this(version, controllerId, controllerEpoch, brokerEpoch,
partitionStates,
- liveBrokers, topicIds, false);
+ liveBrokers, topicIds, false, Type.UNKNOWN);
}
public Builder(short version, int controllerId, int controllerEpoch,
long brokerEpoch,
List<UpdateMetadataPartitionState> partitionStates,
List<UpdateMetadataBroker> liveBrokers,
- Map<String, Uuid> topicIds, boolean kraftController) {
+ Map<String, Uuid> topicIds, boolean kraftController,
Type updateType) {
super(ApiKeys.UPDATE_METADATA, version, controllerId,
controllerEpoch, brokerEpoch, kraftController);
this.partitionStates = partitionStates;
this.liveBrokers = liveBrokers;
this.topicIds = topicIds;
+
+ if (version >= 8) {
+ this.updateType = updateType;
+ } else {
+ this.updateType = Type.UNKNOWN;
+ }
}
@Override
@@ -95,6 +102,7 @@ public class UpdateMetadataRequest extends
AbstractControlRequest {
if (version >= 8) {
data.setIsKRaftController(kraftController);
+ data.setType(updateType.toByte());
}
if (version >= 5) {
@@ -129,6 +137,8 @@ public class UpdateMetadataRequest extends
AbstractControlRequest {
bld.append("(type: UpdateMetadataRequest=").
append(", controllerId=").append(controllerId).
append(", controllerEpoch=").append(controllerEpoch).
+ append(", kraftController=").append(kraftController).
+ append(", type=").append(updateType).
append(", brokerEpoch=").append(brokerEpoch).
append(", partitionStates=").append(partitionStates).
append(", liveBrokers=").append(Utils.join(liveBrokers, ", ")).
@@ -196,6 +206,10 @@ public class UpdateMetadataRequest extends
AbstractControlRequest {
return data.isKRaftController();
}
+ public Type updateType() {
+ return Type.fromByte(data.type());
+ }
+
@Override
public int controllerEpoch() {
return data.controllerEpoch();
diff --git
a/clients/src/main/resources/common/message/UpdateMetadataRequest.json
b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
index e876caa2bac..1b90dee6a7a 100644
--- a/clients/src/main/resources/common/message/UpdateMetadataRequest.json
+++ b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
@@ -38,6 +38,9 @@
"about": "The controller id." },
{ "name": "isKRaftController", "type": "bool", "versions": "8+",
"default": "false",
"about": "If KRaft controller id is used during migration. See KIP-866"
},
+ { "name": "Type", "type": "int8", "versions": "8+",
+ "default": 0, "tag": 0, "taggedVersions": "8+",
+ "about": "Indicates if this request is a Full metadata snapshot (2),
Incremental (1), or Unknown (0). Using during ZK migration, see KIP-866"},
{ "name": "ControllerEpoch", "type": "int32", "versions": "0+",
"about": "The controller epoch." },
{ "name": "BrokerEpoch", "type": "int64", "versions": "5+", "ignorable":
true, "default": "-1",
diff --git
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 33a335dcc2f..18e211f62bd 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -377,7 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config:
KafkaConfig,
val stopReplicaRequestMap = mutable.Map.empty[Int,
mutable.Map[TopicPartition, StopReplicaPartitionState]]
val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
val updateMetadataRequestPartitionInfoMap =
mutable.Map.empty[TopicPartition, UpdateMetadataPartitionState]
- private var updateType: LeaderAndIsrRequest.Type =
LeaderAndIsrRequest.Type.UNKNOWN
+ private var updateType: AbstractControlRequest.Type =
AbstractControlRequest.Type.UNKNOWN
private var metadataInstance: ControllerChannelContext = _
def sendRequest(brokerId: Int,
@@ -399,7 +399,7 @@ abstract class AbstractControllerBrokerRequestBatch(config:
KafkaConfig,
metadataInstance = metadataProvider()
}
- def setUpdateType(updateType: LeaderAndIsrRequest.Type): Unit = {
+ def setUpdateType(updateType: AbstractControlRequest.Type): Unit = {
this.updateType = updateType
}
@@ -409,7 +409,7 @@ abstract class AbstractControllerBrokerRequestBatch(config:
KafkaConfig,
updateMetadataRequestBrokerSet.clear()
updateMetadataRequestPartitionInfoMap.clear()
metadataInstance = null
- updateType = LeaderAndIsrRequest.Type.UNKNOWN
+ updateType = AbstractControlRequest.Type.UNKNOWN
}
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],
@@ -567,7 +567,6 @@ abstract class AbstractControllerBrokerRequestBatch(config:
KafkaConfig,
}
}
leaderAndIsrRequestMap.clear()
- updateType = LeaderAndIsrRequest.Type.UNKNOWN
}
def handleLeaderAndIsrResponse(response: LeaderAndIsrResponse, broker: Int):
Unit
@@ -621,8 +620,17 @@ abstract class
AbstractControllerBrokerRequestBatch(config: KafkaConfig,
.distinct
.filter(metadataInstance.topicIds.contains)
.map(topic => (topic, metadataInstance.topicIds(topic))).toMap
- val updateMetadataRequestBuilder = new
UpdateMetadataRequest.Builder(updateMetadataRequestVersion,
- controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava,
liveBrokers.asJava, topicIds.asJava, kraftController)
+ val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(
+ updateMetadataRequestVersion,
+ controllerId,
+ controllerEpoch,
+ brokerEpoch,
+ partitionStates.asJava,
+ liveBrokers.asJava,
+ topicIds.asJava,
+ kraftController,
+ updateType
+ )
sendRequest(broker, updateMetadataRequestBuilder, (r: AbstractResponse)
=> {
val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse]
handleUpdateMetadataResponse(updateMetadataResponse, broker)
@@ -736,6 +744,7 @@ abstract class AbstractControllerBrokerRequestBatch(config:
KafkaConfig,
sendLeaderAndIsrRequest(controllerEpoch, stateChangeLog)
sendUpdateMetadataRequests(controllerEpoch, stateChangeLog)
sendStopReplicaRequests(controllerEpoch, stateChangeLog)
+ this.updateType = AbstractControlRequest.Type.UNKNOWN
} catch {
case e: Throwable =>
if (leaderAndIsrRequestMap.nonEmpty) {
diff --git a/core/src/main/scala/kafka/migration/MigrationPropagator.scala
b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
index 1a18ca42fcb..2a02f5891ec 100644
--- a/core/src/main/scala/kafka/migration/MigrationPropagator.scala
+++ b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
@@ -22,7 +22,7 @@ import kafka.controller.{ControllerChannelContext,
ControllerChannelManager, Rep
import kafka.server.KafkaConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
+import org.apache.kafka.common.requests.AbstractControlRequest
import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage,
TopicsImage}
import org.apache.kafka.metadata.PartitionRegistration
@@ -138,6 +138,7 @@ class MigrationPropagator(
}
requestBatch.sendRequestsToBrokers(zkControllerEpoch)
requestBatch.newBatch()
+ requestBatch.setUpdateType(AbstractControlRequest.Type.INCREMENTAL)
// Now send LISR, UMR and StopReplica requests for both new zk brokers and
existing zk
// brokers based on the topic changes.
@@ -226,7 +227,7 @@ class MigrationPropagator(
requestBatch.sendRequestsToBrokers(zkControllerEpoch)
requestBatch.newBatch()
- requestBatch.setUpdateType(LeaderAndIsrRequest.Type.FULL)
+ requestBatch.setUpdateType(AbstractControlRequest.Type.FULL)
// When we need to send RPCs from the image, we're sending 'full' requests
meaning we let
// every broker know about all the metadata and all the LISR requests it
needs to handle.
// Note that we cannot send StopReplica requests from the image. We don't
have any state
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 091391e10ca..8e8cec9514c 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -293,7 +293,8 @@ class KafkaServer(
config.brokerId,
config.interBrokerProtocolVersion,
brokerFeatures,
- kraftControllerNodes)
+ kraftControllerNodes,
+ config.migrationEnabled)
val controllerNodeProvider = new
MetadataCacheControllerNodeProvider(metadataCache, config)
/* initialize feature change listener */
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala
b/core/src/main/scala/kafka/server/MetadataCache.scala
index 2e60ce2ba7b..ba640286a2c 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -115,9 +115,10 @@ object MetadataCache {
def zkMetadataCache(brokerId: Int,
metadataVersion: MetadataVersion,
brokerFeatures: BrokerFeatures =
BrokerFeatures.createEmpty(),
- kraftControllerNodes: collection.Seq[Node] =
collection.Seq.empty[Node])
+ kraftControllerNodes: collection.Seq[Node] =
collection.Seq.empty[Node],
+ zkMigrationEnabled: Boolean = false)
: ZkMetadataCache = {
- new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures,
kraftControllerNodes)
+ new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures,
kraftControllerNodes, zkMigrationEnabled)
}
def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8c47afd5a77..469321e14d5 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1902,7 +1902,7 @@ class ReplicaManager(val config: KafkaConfig,
if (
config.migrationEnabled &&
leaderAndIsrRequest.isKRaftController &&
- leaderAndIsrRequest.requestType() == LeaderAndIsrRequest.Type.FULL
+ leaderAndIsrRequest.requestType() ==
AbstractControlRequest.Type.FULL
) {
updateStrayLogs(findStrayPartitionsFromLeaderAndIsr(allTopicPartitionsInRequest))
}
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index 302d3fbf8de..84ef973b8a6 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -31,13 +31,14 @@ import kafka.utils.Logging
import kafka.utils.Implicits._
import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.internals.Topic
-import
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataPartitionState,
UpdateMetadataTopicState}
import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition,
Uuid}
import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.UpdateMetadataRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{ApiVersionsResponse,
MetadataResponse, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{AbstractControlRequest,
ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.common.{Features, MetadataVersion}
@@ -55,6 +56,60 @@ trait ZkFinalizedFeatureCache {
def getFeatureOption: Option[Features]
}
+case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String,
mutable.LongMap[UpdateMetadataPartitionState]],
+ topicIds: Map[String, Uuid],
+ controllerId: Option[CachedControllerId],
+ aliveBrokers: mutable.LongMap[Broker],
+ aliveNodes:
mutable.LongMap[collection.Map[ListenerName, Node]]) {
+ val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, topicId)
=> (topicId, topicName) }
+}
+
+object ZkMetadataCache {
+ /**
+ * Create topic deletions (leader=-2) for topics that are missing in a FULL
UpdateMetadataRequest coming from a
+ * KRaft controller during a ZK migration. This will modify the
UpdateMetadataRequest object passed into this method.
+ */
+ def maybeInjectDeletedPartitionsFromFullMetadataRequest(
+ currentMetadata: MetadataSnapshot,
+ requestControllerEpoch: Int,
+ requestTopicStates: util.List[UpdateMetadataTopicState],
+ ): Seq[Uuid] = {
+ val prevTopicIds = currentMetadata.topicIds.values.toSet
+ val requestTopics = requestTopicStates.asScala.map { topicState =>
+ topicState.topicName() -> topicState.topicId()
+ }.toMap
+
+ val deleteTopics = prevTopicIds -- requestTopics.values.toSet
+ if (deleteTopics.isEmpty) {
+ return Seq.empty
+ }
+
+ deleteTopics.foreach { deletedTopicId =>
+ val topicName = currentMetadata.topicNames(deletedTopicId)
+ val topicState = new UpdateMetadataRequestData.UpdateMetadataTopicState()
+ .setTopicId(deletedTopicId)
+ .setTopicName(topicName)
+ .setPartitionStates(new util.ArrayList())
+
+ currentMetadata.partitionStates(topicName).foreach { case (partitionId,
partitionState) =>
+ val lisr =
LeaderAndIsr.duringDelete(partitionState.isr().asScala.map(_.intValue()).toList)
+ val newPartitionState = new UpdateMetadataPartitionState()
+ .setPartitionIndex(partitionId.toInt)
+ .setTopicName(topicName)
+ .setLeader(lisr.leader)
+ .setLeaderEpoch(lisr.leaderEpoch)
+ .setControllerEpoch(requestControllerEpoch)
+ .setReplicas(partitionState.replicas())
+ .setZkVersion(lisr.partitionEpoch)
+ .setIsr(lisr.isr.map(Integer.valueOf).asJava)
+ topicState.partitionStates().add(newPartitionState)
+ }
+ requestTopicStates.add(topicState)
+ }
+ deleteTopics.toSeq
+ }
+}
+
/**
* A cache for the state (e.g., current leader) of each partition. This cache
is updated through
* UpdateMetadataRequest from the controller. Every broker maintains the same
cache, asynchronously.
@@ -63,7 +118,8 @@ class ZkMetadataCache(
brokerId: Int,
metadataVersion: MetadataVersion,
brokerFeatures: BrokerFeatures,
- kraftControllerNodes: Seq[Node] = Seq.empty)
+ kraftControllerNodes: Seq[Node] = Seq.empty,
+ zkMigrationEnabled: Boolean = false)
extends MetadataCache with ZkFinalizedFeatureCache with Logging {
private val partitionMetadataLock = new ReentrantReadWriteLock()
@@ -376,6 +432,25 @@ class ZkMetadataCache(
// This method returns the deleted TopicPartitions received from
UpdateMetadataRequest
def updateMetadata(correlationId: Int, updateMetadataRequest:
UpdateMetadataRequest): Seq[TopicPartition] = {
inWriteLock(partitionMetadataLock) {
+ if (
+ updateMetadataRequest.isKRaftController &&
+ updateMetadataRequest.updateType() == AbstractControlRequest.Type.FULL
+ ) {
+ if (!zkMigrationEnabled) {
+ stateChangeLogger.error(s"Received UpdateMetadataRequest with
Type=FULL (2), but ZK migrations " +
+ s"are not enabled on this broker. Not treating this as a full
metadata update")
+ } else {
+ val deletedTopicIds =
ZkMetadataCache.maybeInjectDeletedPartitionsFromFullMetadataRequest(
+ metadataSnapshot, updateMetadataRequest.controllerEpoch(),
updateMetadataRequest.topicStates())
+ if (deletedTopicIds.isEmpty) {
+ stateChangeLogger.trace(s"Received UpdateMetadataRequest with
Type=FULL (2), " +
+ s"but no deleted topics were detected.")
+ } else {
+ stateChangeLogger.debug(s"Received UpdateMetadataRequest with
Type=FULL (2), " +
+ s"found ${deletedTopicIds.size} deleted topic ID(s):
$deletedTopicIds.")
+ }
+ }
+ }
val aliveBrokers = new
mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
val aliveNodes = new mutable.LongMap[collection.Map[ListenerName,
Node]](metadataSnapshot.aliveNodes.size)
@@ -477,14 +552,6 @@ class ZkMetadataCache(
}
}
- case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String,
mutable.LongMap[UpdateMetadataPartitionState]],
- topicIds: Map[String, Uuid],
- controllerId: Option[CachedControllerId],
- aliveBrokers: mutable.LongMap[Broker],
- aliveNodes:
mutable.LongMap[collection.Map[ListenerName, Node]]) {
- val topicNames: Map[Uuid, String] = topicIds.map { case (topicName,
topicId) => (topicId, topicName) }
- }
-
override def metadataVersion(): MetadataVersion = metadataVersion
override def features(): Features = _features match {
diff --git
a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 4e3fc4ed5f8..ec28484172a 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.acl.AclOperation.{DESCRIBE,
READ, WRITE}
import org.apache.kafka.common.acl.AclPermissionType.ALLOW
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.common.errors.{TimeoutException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.message.AllocateProducerIdsRequestData
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.requests.{AllocateProducerIdsRequest,
AllocateProducerIdsResponse}
@@ -46,13 +46,13 @@ import org.apache.kafka.metadata.authorizer.StandardAcl
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion,
ProducerIdsBlock}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotEquals, assertNotNull, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotEquals, assertNotNull, assertTrue, fail}
import org.junit.jupiter.api.{Assumptions, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import org.slf4j.LoggerFactory
import java.util
-import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
import java.util.{Properties, UUID}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@@ -275,8 +275,6 @@ class ZkMigrationIntegrationTest {
newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort))
newTopics.add(new NewTopic("test-topic-2", 10, 3.toShort))
newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
- newTopics.add(new NewTopic("test-topic-4", 10, 3.toShort))
- newTopics.add(new NewTopic("test-topic-5", 10, 3.toShort))
val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get(300, TimeUnit.SECONDS)
admin.close()
@@ -298,11 +296,6 @@ class ZkMigrationIntegrationTest {
kraftCluster.startup()
val readyFuture =
kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
- // Start a deletion that will take some time, but don't wait for it
- admin = zkCluster.createAdminClient()
- admin.deleteTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3",
"test-topic-4", "test-topic-5").asJava)
- admin.close()
-
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
val clientProps = kraftCluster.controllerClientProperties()
@@ -313,12 +306,17 @@ class ZkMigrationIntegrationTest {
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp,
"CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
+ // Emulate a ZK topic deletion
+ zkClient.createDeleteTopicPath("test-topic-1")
+ zkClient.createDeleteTopicPath("test-topic-2")
+ zkClient.createDeleteTopicPath("test-topic-3")
+
zkCluster.waitForReadyBrokers()
readyFuture.get(60, TimeUnit.SECONDS)
// Only continue with the test if there are some pending deletions to
verify. If there are not any pending
// deletions, this will mark the test as "skipped" instead of failed.
- val topicDeletions =
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkClient.getTopicDeletions
+ val topicDeletions = zkClient.getTopicDeletions
Assumptions.assumeTrue(topicDeletions.nonEmpty,
"This test needs pending topic deletions after a migration in order to
verify the behavior")
@@ -331,11 +329,21 @@ class ZkMigrationIntegrationTest {
// At this point, some of the topics may have been deleted by ZK
controller and the rest will be
// implicitly deleted by the KRaft controller and remove from the ZK
brokers as stray partitions
+ def topicsAllDeleted(admin: Admin): Boolean = {
+ val topics = admin.listTopics().names().get(60, TimeUnit.SECONDS)
+ topics.retainAll(util.Arrays.asList(
+ "test-topic-1", "test-topic-2", "test-topic-3"
+ ))
+ topics.isEmpty
+ }
+
admin = zkCluster.createAdminClient()
+ log.info("Waiting for topics to be deleted")
TestUtils.waitUntilTrue(
- () => admin.listTopics().names().get(60, TimeUnit.SECONDS).isEmpty,
+ () => topicsAllDeleted(admin),
"Timed out waiting for topics to be deleted",
- 300000)
+ 30000,
+ 1000)
val newTopics = new util.ArrayList[NewTopic]()
newTopics.add(new NewTopic("test-topic-1", 2, 3.toShort))
@@ -344,18 +352,33 @@ class ZkMigrationIntegrationTest {
val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get(60, TimeUnit.SECONDS)
- val expectedNewTopics = Seq("test-topic-1", "test-topic-2",
"test-topic-3")
+ def topicsAllRecreated(admin: Admin): Boolean = {
+ val topics = admin.listTopics().names().get(60, TimeUnit.SECONDS)
+ topics.retainAll(util.Arrays.asList(
+ "test-topic-1", "test-topic-2", "test-topic-3"
+ ))
+ topics.size() == 3
+ }
+
+ log.info("Waiting for topics to be re-created")
TestUtils.waitUntilTrue(
- () => admin.listTopics().names().get(60,
TimeUnit.SECONDS).equals(expectedNewTopics.toSet.asJava),
+ () => topicsAllRecreated(admin),
"Timed out waiting for topics to be created",
- 300000)
+ 30000,
+ 1000)
TestUtils.retry(300000) {
// Need a retry here since topic metadata may be inconsistent between
brokers
- val topicDescriptions =
admin.describeTopics(expectedNewTopics.asJavaCollection)
- .topicNameValues().asScala.map { case (name, description) =>
+ val topicDescriptions = try {
+ admin.describeTopics(util.Arrays.asList(
+ "test-topic-1", "test-topic-2", "test-topic-3"
+ )).topicNameValues().asScala.map { case (name, description) =>
name -> description.get(60, TimeUnit.SECONDS)
- }.toMap
+ }.toMap
+ } catch {
+ case e: ExecutionException if
e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => Map.empty[String,
TopicDescription]
+ case t: Throwable => fail("Error describing topics", t.getCause)
+ }
assertEquals(2, topicDescriptions("test-topic-1").partitions().size())
assertEquals(1, topicDescriptions("test-topic-2").partitions().size())
@@ -371,8 +394,6 @@ class ZkMigrationIntegrationTest {
assertTrue(absentTopics.contains("test-topic-1"))
assertTrue(absentTopics.contains("test-topic-2"))
assertTrue(absentTopics.contains("test-topic-3"))
- assertFalse(absentTopics.contains("test-topic-4"))
- assertFalse(absentTopics.contains("test-topic-5"))
}
admin.close()
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index ebcd063bc12..8a8a3f4d38f 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -21,20 +21,18 @@ import org.apache.kafka.common.{Node, TopicPartition, Uuid}
import java.util
import java.util.Arrays.asList
import java.util.Collections
-
import kafka.api.LeaderAndIsr
-import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.metadata.{KRaftMetadataCache, MetadataSnapshot,
ZkMetadataCache}
import
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
UpdateMetadataEndpoint, UpdateMetadataPartitionState, UpdateMetadataTopicState}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.requests.UpdateMetadataRequest
+import org.apache.kafka.common.requests.{AbstractControlRequest,
UpdateMetadataRequest}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.metadata.{BrokerRegistrationChangeRecord,
PartitionRecord, RegisterBrokerRecord, RemoveTopicRecord, TopicRecord}
import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint,
BrokerEndpointCollection}
import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage,
MetadataProvenance}
import org.apache.kafka.server.common.MetadataVersion
-
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -782,7 +780,7 @@ class MetadataCacheTest {
.setSecurityProtocol(securityProtocol.id)
.setListener(listenerName.value)).asJava))
val updateMetadataRequest = new UpdateMetadataRequest.Builder(version,
controllerId, controllerEpoch, brokerEpoch,
- partitionStates.asJava, brokers.asJava, util.Collections.emptyMap(),
false).build()
+ partitionStates.asJava, brokers.asJava, util.Collections.emptyMap(),
false, AbstractControlRequest.Type.UNKNOWN).build()
MetadataCacheTest.updateCache(cache, updateMetadataRequest)
val partitionState = cache.getPartitionInfo(topic, partitionIndex).get
@@ -802,4 +800,210 @@ class MetadataCacheTest {
assertEquals(offlineReplicas, partitionState.offlineReplicas())
}
}
+
+ /**
+  * Builds the fixtures shared by the "full" UpdateMetadataRequest tests.
+  *
+  * The initial metadata consists of "test-topic-1" and "test-topic-2" with three partitions each;
+  * the new metadata consists of a single-partition "different-topic".
+  *
+  * @return a tuple of (initial topic IDs, initial partition states keyed by topic name and
+  *         partition index, new topic IDs, new partition states)
+  */
+ def setupInitialAndFullMetadata(): (
+   Map[String, Uuid], mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+   Map[String, Uuid], Seq[UpdateMetadataPartitionState]
+ ) = {
+   // Registers `name` in `topicStates` with `partitions` identically configured partitions.
+   def addTopic(
+     name: String,
+     partitions: Int,
+     topicStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]]
+   ): Unit = {
+     val statesByPartition = mutable.LongMap.empty[UpdateMetadataPartitionState]
+     (0 until partitions).foreach { partitionId =>
+       val state = new UpdateMetadataPartitionState()
+         .setTopicName(name)
+         .setPartitionIndex(partitionId)
+         .setControllerEpoch(2)
+         .setLeader(0)
+         .setLeaderEpoch(10)
+         .setIsr(asList(0, 1))
+         .setZkVersion(10)
+         .setReplicas(asList(0, 1, 2))
+       statesByPartition.put(partitionId, state)
+     }
+     topicStates.put(name, statesByPartition)
+   }
+
+   // Metadata the cache starts out with.
+   val initialTopicIds = Map(
+     "test-topic-1" -> Uuid.fromString("IQ2F1tpCRoSbjfq4zBJwpg"),
+     "test-topic-2" -> Uuid.fromString("4N8_J-q7SdWHPFkos275pQ")
+   )
+   val initialTopicStates = mutable.AnyRefMap.empty[String, mutable.LongMap[UpdateMetadataPartitionState]]
+   addTopic("test-topic-1", 3, initialTopicStates)
+   addTopic("test-topic-2", 3, initialTopicStates)
+
+   // Metadata carried by the subsequent request: one unrelated topic with a single partition.
+   val newTopicIds = Map(
+     "different-topic" -> Uuid.fromString("DraFMNOJQOC5maTb1vtZ8Q")
+   )
+   val differentTopicPartition = new UpdateMetadataPartitionState()
+     .setTopicName("different-topic")
+     .setPartitionIndex(0)
+     .setControllerEpoch(42)
+     .setLeader(0)
+     .setLeaderEpoch(10)
+     .setIsr(asList[Integer](0, 1, 2))
+     .setZkVersion(1)
+     .setReplicas(asList[Integer](0, 1, 2))
+   val newPartitionStates = Seq(differentTopicPartition)
+
+   (initialTopicIds, initialTopicStates, newTopicIds, newPartitionStates)
+ }
+
+ /**
+  * Calls ZkMetadataCache#maybeInjectDeletedPartitionsFromFullMetadataRequest directly and checks
+  * that it returns the IDs of the topics missing from the request and adds partition state with
+  * leader == -2 for them to the request's topic states. Only the behavior of the method itself is
+  * covered here, not the logic that decides when it should be called.
+  */
+ @Test
+ def testMaybeInjectDeletedPartitionsFromFullMetadataRequest(): Unit = {
+   val (initialTopicIds, initialTopicStates, newTopicIds, _) = setupInitialAndFullMetadata()
+   val topicOneId = Uuid.fromString("IQ2F1tpCRoSbjfq4zBJwpg")
+   val topicTwoId = Uuid.fromString("4N8_J-q7SdWHPFkos275pQ")
+
+   val initialSnapshot = MetadataSnapshot(
+     partitionStates = initialTopicStates,
+     topicIds = initialTopicIds,
+     controllerId = Some(KRaftCachedControllerId(3000)),
+     aliveBrokers = mutable.LongMap.empty,
+     aliveNodes = mutable.LongMap.empty)
+
+   // Re-indexes the request's topic states by topic name and partition index, then runs `verifier`.
+   def verifyTopicStates(
+     updateMetadataRequest: UpdateMetadataRequest
+   )(
+     verifier: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]] => Unit
+   ): Unit = {
+     val statesByTopic = mutable.AnyRefMap.empty[String, mutable.LongMap[UpdateMetadataPartitionState]]
+     updateMetadataRequest.topicStates().forEach { topicState =>
+       val statesByPartition = mutable.LongMap.empty[UpdateMetadataPartitionState]
+       topicState.partitionStates().forEach { partitionState =>
+         statesByPartition.put(partitionState.partitionIndex(), partitionState)
+       }
+       statesByTopic.put(topicState.topicName(), statesByPartition)
+     }
+     verifier(statesByTopic)
+   }
+
+   // Number of partitions of `topic` that carry the deleted marker (leader == -2).
+   def deletedPartitionCount(
+     topicStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+     topic: String
+   ): Int = topicStates(topic).values.toSeq.count(_.leader() == -2)
+
+   // Empty UMR, deletes everything
+   val emptyRequest = new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
+     Seq.empty.asJava, Seq.empty.asJava, Map.empty[String, Uuid].asJava, true, AbstractControlRequest.Type.FULL).build()
+   val deletedByEmptyRequest = ZkMetadataCache.maybeInjectDeletedPartitionsFromFullMetadataRequest(
+     initialSnapshot, 42, emptyRequest.topicStates())
+   assertEquals(Seq(topicOneId, topicTwoId), deletedByEmptyRequest)
+   verifyTopicStates(emptyRequest) { topicStates =>
+     assertEquals(2, topicStates.size)
+     assertEquals(3, deletedPartitionCount(topicStates, "test-topic-1"))
+     assertEquals(3, deletedPartitionCount(topicStates, "test-topic-2"))
+   }
+
+   // One different topic, should remove other two
+   val oneTopicPartitionState = Seq(new UpdateMetadataPartitionState()
+     .setTopicName("different-topic")
+     .setPartitionIndex(0)
+     .setControllerEpoch(42)
+     .setLeader(0)
+     .setLeaderEpoch(10)
+     .setIsr(asList[Integer](0, 1, 2))
+     .setZkVersion(1)
+     .setReplicas(asList[Integer](0, 1, 2)))
+   val oneTopicRequest = new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
+     oneTopicPartitionState.asJava, Seq.empty.asJava, newTopicIds.asJava, true, AbstractControlRequest.Type.FULL).build()
+   val deletedByOneTopicRequest = ZkMetadataCache.maybeInjectDeletedPartitionsFromFullMetadataRequest(
+     initialSnapshot, 42, oneTopicRequest.topicStates())
+   assertEquals(Seq(topicOneId, topicTwoId), deletedByOneTopicRequest)
+   verifyTopicStates(oneTopicRequest) { topicStates =>
+     assertEquals(3, topicStates.size)
+     assertEquals(3, deletedPartitionCount(topicStates, "test-topic-1"))
+     assertEquals(3, deletedPartitionCount(topicStates, "test-topic-2"))
+   }
+
+   // Existing two plus one new topic, nothing gets deleted, all topics should be present
+   val allTopicStates = initialTopicStates.flatMap(_._2.values).toSeq ++ oneTopicPartitionState
+   val allTopicIds = initialTopicIds ++ newTopicIds
+   val allTopicsRequest = new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
+     allTopicStates.asJava, Seq.empty.asJava, allTopicIds.asJava, true, AbstractControlRequest.Type.FULL).build()
+   val deletedByAllTopicsRequest = ZkMetadataCache.maybeInjectDeletedPartitionsFromFullMetadataRequest(
+     initialSnapshot, 42, allTopicsRequest.topicStates())
+   assertEquals(Seq.empty, deletedByAllTopicsRequest)
+   verifyTopicStates(allTopicsRequest) { topicStates =>
+     assertEquals(3, topicStates.size)
+     // Ensure these two weren't deleted (leader = -2)
+     assertEquals(0, deletedPartitionCount(topicStates, "test-topic-1"))
+     assertEquals(0, deletedPartitionCount(topicStates, "test-topic-2"))
+   }
+ }
+
+ /**
+  * Verify the behavior of ZkMetadataCache when handling a "Full" UpdateMetadataRequest.
+  *
+  * Every scenario seeds a fresh cache with the two initial topics and then applies a request that
+  * carries only "different-topic". The initial topics are expected to be dropped only when the
+  * request is flagged as coming from a KRaft controller, has type FULL, and the cache was created
+  * with ZK migration enabled. In every other combination all three topics must remain.
+  */
+ @Test
+ def testHandleFullUpdateMetadataRequestInZkMigration(): Unit = {
+   val (initialTopicIds, initialTopicStates, newTopicIds, newPartitionStates) = setupInitialAndFullMetadata()
+
+   // Produces a fresh request per scenario so that per-scenario tweaks to data() do not leak.
+   val updateMetadataRequestBuilder = () => new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
+     newPartitionStates.asJava, Seq.empty.asJava, newTopicIds.asJava, true, AbstractControlRequest.Type.FULL).build()
+
+   // Creates a cache, loads the initial topics, applies `updateMetadataRequest`, then runs `verifier`.
+   def verifyMetadataCache(
+     updateMetadataRequest: UpdateMetadataRequest,
+     zkMigrationEnabled: Boolean = true
+   )(
+     verifier: ZkMetadataCache => Unit
+   ): Unit = {
+     val cache = MetadataCache.zkMetadataCache(1, MetadataVersion.latest(), zkMigrationEnabled = zkMigrationEnabled)
+     cache.updateMetadata(1, new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
+       initialTopicStates.flatMap(_._2.values).toList.asJava, Seq.empty.asJava, initialTopicIds.asJava).build())
+     cache.updateMetadata(1, updateMetadataRequest)
+     verifier.apply(cache)
+   }
+
+   // KRaft=true Type=FULL, migration disabled
+   var updateMetadataRequest = updateMetadataRequestBuilder.apply()
+   // The builder above is already given `true` and Type.FULL; they are restated here explicitly.
+   updateMetadataRequest.data().setIsKRaftController(true)
+   updateMetadataRequest.data().setType(AbstractControlRequest.Type.FULL.toByte)
+   verifyMetadataCache(updateMetadataRequest, zkMigrationEnabled = false) { cache =>
+     assertEquals(3, cache.getAllTopics().size)
+     assertTrue(cache.contains("test-topic-1"))
+     assertTrue(cache.contains("test-topic-2"))
+   }
+
+   // KRaft=true Type=FULL
+   updateMetadataRequest = updateMetadataRequestBuilder.apply()
+   verifyMetadataCache(updateMetadataRequest) { cache =>
+     assertEquals(1, cache.getAllTopics().size)
+     assertFalse(cache.contains("test-topic-1"))
+     assertFalse(cache.contains("test-topic-2"))
+   }
+
+   // KRaft=false Type=FULL
+   updateMetadataRequest = updateMetadataRequestBuilder.apply()
+   updateMetadataRequest.data().setIsKRaftController(false)
+   verifyMetadataCache(updateMetadataRequest) { cache =>
+     assertEquals(3, cache.getAllTopics().size)
+     assertTrue(cache.contains("test-topic-1"))
+     assertTrue(cache.contains("test-topic-2"))
+   }
+
+   // KRaft=true Type=INCREMENTAL
+   updateMetadataRequest = updateMetadataRequestBuilder.apply()
+   updateMetadataRequest.data().setType(AbstractControlRequest.Type.INCREMENTAL.toByte)
+   verifyMetadataCache(updateMetadataRequest) { cache =>
+     assertEquals(3, cache.getAllTopics().size)
+     assertTrue(cache.contains("test-topic-1"))
+     assertTrue(cache.contains("test-topic-2"))
+   }
+
+   // KRaft=true Type=UNKNOWN
+   updateMetadataRequest = updateMetadataRequestBuilder.apply()
+   updateMetadataRequest.data().setType(AbstractControlRequest.Type.UNKNOWN.toByte)
+   verifyMetadataCache(updateMetadataRequest) { cache =>
+     assertEquals(3, cache.getAllTopics().size)
+     assertTrue(cache.contains("test-topic-1"))
+     assertTrue(cache.contains("test-topic-2"))
+   }
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6363a0ac790..7c953b5dbec 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -567,7 +567,7 @@ class ReplicaManagerTest {
Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava,
false,
- LeaderAndIsrRequest.Type.UNKNOWN
+ AbstractControlRequest.Type.UNKNOWN
).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _)
=> ())
replicaManager.getPartitionOrException(new TopicPartition(topic,
partition))
@@ -2696,7 +2696,7 @@ class ReplicaManagerTest {
topicIds.asJava,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava,
true,
- LeaderAndIsrRequest.Type.FULL
+ AbstractControlRequest.Type.FULL
).build()
replicaManager.becomeLeaderOrFollower(0, lisr, (_, _) => ())
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 511f51064e1..1248ced89ec 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -218,7 +218,7 @@ public class ReplicaFetcherThreadBenchmark {
// TODO: fix to support raft
ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0,
- config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(),
null);
+ config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(),
null, false);
metadataCache.updateMetadata(0, updateMetadataRequest);
replicaManager = new ReplicaManagerBuilder().
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 187743ce0b1..2cf574a1403 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -113,7 +113,7 @@ public class MetadataRequestBenchmark {
private Metrics metrics = new Metrics();
private int brokerId = 1;
private ZkMetadataCache metadataCache =
MetadataCache.zkMetadataCache(brokerId,
- MetadataVersion.latest(), BrokerFeatures.createEmpty(), null);
+ MetadataVersion.latest(), BrokerFeatures.createEmpty(), null, false);
private ClientQuotaManager clientQuotaManager =
Mockito.mock(ClientQuotaManager.class);
private ClientRequestQuotaManager clientRequestQuotaManager =
Mockito.mock(ClientRequestQuotaManager.class);
private ControllerMutationQuotaManager controllerMutationQuotaManager =
Mockito.mock(ControllerMutationQuotaManager.class);
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index b0502c955a5..fe0335e92e6 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -115,7 +115,7 @@ public class CheckpointBench {
final MetadataCache metadataCache =
MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
this.brokerProperties.interBrokerProtocolVersion(),
- BrokerFeatures.createEmpty(), null);
+ BrokerFeatures.createEmpty(), null, false);
this.quotaManagers =
QuotaFactory.instantiate(this.brokerProperties,
this.metrics,
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 0bbf934e6e8..d2a173e4748 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -162,7 +162,7 @@ public class PartitionCreationBench {
setBrokerTopicStats(brokerTopicStats).
setMetadataCache(MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
this.brokerProperties.interBrokerProtocolVersion(),
BrokerFeatures.createEmpty(),
- null)).
+ null, false)).
setLogDirFailureChannel(failureChannel).
setAlterPartitionManager(alterPartitionManager).
build();