This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5dcdf71dec4 MINOR: Improved error handling in ZK migration (#13372)
5dcdf71dec4 is described below
commit 5dcdf71dec4124b67f13fcd1561faef3dac38d55
Author: David Arthur <[email protected]>
AuthorDate: Thu Mar 16 17:21:18 2023 -0400
MINOR: Improved error handling in ZK migration (#13372)
This patch fixes many small issues to improve error handling and logging
during the ZK migration. A test was added
to simulate a ZK session expiration to ensure the correctness of the
migration driver.
With this change, ZK errors thrown during the migration will not hit the
fault handler registered with
KRaftMigrationDriver, but they will be logged.
Reviewers: Colin P. McCabe <[email protected]>
---
.../main/scala/kafka/zk/ZkMigrationClient.scala | 160 +++++++++++++--------
.../kafka/zk/ZkMigrationIntegrationTest.scala | 5 +-
.../metadata/migration/KRaftMigrationDriver.java | 84 +++++++----
.../kafka/metadata/migration/MigrationClient.java | 13 --
.../migration/MigrationClientAuthException.java | 26 ++++
.../migration/MigrationClientException.java | 38 +++++
.../migration/KRaftMigrationDriverTest.java | 77 ++++++++--
7 files changed, 288 insertions(+), 115 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 33e9dfb3342..cfcf7221761 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -28,11 +28,10 @@ import
org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.metadata._
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
-import org.apache.kafka.metadata.migration.{MigrationClient,
ZkMigrationLeadershipState}
+import org.apache.kafka.metadata.migration.{MigrationClient,
MigrationClientAuthException, MigrationClientException,
ZkMigrationLeadershipState}
import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
-import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.KeeperException.{AuthFailedException, Code,
NoAuthException, SessionClosedRequireAuthException}
import org.apache.zookeeper.{CreateMode, KeeperException}
import java.util
@@ -43,23 +42,48 @@ import scala.jdk.CollectionConverters._
/**
* Migration client in KRaft controller responsible for handling communication
to Zookeeper and
- * the ZkBrokers present in the cluster.
+ * the ZkBrokers present in the cluster. Methods that directly use
KafkaZkClient should use the wrapZkException
+ * wrapper function in order to translate KeeperExceptions into something
usable by the caller.
*/
class ZkMigrationClient(
zkClient: KafkaZkClient,
zkConfigEncoder: PasswordEncoder
) extends MigrationClient with Logging {
- override def getOrCreateMigrationRecoveryState(initialState:
ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
- zkClient.createTopLevelPaths()
- zkClient.getOrCreateMigrationState(initialState)
+ /**
+ * Wrap a function such that any KeeperExceptions is captured and converted
to a MigrationClientException.
+ * Any authentication related exception is converted to a
MigrationClientAuthException which may be treated
+ * differently by the caller.
+ */
+ @throws(classOf[MigrationClientException])
+ private def wrapZkException[T](fn: => T): T = {
+ try {
+ fn
+ } catch {
+ case e @ (_: MigrationClientException | _: MigrationClientAuthException)
=> throw e
+ case e @ (_: AuthFailedException | _: NoAuthException | _:
SessionClosedRequireAuthException) =>
+ // We don't expect authentication errors to be recoverable, so treat
them differently
+ throw new MigrationClientAuthException(e)
+ case e: KeeperException => throw new MigrationClientException(e)
+ }
}
- override def setMigrationRecoveryState(state: ZkMigrationLeadershipState):
ZkMigrationLeadershipState = {
+ override def getOrCreateMigrationRecoveryState(
+ initialState: ZkMigrationLeadershipState
+ ): ZkMigrationLeadershipState = wrapZkException {
+ zkClient.createTopLevelPaths()
+ zkClient.getOrCreateMigrationState(initialState)
+ }
+
+ override def setMigrationRecoveryState(
+ state: ZkMigrationLeadershipState
+ ): ZkMigrationLeadershipState = wrapZkException {
zkClient.updateMigrationState(state)
}
- override def claimControllerLeadership(state: ZkMigrationLeadershipState):
ZkMigrationLeadershipState = {
+ override def claimControllerLeadership(
+ state: ZkMigrationLeadershipState
+ ): ZkMigrationLeadershipState = wrapZkException {
zkClient.tryRegisterKRaftControllerAsActiveController(state.kraftControllerId(),
state.kraftControllerEpoch()) match {
case SuccessfulRegistrationResult(controllerEpoch,
controllerEpochZkVersion) =>
state.withZkController(controllerEpoch, controllerEpochZkVersion)
@@ -67,7 +91,9 @@ class ZkMigrationClient(
}
}
- override def releaseControllerLeadership(state: ZkMigrationLeadershipState):
ZkMigrationLeadershipState = {
+ override def releaseControllerLeadership(
+ state: ZkMigrationLeadershipState
+ ): ZkMigrationLeadershipState = wrapZkException {
try {
zkClient.deleteController(state.zkControllerEpochZkVersion())
state.withUnknownZkController()
@@ -76,14 +102,14 @@ class ZkMigrationClient(
// If the controller moved, no need to release
state.withUnknownZkController()
case t: Throwable =>
- throw new RuntimeException("Could not release controller leadership
due to underlying error", t)
+ throw new MigrationClientException("Could not release controller
leadership due to underlying error", t)
}
}
def migrateTopics(
recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
brokerIdConsumer: Consumer[Integer]
- ): Unit = {
+ ): Unit = wrapZkException {
val topics = zkClient.getAllTopicsInCluster()
val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
val replicaAssignmentAndTopicIds =
zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
@@ -137,7 +163,9 @@ class ZkMigrationClient(
}
}
- def migrateBrokerConfigs(recordConsumer:
Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+ def migrateBrokerConfigs(
+ recordConsumer: Consumer[util.List[ApiMessageAndVersion]]
+ ): Unit = wrapZkException {
val batch = new util.ArrayList[ApiMessageAndVersion]()
val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
@@ -165,7 +193,9 @@ class ZkMigrationClient(
}
}
- def migrateClientQuotas(recordConsumer:
Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+ def migrateClientQuotas(
+ recordConsumer: Consumer[util.List[ApiMessageAndVersion]]
+ ): Unit = wrapZkException {
val adminZkClient = new AdminZkClient(zkClient)
def migrateEntityType(entityType: String): Unit = {
@@ -207,7 +237,9 @@ class ZkMigrationClient(
migrateEntityType(ConfigType.Ip)
}
- def migrateProducerId(recordConsumer:
Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+ def migrateProducerId(
+ recordConsumer: Consumer[util.List[ApiMessageAndVersion]]
+ ): Unit = wrapZkException {
val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
dataOpt match {
case Some(data) =>
@@ -220,19 +252,21 @@ class ZkMigrationClient(
}
}
- override def readAllMetadata(batchConsumer:
Consumer[util.List[ApiMessageAndVersion]],
- brokerIdConsumer: Consumer[Integer]): Unit = {
+ override def readAllMetadata(
+ batchConsumer: Consumer[util.List[ApiMessageAndVersion]],
+ brokerIdConsumer: Consumer[Integer]
+ ): Unit = {
migrateTopics(batchConsumer, brokerIdConsumer)
migrateBrokerConfigs(batchConsumer)
migrateClientQuotas(batchConsumer)
migrateProducerId(batchConsumer)
}
- override def readBrokerIds(): util.Set[Integer] = {
- zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava
+ override def readBrokerIds(): util.Set[Integer] = wrapZkException {
+ new
util.HashSet[Integer](zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava)
}
- override def readBrokerIdsFromTopicAssignments(): util.Set[Integer] = {
+ override def readBrokerIdsFromTopicAssignments(): util.Set[Integer] =
wrapZkException {
val topics = zkClient.getAllTopicsInCluster()
val replicaAssignmentAndTopicIds =
zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
val brokersWithAssignments = new util.HashSet[Integer]()
@@ -244,10 +278,12 @@ class ZkMigrationClient(
brokersWithAssignments
}
- override def createTopic(topicName: String,
- topicId: Uuid,
- partitions: util.Map[Integer,
PartitionRegistration],
- state: ZkMigrationLeadershipState):
ZkMigrationLeadershipState = {
+ override def createTopic(
+ topicName: String,
+ topicId: Uuid,
+ partitions: util.Map[Integer, PartitionRegistration],
+ state: ZkMigrationLeadershipState
+ ): ZkMigrationLeadershipState = wrapZkException {
val assignments = partitions.asScala.map { case (partitionId, partition) =>
new TopicPartition(topicName, partitionId) ->
ReplicaAssignment(partition.replicas, partition.addingReplicas,
partition.removingReplicas)
@@ -289,18 +325,22 @@ class ZkMigrationClient(
state.withMigrationZkVersion(migrationZkVersion)
} else {
// not ok
- throw new RuntimeException(s"Failed to create or update topic
$topicName. ZK operation had results $resultCodes")
+ throw new MigrationClientException(s"Failed to create or update topic
$topicName. ZK operation had results $resultCodes")
}
}
- private def createTopicPartition(topicPartition: TopicPartition):
CreateRequest = {
+ private def createTopicPartition(
+ topicPartition: TopicPartition
+ ): CreateRequest = wrapZkException {
val path = TopicPartitionZNode.path(topicPartition)
CreateRequest(path, null, zkClient.defaultAcls(path),
CreateMode.PERSISTENT, Some(topicPartition))
}
- private def partitionStatePathAndData(topicPartition: TopicPartition,
- partitionRegistration:
PartitionRegistration,
- controllerEpoch: Int): (String,
Array[Byte]) = {
+ private def partitionStatePathAndData(
+ topicPartition: TopicPartition,
+ partitionRegistration: PartitionRegistration,
+ controllerEpoch: Int
+ ): (String, Array[Byte]) = {
val path = TopicPartitionStateZNode.path(topicPartition)
val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new
LeaderAndIsr(
partitionRegistration.leader,
@@ -311,22 +351,28 @@ class ZkMigrationClient(
(path, data)
}
- private def createTopicPartitionState(topicPartition: TopicPartition,
- partitionRegistration:
PartitionRegistration,
- controllerEpoch: Int): CreateRequest =
{
+ private def createTopicPartitionState(
+ topicPartition: TopicPartition,
+ partitionRegistration: PartitionRegistration,
+ controllerEpoch: Int
+ ): CreateRequest = {
val (path, data) = partitionStatePathAndData(topicPartition,
partitionRegistration, controllerEpoch)
CreateRequest(path, data, zkClient.defaultAcls(path),
CreateMode.PERSISTENT, Some(topicPartition))
}
- private def updateTopicPartitionState(topicPartition: TopicPartition,
- partitionRegistration:
PartitionRegistration,
- controllerEpoch: Int): SetDataRequest
= {
+ private def updateTopicPartitionState(
+ topicPartition: TopicPartition,
+ partitionRegistration: PartitionRegistration,
+ controllerEpoch: Int
+ ): SetDataRequest = {
val (path, data) = partitionStatePathAndData(topicPartition,
partitionRegistration, controllerEpoch)
SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
}
- override def updateTopicPartitions(topicPartitions: util.Map[String,
util.Map[Integer, PartitionRegistration]],
- state: ZkMigrationLeadershipState):
ZkMigrationLeadershipState = {
+ override def updateTopicPartitions(
+ topicPartitions: util.Map[String, util.Map[Integer,
PartitionRegistration]],
+ state: ZkMigrationLeadershipState
+ ): ZkMigrationLeadershipState = wrapZkException {
val requests = topicPartitions.asScala.flatMap { case (topicName,
partitionRegistrations) =>
partitionRegistrations.asScala.flatMap { case (partitionId,
partitionRegistration) =>
val topicPartition = new TopicPartition(topicName, partitionId)
@@ -341,18 +387,20 @@ class ZkMigrationClient(
if (resultCodes.forall { case (_, code) => code.equals(Code.OK) } ) {
state.withMigrationZkVersion(migrationZkVersion)
} else {
- throw new RuntimeException(s"Failed to update partition states:
$topicPartitions. ZK transaction had results $resultCodes")
+ throw new MigrationClientException(s"Failed to update partition
states: $topicPartitions. ZK transaction had results $resultCodes")
}
}
}
// Try to update an entity config and the migration state. If NoNode is
encountered, it probably means we
// need to recursively create the parent ZNode. In this case, return None.
- def tryWriteEntityConfig(entityType: String,
- path: String,
- props: Properties,
- create: Boolean,
- state: ZkMigrationLeadershipState):
Option[ZkMigrationLeadershipState] = {
+ def tryWriteEntityConfig(
+ entityType: String,
+ path: String,
+ props: Properties,
+ create: Boolean,
+ state: ZkMigrationLeadershipState
+ ): Option[ZkMigrationLeadershipState] = wrapZkException {
val configData = ConfigEntityZNode.encode(props)
val requests = if (create) {
@@ -375,8 +423,7 @@ class ZkMigrationClient(
entity: util.Map[String, String],
quotas: util.Map[String, java.lang.Double],
state: ZkMigrationLeadershipState
- ): ZkMigrationLeadershipState = {
-
+ ): ZkMigrationLeadershipState = wrapZkException {
val entityMap = entity.asScala
val hasUser = entityMap.contains(ClientQuotaEntity.USER)
val hasClient = entityMap.contains(ClientQuotaEntity.CLIENT_ID)
@@ -418,13 +465,16 @@ class ZkMigrationClient(
tryWriteEntityConfig(configType.get, path.get, props, create=true,
state) match {
case Some(newStateSecondTry) => newStateSecondTry
- case None => throw new RuntimeException(
+ case None => throw new MigrationClientException(
s"Could not write client quotas for $entity on second attempt when
using Create instead of SetData")
}
}
}
- override def writeProducerId(nextProducerId: Long, state:
ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ override def writeProducerId(
+ nextProducerId: Long,
+ state: ZkMigrationLeadershipState
+ ): ZkMigrationLeadershipState = wrapZkException {
val newProducerIdBlockData =
ProducerIdBlockZNode.generateProducerIdBlockJson(
new ProducerIdsBlock(-1, nextProducerId,
ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE))
@@ -433,9 +483,11 @@ class ZkMigrationClient(
state.withMigrationZkVersion(migrationZkVersion)
}
- override def writeConfigs(resource: ConfigResource,
- configs: util.Map[String, String],
- state: ZkMigrationLeadershipState):
ZkMigrationLeadershipState = {
+ override def writeConfigs(
+ resource: ConfigResource,
+ configs: util.Map[String, String],
+ state: ZkMigrationLeadershipState
+ ): ZkMigrationLeadershipState = wrapZkException {
val configType = resource.`type`() match {
case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
@@ -456,7 +508,7 @@ class ZkMigrationClient(
tryWriteEntityConfig(configType.get, configName, props, create=true,
state) match {
case Some(newStateSecondTry) => newStateSecondTry
- case None => throw new RuntimeException(
+ case None => throw new MigrationClientException(
s"Could not write ${configType.get} configs on second attempt
when using Create instead of SetData.")
}
}
@@ -465,10 +517,4 @@ class ZkMigrationClient(
state
}
}
-
- override def writeMetadataDeltaToZookeeper(delta: MetadataDelta,
- image: MetadataImage,
- state:
ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
- state
- }
}
diff --git
a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index e1c9ffbcdd8..52c7fafa8e6 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -34,6 +34,7 @@ import
org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion,
ProducerIdsBlock}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotNull, assertTrue}
+import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import org.slf4j.LoggerFactory
@@ -43,7 +44,9 @@ import java.util.concurrent.TimeUnit
import scala.collection.Seq
import scala.jdk.CollectionConverters._
+
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@Timeout(120)
class ZkMigrationIntegrationTest {
val log = LoggerFactory.getLogger(classOf[ZkMigrationIntegrationTest])
@@ -85,7 +88,7 @@ class ZkMigrationIntegrationTest {
quotas.add(new ClientQuotaAlteration(
new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
List(new ClientQuotaAlteration.Op("connection_creation_rate",
10.0)).asJava))
- admin.alterClientQuotas(quotas)
+ admin.alterClientQuotas(quotas).all().get(60, TimeUnit.SECONDS)
val zkClient =
clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
val kafkaConfig =
clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying.servers.head.config
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index b29edc25807..c6b1c0fc268 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -34,7 +34,6 @@ import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
@@ -90,10 +89,11 @@ public class KRaftMigrationDriver implements
MetadataPublisher {
this.zkMigrationClient = zkMigrationClient;
this.propagator = propagator;
this.time = Time.SYSTEM;
- this.log = LoggerFactory.getLogger(KRaftMigrationDriver.class);
+ LogContext logContext = new
LogContext(String.format("[KRaftMigrationDriver nodeId=%d] ", nodeId));
+ this.log = logContext.logger(KRaftMigrationDriver.class);
this.migrationState = MigrationDriverState.UNINITIALIZED;
this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
- this.eventQueue = new KafkaEventQueue(Time.SYSTEM, new
LogContext("KRaftMigrationDriver"), "kraft-migration");
+ this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
"kraft-migration");
this.image = MetadataImage.EMPTY;
this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;
this.initialZkLoadHandler = initialZkLoadHandler;
@@ -132,26 +132,45 @@ public class KRaftMigrationDriver implements
MetadataPublisher {
return true;
}
+ private boolean imageDoesNotContainAllBrokers(MetadataImage image,
Set<Integer> brokerIds) {
+ for (BrokerRegistration broker : image.cluster().brokers().values()) {
+ if (broker.isMigratingZkBroker()) {
+ brokerIds.remove(broker.id());
+ }
+ }
+ return !brokerIds.isEmpty();
+ }
+
private boolean areZkBrokersReadyForMigration() {
if (image == MetadataImage.EMPTY) {
// TODO maybe add WAIT_FOR_INITIAL_METADATA_PUBLISH state to avoid
this kind of check?
log.info("Waiting for initial metadata publish before checking if
Zk brokers are registered.");
return false;
}
- Set<Integer> zkRegisteredZkBrokers =
zkMigrationClient.readBrokerIdsFromTopicAssignments();
- for (BrokerRegistration broker : image.cluster().brokers().values()) {
- if (broker.isMigratingZkBroker()) {
- zkRegisteredZkBrokers.remove(broker.id());
- }
+
+ // First check the brokers registered in ZK
+ Set<Integer> zkBrokerRegistrations = zkMigrationClient.readBrokerIds();
+ if (imageDoesNotContainAllBrokers(image, zkBrokerRegistrations)) {
+ log.info("Still waiting for ZK brokers {} to register with
KRaft.", zkBrokerRegistrations);
+ return false;
}
- if (zkRegisteredZkBrokers.isEmpty()) {
- return true;
- } else {
- log.info("Still waiting for ZK brokers {} to register with
KRaft.", zkRegisteredZkBrokers);
+
+ // Once all of those are found, check the topic assignments. This is
much more expensive than listing /brokers
+ Set<Integer> zkBrokersWithAssignments =
zkMigrationClient.readBrokerIdsFromTopicAssignments();
+ if (imageDoesNotContainAllBrokers(image, zkBrokersWithAssignments)) {
+ log.info("Still waiting for ZK brokers {} to register with
KRaft.", zkBrokersWithAssignments);
return false;
}
+
+ return true;
}
+ /**
+ * Apply a function which transforms our internal migration state.
+ *
+ * @param name A descriptive name of the function that is being applied
+ * @param stateMutator A function which performs some migration
operations and possibly transforms our internal state
+ */
private void apply(String name, Function<ZkMigrationLeadershipState,
ZkMigrationLeadershipState> stateMutator) {
ZkMigrationLeadershipState beforeState = this.migrationLeadershipState;
ZkMigrationLeadershipState afterState =
stateMutator.apply(beforeState);
@@ -269,15 +288,24 @@ public class KRaftMigrationDriver implements
MetadataPublisher {
// Events handled by Migration Driver.
abstract class MigrationEvent implements EventQueue.Event {
+ @SuppressWarnings("ThrowableNotThrown")
@Override
public void handleException(Throwable e) {
- if (e instanceof RejectedExecutionException) {
- log.info("Not processing {} because the event queue is
closed.", this);
+ if (e instanceof MigrationClientAuthException) {
+
KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper
authentication in " + this, e);
+ } else if (e instanceof MigrationClientException) {
+ log.info(String.format("Encountered ZooKeeper error during
event %s. Will retry.", this), e.getCause());
+ } else if (e instanceof RejectedExecutionException) {
+ log.debug("Not processing {} because the event queue is
closed.", this);
} else {
- KRaftMigrationDriver.this.faultHandler.handleFault(
- "Unhandled error in " + this.getClass().getSimpleName(),
e);
+ KRaftMigrationDriver.this.faultHandler.handleFault("Unhandled
error in " + this, e);
}
}
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName();
+ }
}
class PollEvent extends MigrationEvent {
@@ -381,23 +409,17 @@ public class KRaftMigrationDriver implements
MetadataPublisher {
class BecomeZkControllerEvent extends MigrationEvent {
@Override
public void run() throws Exception {
- switch (migrationState) {
- case BECOME_CONTROLLER:
- // TODO: Handle unhappy path.
- apply("BecomeZkLeaderEvent",
zkMigrationClient::claimControllerLeadership);
- if (migrationLeadershipState.zkControllerEpochZkVersion()
== -1) {
- // We could not claim leadership, stay in
BECOME_CONTROLLER to retry
+ if (migrationState == MigrationDriverState.BECOME_CONTROLLER) {
+ apply("BecomeZkLeaderEvent",
zkMigrationClient::claimControllerLeadership);
+ if (migrationLeadershipState.zkControllerEpochZkVersion() ==
-1) {
+ log.debug("Unable to claim leadership, will retry until we
learn of a different KRaft leader");
+ } else {
+ if (!migrationLeadershipState.zkMigrationComplete()) {
+ transitionTo(MigrationDriverState.ZK_MIGRATION);
} else {
- if (!migrationLeadershipState.zkMigrationComplete()) {
- transitionTo(MigrationDriverState.ZK_MIGRATION);
- } else {
-
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
- }
+
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
}
- break;
- default:
- // Ignore the event as we're not trying to become
controller anymore.
- break;
+ }
}
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
index 11284b399c1..b026897db4b 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
@@ -18,8 +18,6 @@ package org.apache.kafka.metadata.migration;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.image.MetadataDelta;
-import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -108,15 +106,4 @@ public interface MigrationClient {
Set<Integer> readBrokerIds();
Set<Integer> readBrokerIdsFromTopicAssignments();
-
- /**
- * Convert the Metadata delta to Zookeeper writes and persist the changes.
On successful
- * write, update the migration state with new metadata offset and epoch.
- * @param delta Changes in the cluster metadata
- * @param image New metadata after the changes in `delta` are applied
- * @param state Current migration state before writing to Zookeeper.
- */
- ZkMigrationLeadershipState writeMetadataDeltaToZookeeper(MetadataDelta
delta,
- MetadataImage
image,
-
ZkMigrationLeadershipState state);
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClientAuthException.java
b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClientAuthException.java
new file mode 100644
index 00000000000..a10b8ae3f49
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClientAuthException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.migration;
+
+/**
+ * Wrapper for authentication exceptions in the migration client such as
ZooKeeper AuthFailedException
+ */
+public class MigrationClientAuthException extends MigrationClientException {
+ public MigrationClientAuthException(Throwable t) {
+ super(t);
+ }
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClientException.java
b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClientException.java
new file mode 100644
index 00000000000..6c816423761
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClientException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Unchecked exception that can be thrown by the migration client.
+ *
+ * Authentication related errors should use {@link
MigrationClientAuthException}.
+ */
+public class MigrationClientException extends KafkaException {
+ public MigrationClientException(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public MigrationClientException(Throwable t) {
+ super(t);
+ }
+
+ public MigrationClientException(String message) {
+ super(message);
+ }
+}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index 01b749ebc9a..18645996c38 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -36,6 +36,8 @@ import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.HashMap;
@@ -45,11 +47,12 @@ import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class KRaftMigrationDriverTest {
- class NoOpRecordConsumer implements ZkRecordConsumer {
+ static class NoOpRecordConsumer implements ZkRecordConsumer {
@Override
public void beginMigration() {
@@ -71,7 +74,7 @@ public class KRaftMigrationDriverTest {
}
}
- class CapturingMigrationClient implements MigrationClient {
+ static class CapturingMigrationClient implements MigrationClient {
private final Set<Integer> brokerIds;
public final Map<ConfigResource, Map<String, String>> capturedConfigs
= new HashMap<>();
@@ -162,18 +165,9 @@ public class KRaftMigrationDriverTest {
public Set<Integer> readBrokerIdsFromTopicAssignments() {
return brokerIds;
}
-
- @Override
- public ZkMigrationLeadershipState writeMetadataDeltaToZookeeper(
- MetadataDelta delta,
- MetadataImage image,
- ZkMigrationLeadershipState state
- ) {
- return state;
- }
}
- class CountingMetadataPropagator implements LegacyPropagator {
+ static class CountingMetadataPropagator implements LegacyPropagator {
public int deltas = 0;
public int images = 0;
@@ -316,4 +310,61 @@ public class KRaftMigrationDriverTest {
driver.close();
}
-}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testMigrationWithClientException(boolean authException) throws
Exception {
+ CountingMetadataPropagator metadataPropagator = new
CountingMetadataPropagator();
+ CountDownLatch claimLeaderAttempts = new CountDownLatch(3);
+ CapturingMigrationClient migrationClient = new
CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3))) {
+ @Override
+ public ZkMigrationLeadershipState
claimControllerLeadership(ZkMigrationLeadershipState state) {
+ if (claimLeaderAttempts.getCount() == 0) {
+ return super.claimControllerLeadership(state);
+ } else {
+ claimLeaderAttempts.countDown();
+ if (authException) {
+ throw new MigrationClientAuthException(new
RuntimeException("Some kind of ZK auth error!"));
+ } else {
+ throw new MigrationClientException("Some kind of ZK
error!");
+ }
+ }
+
+ }
+ };
+ MockFaultHandler faultHandler = new
MockFaultHandler("testMigrationClientExpiration");
+ try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
+ 3000,
+ new NoOpRecordConsumer(),
+ migrationClient,
+ metadataPropagator,
+ metadataPublisher -> { },
+ faultHandler
+ )) {
+ MetadataImage image = MetadataImage.EMPTY;
+ MetadataDelta delta = new MetadataDelta(image);
+
+ driver.start();
+ delta.replay(zkBrokerRecord(1));
+ delta.replay(zkBrokerRecord(2));
+ delta.replay(zkBrokerRecord(3));
+ MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+ image = delta.apply(provenance);
+
+ // Notify the driver that it is the leader
+ driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000),
1));
+ // Publish metadata of all the ZK brokers being ready
+ driver.onMetadataUpdate(delta, image, new
LogDeltaManifest(provenance,
+ new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
+ Assertions.assertTrue(claimLeaderAttempts.await(1,
TimeUnit.MINUTES));
+ TestUtils.waitForCondition(() -> driver.migrationState().get(1,
TimeUnit.MINUTES).equals(MigrationDriverState.ZK_MIGRATION),
+ "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION
state");
+
+ if (authException) {
+ Assertions.assertEquals(MigrationClientAuthException.class,
faultHandler.firstException().getCause().getClass());
+ } else {
+ Assertions.assertNull(faultHandler.firstException());
+ }
+ }
+ }
+}
\ No newline at end of file