This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new d42fa503365 KAFKA-14656: Send UMR first during ZK migration (#13159)
d42fa503365 is described below
commit d42fa503365cc0a87f4e52e171ca399e9a59bc77
Author: David Arthur <[email protected]>
AuthorDate: Tue Jan 31 01:31:45 2023 -0500
KAFKA-14656: Send UMR first during ZK migration (#13159)
When in migration-from-ZK mode and sending RPCs to ZK-based brokers, the KRaft
controller must send full UpdateMetadataRequests prior to sending full
LeaderAndIsrRequests. If the controller sends the requests in the other order,
and the ZK-based broker does not already know about some of the nodes
referenced in the LeaderAndIsrRequest, it will reject the request.
This PR includes an integration test and a number of other small fixes for
dual-write.
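To make the ordering requirement concrete, the following is a minimal,
self-contained Scala sketch of the failure mode. The types and names here are
hypothetical stand-ins (they are not Kafka's real request classes or broker
code); knownNodes plays the role of the broker metadata cache that a full
UpdateMetadataRequest populates.

    // Hypothetical toy model: a ZK-based broker only accepts a LeaderAndIsr
    // request if it already knows about every replica the request references.
    object UmrBeforeLisrSketch extends App {
      final case class UpdateMetadataRequest(liveNodes: Set[Int])
      final case class LeaderAndIsrRequest(replicas: Set[Int])

      final class ZkBroker {
        private var knownNodes: Set[Int] = Set.empty
        def handleUpdateMetadata(umr: UpdateMetadataRequest): Unit =
          knownNodes = umr.liveNodes
        def handleLeaderAndIsr(lisr: LeaderAndIsrRequest): Either[String, Unit] =
          if (lisr.replicas.subsetOf(knownNodes)) Right(())
          else Left(s"rejected: unknown nodes ${lisr.replicas -- knownNodes}")
      }

      val broker = new ZkBroker
      // Wrong order: the LISR references node 3 before any UMR was seen, so it is rejected.
      assert(broker.handleLeaderAndIsr(LeaderAndIsrRequest(Set(1, 2, 3))).isLeft)
      // Correct order: send the full UMR first, then the same LISR is accepted.
      broker.handleUpdateMetadata(UpdateMetadataRequest(Set(1, 2, 3)))
      assert(broker.handleLeaderAndIsr(LeaderAndIsrRequest(Set(1, 2, 3))).isRight)
    }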
Co-authored-by: Akhilesh C <[email protected]>
Reviewers: Colin P. McCabe <[email protected]>
---
.../scala/kafka/common/InterBrokerSendThread.scala | 2 +-
.../kafka/migration/MigrationPropagator.scala | 30 ++--
.../server/BrokerToControllerChannelManager.scala | 20 ++-
.../main/scala/kafka/zk/ZkMigrationClient.scala | 37 ++---
.../test/junit/ZkClusterInvocationContext.java | 2 +
.../kafka/zk/ZkMigrationIntegrationTest.scala | 164 ++++++++++++++++++++-
.../kafka/integration/KafkaServerTestHarness.scala | 2 +-
.../unit/kafka/zk/ZkMigrationClientTest.scala | 16 +-
.../kafka/controller/ProducerIdControlManager.java | 8 +-
.../org/apache/kafka/image/ClientQuotaImage.java | 4 +
.../org/apache/kafka/image/ConfigurationImage.java | 4 +
.../apache/kafka/image/ConfigurationsImage.java | 13 ++
.../metadata/migration/KRaftMigrationDriver.java | 70 ++++++++-
.../kafka/metadata/migration/MigrationClient.java | 18 +++
14 files changed, 334 insertions(+), 56 deletions(-)
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index e1bea72ddd0..2b77c29bc24 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._
*/
abstract class InterBrokerSendThread(
name: String,
- var networkClient: KafkaClient,
+ @volatile var networkClient: KafkaClient,
requestTimeoutMs: Int,
time: Time,
isInterruptible: Boolean = true
diff --git a/core/src/main/scala/kafka/migration/MigrationPropagator.scala b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
index 308d8f06f15..355831166ee 100644
--- a/core/src/main/scala/kafka/migration/MigrationPropagator.scala
+++ b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
@@ -92,14 +92,20 @@ class MigrationPropagator(
val oldZkBrokers = zkBrokers -- changedZkBrokers
val brokersChanged = !delta.clusterDelta().changedBrokers().isEmpty
+ // First send metadata about the live/dead brokers to all the zk brokers.
if (changedZkBrokers.nonEmpty) {
// Update new Zk brokers about all the metadata.
requestBatch.addUpdateMetadataRequestForBrokers(changedZkBrokers.toSeq, image.topics().partitions().keySet().asScala)
- // Send these requests first to make sure, we don't add all the partition metadata to the
- // old brokers as well.
- requestBatch.sendRequestsToBrokers(zkControllerEpoch)
- requestBatch.newBatch()
+ }
+ if (brokersChanged) {
+ requestBatch.addUpdateMetadataRequestForBrokers(oldZkBrokers.toSeq)
+ }
+ requestBatch.sendRequestsToBrokers(zkControllerEpoch)
+ requestBatch.newBatch()
+ // Now send LISR, UMR and StopReplica requests for both new zk brokers and existing zk
+ // brokers based on the topic changes.
+ if (changedZkBrokers.nonEmpty) {
// For new the brokers, check if there are partition assignments and add LISR appropriately.
image.topics().partitions().asScala.foreach { case (tp, partitionRegistration) =>
val replicas = partitionRegistration.replicas.toSet
@@ -118,9 +124,8 @@ class MigrationPropagator(
}
}
- // If there are new brokers (including KRaft brokers) or if there are changes in topic
- // metadata, let's send UMR about the changes to the old Zk brokers.
- if (brokersChanged || !delta.topicsDelta().deletedTopicIds().isEmpty || !delta.topicsDelta().changedTopics().isEmpty) {
+ // If there are changes in topic metadata, let's send UMR about the changes to the old Zk brokers.
+ if (!delta.topicsDelta().deletedTopicIds().isEmpty || !delta.topicsDelta().changedTopics().isEmpty) {
requestBatch.addUpdateMetadataRequestForBrokers(oldZkBrokers.toSeq)
}
@@ -175,14 +180,20 @@ class MigrationPropagator(
override def sendRPCsToBrokersFromMetadataImage(image: MetadataImage, zkControllerEpoch: Int): Unit = {
publishMetadata(image)
+
+ val zkBrokers = image.cluster().zkBrokers().keySet().asScala.map(_.toInt).toSeq
+ val partitions = image.topics().partitions()
+ // First send all the metadata before sending any other requests to make sure subsequent
+ // requests are handled correctly.
requestBatch.newBatch()
+ requestBatch.addUpdateMetadataRequestForBrokers(zkBrokers, partitions.keySet().asScala)
+ requestBatch.sendRequestsToBrokers(zkControllerEpoch)
+ requestBatch.newBatch()
// When we need to send RPCs from the image, we're sending 'full' requests meaning we let
// every broker know about all the metadata and all the LISR requests it needs to handle.
// Note that we cannot send StopReplica requests from the image. We don't have any state
// about brokers that host a replica but are not part of the replica set known by the Controller.
- val zkBrokers = image.cluster().zkBrokers().keySet().asScala.map(_.toInt).toSeq
- val partitions = image.topics().partitions()
partitions.asScala.foreach{ case (tp, partitionRegistration) =>
val leaderIsrAndControllerEpochOpt = MigrationControllerChannelContext.partitionLeadershipInfo(image, tp)
leaderIsrAndControllerEpochOpt match {
@@ -194,7 +205,6 @@ class MigrationPropagator(
case None => None
}
}
- requestBatch.addUpdateMetadataRequestForBrokers(zkBrokers, partitions.keySet().asScala)
requestBatch.sendRequestsToBrokers(zkControllerEpoch)
}
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index b1c27287560..3c2a6a2acbd 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -315,12 +315,12 @@ class BrokerToControllerRequestThread(
private def maybeResetNetworkClient(controllerInformation: ControllerInformation): Unit = {
if (isNetworkClientForZkController != controllerInformation.isZkController) {
debug("Controller changed to " + (if (isNetworkClientForZkController) "kraft" else "zk") + " mode. " +
- "Resetting network client")
+ s"Resetting network client with new controller information : ${controllerInformation}")
// Close existing network client.
- if (networkClient != null) {
- networkClient.initiateClose()
- networkClient.close()
- }
+ val oldClient = networkClient
+ oldClient.initiateClose()
+ oldClient.close()
+
isNetworkClientForZkController = controllerInformation.isZkController
updateControllerAddress(controllerInformation.node.orNull)
controllerInformation.node.foreach(n => metadataUpdater.setNodes(Seq(n).asJava))
@@ -382,6 +382,7 @@ class BrokerToControllerRequestThread(
}
private[server] def handleResponse(queueItem: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
+ debug(s"Request ${queueItem.request} received $response")
if (response.authenticationException != null) {
error(s"Request ${queueItem.request} failed due to authentication error with controller",
response.authenticationException)
@@ -394,9 +395,16 @@ class BrokerToControllerRequestThread(
updateControllerAddress(null)
requestQueue.putFirst(queueItem)
} else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+ debug(s"Request ${queueItem.request} received NOT_CONTROLLER exception. Disconnecting the " +
+ s"connection to the stale controller ${activeControllerAddress().map(_.idString).getOrElse("null")}")
// just close the controller connection and wait for metadata cache update in doWork
activeControllerAddress().foreach { controllerAddress =>
- networkClient.disconnect(controllerAddress.idString)
+ try {
+ // We don't care if disconnect has an error, just log it and get a new network client
+ networkClient.disconnect(controllerAddress.idString)
+ } catch {
+ case t: Throwable => error("Had an error while disconnecting from NetworkClient.", t)
+ }
updateControllerAddress(null)
}
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 1ab05d5a75d..61b8b2afb51 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -208,7 +208,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord()
.setBrokerEpoch(-1)
.setBrokerId(producerIdBlock.assignedBrokerId)
- .setNextProducerId(producerIdBlock.firstProducerId), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
+ .setNextProducerId(producerIdBlock.firstProducerId()), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
case None => // Nothing to migrate
}
}
@@ -364,24 +364,27 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
}
}
- def writeClientQuotas(entity: ClientQuotaEntity,
- quotas: util.Map[String, Double],
- state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
- val entityMap = entity.entries().asScala
- val hasUser = entityMap.contains(ConfigType.User)
- val hasClient = entityMap.contains(ConfigType.Client)
- val hasIp = entityMap.contains(ConfigType.Ip)
+ override def writeClientQuotas(
+ entity: util.Map[String, String],
+ quotas: util.Map[String, java.lang.Double],
+ state: ZkMigrationLeadershipState
+ ): ZkMigrationLeadershipState = {
+
+ val entityMap = entity.asScala
+ val hasUser = entityMap.contains(ClientQuotaEntity.USER)
+ val hasClient = entityMap.contains(ClientQuotaEntity.CLIENT_ID)
+ val hasIp = entityMap.contains(ClientQuotaEntity.IP)
val props = new Properties()
// We store client quota values as strings in the ZK JSON
quotas.forEach { case (key, value) => props.put(key, value.toString) }
val (configType, path) = if (hasUser && !hasClient) {
- (Some(ConfigType.User), Some(entityMap(ConfigType.User)))
+ (Some(ConfigType.User), Some(entityMap(ClientQuotaEntity.USER)))
} else if (hasUser && hasClient) {
- (Some(ConfigType.User), Some(s"${entityMap(ConfigType.User)}/clients/${entityMap(ConfigType.Client)}"))
+ (Some(ConfigType.User), Some(s"${entityMap(ClientQuotaEntity.USER)}/clients/${entityMap(ClientQuotaEntity.CLIENT_ID)}"))
} else if (hasClient) {
- (Some(ConfigType.Client), Some(entityMap(ConfigType.Client)))
+ (Some(ConfigType.Client), Some(entityMap(ClientQuotaEntity.CLIENT_ID)))
} else if (hasIp) {
- (Some(ConfigType.Ip), Some(entityMap(ConfigType.Ip)))
+ (Some(ConfigType.Ip), Some(entityMap(ClientQuotaEntity.IP)))
} else {
(None, None)
}
@@ -399,7 +402,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
// If we didn't update the migration state, we failed to write the client quota. Try again
// after recursively create its parent znodes
val createPath = if (hasUser && hasClient) {
- s"${ConfigEntityTypeZNode.path(configType.get)}/${entityMap(ConfigType.User)}/clients"
+ s"${ConfigEntityTypeZNode.path(configType.get)}/${entityMap(ClientQuotaEntity.USER)}/clients"
} else {
ConfigEntityTypeZNode.path(configType.get)
}
@@ -414,7 +417,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
}
}
- def writeProducerId(nextProducerId: Long, state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ override def writeProducerId(nextProducerId: Long, state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(
new ProducerIdsBlock(-1, nextProducerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE))
@@ -423,9 +426,9 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
state.withMigrationZkVersion(migrationZkVersion)
}
- def writeConfigs(resource: ConfigResource,
- configs: util.Map[String, String],
- state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ override def writeConfigs(resource: ConfigResource,
+ configs: util.Map[String, String],
+ state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
val configType = resource.`type`() match {
case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index 5154982e472..26c51d70197 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -27,6 +27,7 @@ import kafka.test.ClusterInstance;
import kafka.utils.EmptyTestInfo;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
@@ -319,6 +320,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
clusterReference.get().killBroker(i);
}
clusterReference.get().restartDeadBrokers(true);
+ clusterReference.get().adminClientConfig().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
}
@Override
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index d9b2e286956..231bf01c917 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -16,26 +16,38 @@
*/
package kafka.zk
+import kafka.server.{ConfigType, KafkaConfig}
import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterTest, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.common.config.TopicConfig
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasResult, AlterConfigOp, AlterConfigsResult, ConfigEntry, NewTopic}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
+import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
+import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.jupiter.api.extension.ExtendWith
+import org.slf4j.LoggerFactory
import java.util
+import java.util.Properties
import java.util.concurrent.TimeUnit
+import scala.collection.Seq
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ZkMigrationIntegrationTest {
+ val log = LoggerFactory.getLogger(classOf[ZkMigrationIntegrationTest])
+
class MetadataDeltaVerifier {
val metadataDelta = new MetadataDelta(MetadataImage.EMPTY)
var offset = 0
@@ -102,4 +114,146 @@ class ZkMigrationIntegrationTest {
migrationState = migrationClient.releaseControllerLeadership(migrationState)
}
+
+ @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+ new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
+ new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+ new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+ new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+ ))
+ def testDualWrite(zkCluster: ClusterInstance): Unit = {
+ // Create a topic in ZK mode
+ var admin = zkCluster.createAdminClient()
+ val newTopics = new util.ArrayList[NewTopic]()
+ newTopics.add(new NewTopic("test", 2, 3.toShort)
+ .configs(Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "102400", TopicConfig.SEGMENT_MS_CONFIG -> "300000").asJava))
+ val createTopicResult = admin.createTopics(newTopics)
+ createTopicResult.all().get(60, TimeUnit.SECONDS)
+ admin.close()
+
+ // Verify the configs exist in ZK
+ val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+ val propsBefore = zkClient.getEntityConfigs(ConfigType.Topic, "test")
+ assertEquals("102400", propsBefore.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
+ assertEquals("300000", propsBefore.getProperty(TopicConfig.SEGMENT_MS_CONFIG))
+
+ // Bootstrap the ZK cluster ID into KRaft
+ val clusterId = zkCluster.clusterId()
+ val kraftCluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
+ setClusterId(Uuid.fromString(clusterId)).
+ setNumBrokerNodes(0).
+ setNumControllerNodes(1).build())
+ .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+ .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+ .build()
+ try {
+ kraftCluster.format()
+ kraftCluster.startup()
+ val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+ // Allocate a transactional producer ID while in ZK mode
+ allocateProducerId(zkCluster.bootstrapServers())
+ val producerIdBlock = readProducerIdBlock(zkClient)
+
+ // Enable migration configs and restart brokers
+ log.info("Restart brokers in migration mode")
+ val clientProps = kraftCluster.controllerClientProperties()
+ val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
+ zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+ zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+ zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+ zkCluster.rollingBrokerRestart()
+ zkCluster.waitForReadyBrokers()
+ readyFuture.get(30, TimeUnit.SECONDS)
+
+ // Wait for migration to begin
+ log.info("Waiting for ZK migration to begin")
+ TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
+
+ // Alter the metadata
+ log.info("Updating metadata with AdminClient")
+ admin = zkCluster.createAdminClient()
+ alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS)
+ alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS)
+
+ // Verify the changes made to KRaft are seen in ZK
+ log.info("Verifying metadata changes with ZK")
+ verifyTopicConfigs(zkClient)
+ verifyClientQuotas(zkClient)
+ allocateProducerId(zkCluster.bootstrapServers())
+ verifyProducerId(producerIdBlock, zkClient)
+
+ } finally {
+ zkCluster.stop()
+ kraftCluster.close()
+ }
+ }
+
+ def allocateProducerId(bootstrapServers: String): Unit = {
+ val props = new Properties()
+ props.put("bootstrap.servers", bootstrapServers)
+ props.put("transactional.id", "some-transaction-id")
+ val producer = new KafkaProducer[String, String](props, new StringSerializer(), new StringSerializer())
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String]("test", "", "one"))
+ producer.commitTransaction()
+ producer.flush()
+ producer.close()
+ }
+
+ def readProducerIdBlock(zkClient: KafkaZkClient): ProducerIdsBlock = {
+ val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
+ dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
+ }
+
+ def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+ val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "test")
+ val alterConfigs = Seq(
+ new AlterConfigOp(new ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "204800"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry(TopicConfig.SEGMENT_MS_CONFIG, null), AlterConfigOp.OpType.DELETE)
+ ).asJavaCollection
+ admin.incrementalAlterConfigs(Map(topicResource -> alterConfigs).asJava)
+ }
+
+ def alterClientQuotas(admin: Admin): AlterClientQuotasResult = {
+ val quotas = new util.ArrayList[ClientQuotaAlteration]()
+ quotas.add(new ClientQuotaAlteration(
+ new ClientQuotaEntity(Map("user" -> "user1").asJava),
+ List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
+ quotas.add(new ClientQuotaAlteration(
+ new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
+ List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
+ quotas.add(new ClientQuotaAlteration(
+ new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
+ List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
+ admin.alterClientQuotas(quotas)
+ }
+
+ def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
+ TestUtils.retry(10000) {
+ val propsAfter = zkClient.getEntityConfigs(ConfigType.Topic, "test")
+ assertEquals("204800", propsAfter.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
+ assertFalse(propsAfter.containsKey(TopicConfig.SEGMENT_MS_CONFIG))
+ }
+ }
+
+ def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
+ TestUtils.retry(10000) {
+ assertEquals("1000.0", zkClient.getEntityConfigs(ConfigType.User, "user1").getProperty("consumer_byte_rate"))
+ assertEquals("800.0", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("consumer_byte_rate"))
+ assertEquals("100.0", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("producer_byte_rate"))
+ assertEquals("10.0", zkClient.getEntityConfigs(ConfigType.Ip, "8.8.8.8").getProperty("connection_creation_rate"))
+ }
+ }
+
+ def verifyProducerId(firstProducerIdBlock: ProducerIdsBlock, zkClient: KafkaZkClient): Unit = {
+ TestUtils.retry(10000) {
+ val producerIdBlock = readProducerIdBlock(zkClient)
+ assertTrue(firstProducerIdBlock.firstProducerId() < producerIdBlock.firstProducerId())
+ }
+ }
}
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 1ef4d47995a..cf52b5ce9b3 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -358,7 +358,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
time = brokerTime(config.brokerId),
threadNamePrefix = None,
startup = false,
- enableZkApiForwarding = isZkMigrationTest()
+ enableZkApiForwarding = isZkMigrationTest() || (config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled)
)
}
}
diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
index b6d01787785..8694f10c14a 100644
--- a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
@@ -164,11 +164,11 @@ class ZkMigrationClientTest extends QuorumTestHarness {
adminZkClient: AdminZkClient,
migrationState: ZkMigrationLeadershipState,
entity: Map[String, String],
- quotas: Map[String, Double],
+ quotas: Map[String, java.lang.Double],
zkEntityType: String,
zkEntityName: String): ZkMigrationLeadershipState = {
val nextMigrationState = migrationClient.writeClientQuotas(
- new ClientQuotaEntity(entity.asJava),
+ entity.asJava,
quotas.asJava,
migrationState)
val newProps = ZkAdminManager.clientQuotaPropsToDoubleMap(
@@ -187,25 +187,25 @@ class ZkMigrationClientTest extends QuorumTestHarness {
assertEquals(0, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
- Map(ConfigType.User -> "user1"),
+ Map(ClientQuotaEntity.USER -> "user1"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0),
ConfigType.User, "user1")
assertEquals(1, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
- Map(ConfigType.User -> "user1"),
+ Map(ClientQuotaEntity.USER -> "user1"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0),
ConfigType.User, "user1")
assertEquals(2, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
- Map(ConfigType.User -> "user1"),
+ Map(ClientQuotaEntity.USER -> "user1"),
Map.empty,
ConfigType.User, "user1")
assertEquals(3, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
- Map(ConfigType.User -> "user1"),
+ Map(ClientQuotaEntity.USER -> "user1"),
Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
ConfigType.User, "user1")
assertEquals(4, migrationState.migrationZkVersion())
@@ -215,14 +215,14 @@ class ZkMigrationClientTest extends QuorumTestHarness {
def testWriteNewClientQuotas(): Unit = {
assertEquals(0, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
- Map(ConfigType.User -> "user2"),
+ Map(ClientQuotaEntity.USER -> "user2"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
ConfigType.User, "user2")
assertEquals(1, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
- Map(ConfigType.User -> "user2", ConfigType.Client -> "clientA"),
+ Map(ClientQuotaEntity.USER -> "user2", ClientQuotaEntity.CLIENT_ID -> "clientA"),
Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
ConfigType.User, "user2/clients/clientA")
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
index 47e4e1b430f..2c23bba9dd7 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
@@ -64,10 +64,12 @@ public class ProducerIdControlManager {
}
void replay(ProducerIdsRecord record) {
- long currentNextProducerId = nextProducerBlock.get().firstProducerId();
- if (record.nextProducerId() <= currentNextProducerId) {
+ // During a migration, we may be calling replay() without ever having called generateNextProducerId(),
+ // so the next producer block could be EMPTY
+ ProducerIdsBlock nextBlock = nextProducerBlock.get();
+ if (nextBlock != ProducerIdsBlock.EMPTY && record.nextProducerId() <= nextBlock.firstProducerId()) {
throw new RuntimeException("Next Producer ID from replayed record (" + record.nextProducerId() + ")" +
- " is not greater than current next Producer ID (" + currentNextProducerId + ")");
+ " is not greater than current next Producer ID (" + nextBlock.firstProducerId() + ")");
} else {
nextProducerBlock.set(new ProducerIdsBlock(record.brokerId(), record.nextProducerId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE));
brokerEpoch.set(record.brokerEpoch());
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java b/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
index 51104c92ab2..6e0b84bbfbb 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
@@ -52,6 +52,10 @@ public final class ClientQuotaImage {
return quotas;
}
+ public Map<String, Double> quotaMap() {
+ return Collections.unmodifiableMap(quotas);
+ }
+
public void write(
ClientQuotaEntity entity,
ImageWriter writer,
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
index cac197a224d..bf74bb1aeb7 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
@@ -58,6 +58,10 @@ public final class ConfigurationImage {
return properties;
}
+ public Map<String, String> toMap() {
+ return Collections.unmodifiableMap(data);
+ }
+
public void write(
ConfigResource configResource,
ImageWriter writer,
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
index e1ec11e059a..101594615fb 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
@@ -61,6 +61,19 @@ public final class ConfigurationsImage {
}
}
+ /**
+ * Return the underlying config data for a given resource as an immutable map. This does not apply
+ * configuration overrides or include entity defaults for the resource type.
+ */
+ public Map<String, String> configMapForResource(ConfigResource configResource) {
+ ConfigurationImage configurationImage = data.get(configResource);
+ if (configurationImage != null) {
+ return configurationImage.toMap();
+ } else {
+ return Collections.emptyMap();
+ }
+ }
+
public void write(ImageWriter writer, ImageWriterOptions options) {
for (Entry<ConfigResource, ConfigurationImage> entry : data.entrySet()) {
ConfigResource configResource = entry.getKey();
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index f3bdeb81507..fd2d85081c4 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.metadata.migration;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
@@ -28,18 +30,22 @@ import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -403,7 +409,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
AtomicInteger count = new AtomicInteger(0);
zkMigrationClient.readAllMetadata(batch -> {
try {
- log.info("Migrating {} records from ZK: {}", batch.size(), batch);
+ if (log.isTraceEnabled()) {
+ log.trace("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch));
+ } else {
+ log.info("Migrating {} records from ZK", batch.size());
+ }
CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
count.addAndGet(batch.size());
future.get();
@@ -446,9 +456,14 @@ public class KRaftMigrationDriver implements MetadataPublisher {
// Ignore sending RPCs to the brokers since we're no longer in the state.
if (migrationState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM) {
if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
+ log.trace("Sending RPCs to broker before moving to dual-write mode using " +
+ "at offset and epoch {}", image.highestOffsetAndEpoch());
propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
// Migration leadership state doesn't change since we're not doing any Zk writes.
transitionTo(MigrationState.DUAL_WRITE);
+ } else {
+ log.trace("Ignoring using metadata image since migration leadership state is at a greater offset and epoch {}",
+ migrationLeadershipState.offsetAndEpoch());
}
}
}
@@ -470,10 +485,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
@Override
public void run() throws Exception {
KRaftMigrationDriver.this.image = image;
+ String metadataType = isSnapshot ? "snapshot" : "delta";
if (migrationState != MigrationState.DUAL_WRITE) {
- log.trace("Received metadata change, but the controller is not in dual-write " +
- "mode. Ignoring the change to be replicated to Zookeeper");
+ log.trace("Received metadata {}, but the controller is not in dual-write " +
+ "mode. Ignoring the change to be replicated to Zookeeper", metadataType);
return;
}
if (delta.featuresDelta() != null) {
@@ -499,16 +515,60 @@ public class KRaftMigrationDriver implements MetadataPublisher {
});
}
+ // For configs and client quotas, we need to send all of the data to the ZK client since we persist
+ // everything for a given entity in a single ZK node.
+ if (delta.configsDelta() != null) {
+ delta.configsDelta().changes().forEach((configResource, configDelta) ->
+ apply("Updating config resource " + configResource, migrationState ->
+ zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
+ }
+
+ if (delta.clientQuotasDelta() != null) {
+ delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
+ Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+ apply("Updating client quota " + clientQuotaEntity, migrationState ->
+ zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState));
+ });
+ }
+
+ if (delta.producerIdsDelta() != null) {
+ apply("Updating next producer ID", migrationState ->
+ zkMigrationClient.writeProducerId(delta.producerIdsDelta().nextProducerId(), migrationState));
+ }
- apply("Write MetadataDelta to Zk", state -> zkMigrationClient.writeMetadataDeltaToZookeeper(delta, image, state));
// TODO: Unhappy path: Probably relinquish leadership and let new controller
// retry the write?
+ log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch());
} else {
- String metadataType = isSnapshot ? "snapshot" : "delta";
log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
}
}
}
+
+ static String recordBatchToString(Collection<ApiMessageAndVersion> batch) {
+ String batchString = batch.stream().map(apiMessageAndVersion -> {
+ if (apiMessageAndVersion.message().apiKey() == MetadataRecordType.CONFIG_RECORD.id()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ApiMessageAndVersion(");
+ ConfigRecord record = (ConfigRecord) apiMessageAndVersion.message();
+ sb.append("ConfigRecord(");
+ sb.append("resourceType=");
+ sb.append(record.resourceType());
+ sb.append(", resourceName=");
+ sb.append(record.resourceName());
+ sb.append(", name=");
+ sb.append(record.name());
+ sb.append(")");
+ sb.append(" at version ");
+ sb.append(apiMessageAndVersion.version());
+ sb.append(")");
+ return sb.toString();
+ } else {
+ return apiMessageAndVersion.toString();
+ }
+ }).collect(Collectors.joining(","));
+ return "[" + batchString + "]";
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
index 1710458cd57..11284b399c1 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
@@ -17,6 +17,7 @@
package org.apache.kafka.metadata.migration;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.metadata.PartitionRegistration;
@@ -85,6 +86,23 @@ public interface MigrationClient {
ZkMigrationLeadershipState state
);
+ ZkMigrationLeadershipState writeConfigs(
+ ConfigResource configResource,
+ Map<String, String> configMap,
+ ZkMigrationLeadershipState state
+ );
+
+ ZkMigrationLeadershipState writeClientQuotas(
+ Map<String, String> clientQuotaEntity,
+ Map<String, Double> quotas,
+ ZkMigrationLeadershipState state
+ );
+
+ ZkMigrationLeadershipState writeProducerId(
+ long nextProducerId,
+ ZkMigrationLeadershipState state
+ );
+
void readAllMetadata(Consumer<List<ApiMessageAndVersion>> batchConsumer, Consumer<Integer> brokerIdConsumer);
Set<Integer> readBrokerIds();