This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit f1bb8d17b0cc2586677737d6499a7fbd6dd620ea
Author: David Arthur <[email protected]>
AuthorDate: Thu Jun 1 10:25:46 2023 -0400

    KAFKA-15010 ZK migration failover support (#13758)

    This patch adds snapshot reconciliation during the ZK to KRaft migration.
    This reconciliation happens whenever a snapshot is loaded by KRaft, or
    during a controller failover. Prior to this patch, it was possible to miss
    metadata updates coming from KRaft when dual-writing to ZK.

    Internally, this adds a new state, SYNC_KRAFT_TO_ZK, to the
    KRaftMigrationDriver state machine. The controller passes through this
    state after the initial ZK migration and each time a controller becomes
    active. Logging during dual-write was enhanced to include a count of the
    write operations performed.

    Reviewers: Colin P. McCabe <[email protected]>
---
 .../main/scala/kafka/zk/ZkMigrationClient.scala    |   6 +
 .../kafka/zk/ZkMigrationIntegrationTest.scala      |  17 ++-
 .../zk/migration/ZkAclMigrationClientTest.scala    |   5 +-
 .../zk/migration/ZkConfigMigrationClientTest.scala |   7 +-
 .../kafka/zk/migration/ZkMigrationClientTest.scala |   8 +-
 .../metadata/migration/KRaftMigrationDriver.java   | 110 ++++++++++++----
 .../migration/KRaftMigrationOperationConsumer.java |  23 ++++
 .../metadata/migration/KRaftMigrationZkWriter.java | 145 +++++++++++++++------
 .../kafka/metadata/migration/MigrationClient.java  |   4 +
 .../metadata/migration/MigrationDriverState.java   |  13 +-
 .../migration/ZkMigrationLeadershipState.java      |   4 +-
 .../java/org/apache/kafka/image/AclsImageTest.java |   4 +-
 .../apache/kafka/image/ClientQuotasImageTest.java  |   4 +-
 .../kafka/image/ConfigurationsImageTest.java       |   4 +-
 .../org/apache/kafka/image/FeaturesImageTest.java  |   4 +-
 .../apache/kafka/image/ProducerIdsImageTest.java   |   2 +-
 .../org/apache/kafka/image/ScramImageTest.java     |   4 +-
 .../org/apache/kafka/image/TopicsImageTest.java    |   2 +-
 .../migration/CapturingMigrationClient.java        |  12 ++
 .../migration/KRaftMigrationDriverTest.java        |  78 +++++++++--
 .../migration/KRaftMigrationZkWriterTest.java      | 143 ++++++++++++++++++++
 tests/kafkatest/services/zookeeper.py              |  50 +++++++
 .../tests/core/zookeeper_migration_test.py         | 119 ++++++++++++++++-
 23 files changed, 649 insertions(+), 119 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala index e94f435d71b..26907587a4f 100644 --- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala @@ -39,6 +39,7 @@ import org.apache.zookeeper.KeeperException.{AuthFailedException, NoAuthExceptio import java.{lang, util} import java.util.function.Consumer import scala.collection.Seq +import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ object ZkMigrationClient { @@ -303,6 +304,11 @@ class ZkMigrationClient( new util.HashSet[Integer](zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava) } + override def readProducerId(): util.Optional[ProducerIdsBlock] = { + val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path) + dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).asJava + } + override def writeProducerId( nextProducerId: Long, state: ZkMigrationLeadershipState diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 7aa88977bc8..8d240825e1f 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ 
b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -57,7 +57,7 @@ import scala.jdk.CollectionConverters._ @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@Timeout(120) +@Timeout(300) class ZkMigrationIntegrationTest { val log = LoggerFactory.getLogger(classOf[ZkMigrationIntegrationTest]) @@ -243,8 +243,7 @@ class ZkMigrationIntegrationTest { log.info("Verifying metadata changes with ZK") verifyUserScramCredentials(zkClient) } finally { - zkCluster.stop() - kraftCluster.close() + shutdownInSequence(zkCluster, kraftCluster) } } @@ -321,8 +320,7 @@ class ZkMigrationIntegrationTest { verifyProducerId(producerIdBlock, zkClient) } finally { - zkCluster.stop() - kraftCluster.close() + shutdownInSequence(zkCluster, kraftCluster) } } @@ -383,8 +381,7 @@ class ZkMigrationIntegrationTest { verifyUserScramCredentials(zkClient) verifyClientQuotas(zkClient) } finally { - zkCluster.stop() - kraftCluster.close() + shutdownInSequence(zkCluster, kraftCluster) } } @@ -481,4 +478,10 @@ class ZkMigrationIntegrationTest { assertTrue(firstProducerIdBlock.firstProducerId() < producerIdBlock.firstProducerId()) } } + + def shutdownInSequence(zkCluster: ClusterInstance, kraftCluster: KafkaClusterTestKit): Unit = { + zkCluster.brokerIds().forEach(zkCluster.shutdownBroker(_)) + kraftCluster.close() + zkCluster.stop() + } } diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala index 77db73ed54e..7913d09c665 100644 --- a/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala @@ -170,9 +170,8 @@ class ZkAclMigrationClientTest extends ZkMigrationTestHarness { val image = delta.apply(MetadataProvenance.EMPTY) // load snapshot to Zookeeper. - val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient, - (_, operation) => { migrationState = operation.apply(migrationState) }) - kraftMigrationZkWriter.handleSnapshot(image) + val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient) + kraftMigrationZkWriter.handleSnapshot(image, (_, _, operation) => { migrationState = operation.apply(migrationState) }) // Verify the new ACLs in Zookeeper. val resource1AclsInZk = zkClient.getVersionedAclsForResource(resource1).acls diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala index f3152697e86..0b6ecedd032 100644 --- a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala @@ -311,9 +311,10 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness { val image = delta.apply(MetadataProvenance.EMPTY) // load snapshot to Zookeeper. 
- val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient, - (_, operation) => { migrationState = operation.apply(migrationState) }) - kraftMigrationZkWriter.handleLoadSnapshot(image) + val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient) + kraftMigrationZkWriter.handleSnapshot(image, (_, _, operation) => { + migrationState = operation.apply(migrationState) + }) val user1Props = zkClient.getEntityConfigs(ConfigType.User, "user1") assertEquals(0, user1Props.size()) diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala index 22447ee0e17..54d56ddc7a8 100644 --- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala @@ -264,9 +264,7 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness { @Test def testTopicAndBrokerConfigsMigrationWithSnapshots(): Unit = { - val kraftWriter = new KRaftMigrationZkWriter(migrationClient, (_, operation) => { - migrationState = operation.apply(migrationState) - }) + val kraftWriter = new KRaftMigrationZkWriter(migrationClient) // Add some topics and broker configs and create new image. val topicName = "testTopic" @@ -320,7 +318,9 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness { val image = delta.apply(MetadataProvenance.EMPTY) // Handle migration using the generated snapshot. - kraftWriter.handleSnapshot(image) + kraftWriter.handleSnapshot(image, (_, _, operation) => { + migrationState = operation(migrationState) + }) // Verify topic state. val topicIdReplicaAssignment = diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 3706c8d3617..0b8aa0341f9 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -41,12 +41,15 @@ import org.slf4j.Logger; import java.util.Collection; import java.util.EnumSet; import java.util.HashSet; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -115,7 +118,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { this.initialZkLoadHandler = initialZkLoadHandler; this.faultHandler = faultHandler; this.quorumFeatures = quorumFeatures; - this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient, this::applyMigrationOperation); + this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient); } public KRaftMigrationDriver( @@ -149,10 +152,9 @@ public class KRaftMigrationDriver implements MetadataPublisher { } private void recoverMigrationStateFromZK() { - log.info("Recovering migration state from ZK"); - applyMigrationOperation("Recovery", zkMigrationClient::getOrCreateMigrationRecoveryState); - String maybeDone = migrationLeadershipState.zkMigrationComplete() ? "done" : "not done"; - log.info("Recovered migration state {}. 
ZK migration is {}.", migrationLeadershipState, maybeDone); + applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState); + String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done"; + log.info("Initial migration of ZK metadata is {}.", maybeDone); // Once we've recovered the migration state from ZK, install this class as a metadata publisher // by calling the initialZkLoadHandler. @@ -230,11 +232,11 @@ public class KRaftMigrationDriver implements MetadataPublisher { ZkMigrationLeadershipState beforeState = this.migrationLeadershipState; ZkMigrationLeadershipState afterState = migrationOp.apply(beforeState); if (afterState.loggableChangeSinceState(beforeState)) { - log.info("{} transitioned migration state from {} to {}", name, beforeState, afterState); + log.info("{}. Transitioned migration state from {} to {}", name, beforeState, afterState); } else if (afterState.equals(beforeState)) { - log.trace("{} kept migration state as {}", name, afterState); + log.trace("{}. Kept migration state as {}", name, afterState); } else { - log.trace("{} transitioned migration state from {} to {}", name, beforeState, afterState); + log.trace("{}. Transitioned migration state from {} to {}", name, beforeState, afterState); } this.migrationLeadershipState = afterState; @@ -267,8 +269,12 @@ public class KRaftMigrationDriver implements MetadataPublisher { return newState == MigrationDriverState.INACTIVE || newState == MigrationDriverState.ZK_MIGRATION || - newState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM; + newState == MigrationDriverState.SYNC_KRAFT_TO_ZK; case ZK_MIGRATION: + return + newState == MigrationDriverState.INACTIVE || + newState == MigrationDriverState.SYNC_KRAFT_TO_ZK; + case SYNC_KRAFT_TO_ZK: return newState == MigrationDriverState.INACTIVE || newState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM; @@ -392,6 +398,9 @@ public class KRaftMigrationDriver implements MetadataPublisher { case ZK_MIGRATION: eventQueue.append(new MigrateMetadataEvent()); break; + case SYNC_KRAFT_TO_ZK: + eventQueue.append(new SyncKRaftMetadataEvent()); + break; case KRAFT_CONTROLLER_TO_BROKER_COMM: eventQueue.append(new SendRPCsToBrokersEvent()); break; @@ -429,15 +438,18 @@ public class KRaftMigrationDriver implements MetadataPublisher { boolean isActive = leaderAndEpoch.isLeader(KRaftMigrationDriver.this.nodeId); if (!isActive) { - applyMigrationOperation("KRaftLeaderEvent is not active", state -> + applyMigrationOperation("Became inactive migration driver", state -> state.withNewKRaftController( leaderAndEpoch.leaderId().orElse(ZkMigrationLeadershipState.EMPTY.kraftControllerId()), leaderAndEpoch.epoch()) ); transitionTo(MigrationDriverState.INACTIVE); } else { - // Apply the new KRaft state - applyMigrationOperation("KRaftLeaderEvent is active", state -> state.withNewKRaftController(nodeId, leaderAndEpoch.epoch())); + // Load the existing migration state and apply the new KRaft state + applyMigrationOperation("Became active migration driver", state -> { + ZkMigrationLeadershipState recoveredState = zkMigrationClient.getOrCreateMigrationRecoveryState(state); + return recoveredState.withNewKRaftController(nodeId, leaderAndEpoch.epoch()); + }); // Before becoming the controller of ZkBrokers, we need to make sure the // Controller Quorum can handle migration. 
@@ -473,7 +485,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { } break; case MIGRATION: - if (!migrationLeadershipState.zkMigrationComplete()) { + if (!migrationLeadershipState.initialZkMigrationComplete()) { log.error("KRaft controller indicates an active migration, but the ZK state does not."); transitionTo(MigrationDriverState.INACTIVE); } else { @@ -495,14 +507,14 @@ public class KRaftMigrationDriver implements MetadataPublisher { @Override public void run() throws Exception { if (migrationState == MigrationDriverState.BECOME_CONTROLLER) { - applyMigrationOperation("BecomeZkLeaderEvent", zkMigrationClient::claimControllerLeadership); + applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership); if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) { log.debug("Unable to claim leadership, will retry until we learn of a different KRaft leader"); } else { - if (!migrationLeadershipState.zkMigrationComplete()) { + if (!migrationLeadershipState.initialZkMigrationComplete()) { transitionTo(MigrationDriverState.ZK_MIGRATION); } else { - transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM); + transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK); } } } @@ -542,7 +554,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { log.info("Migrating {} records from ZK", batch.size()); } CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch); - FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, KRaftMigrationDriver.this.logContext.logPrefix(), + FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "", "the metadata layer to commit migration record batch", future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time); count.addAndGet(batch.size()); @@ -552,7 +564,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { }, brokersInMetadata::add); CompletableFuture<OffsetAndEpoch> completeMigrationFuture = zkRecordConsumer.completeMigration(); OffsetAndEpoch offsetAndEpochAfterMigration = FutureUtils.waitWithLogging( - KRaftMigrationDriver.this.log, KRaftMigrationDriver.this.logContext.logPrefix(), + KRaftMigrationDriver.this.log, "", "the metadata layer to complete the migration", completeMigrationFuture, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time); log.info("Completed migration of metadata from Zookeeper to KRaft. A total of {} metadata records were " + @@ -566,8 +578,11 @@ public class KRaftMigrationDriver implements MetadataPublisher { ZkMigrationLeadershipState newState = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch( offsetAndEpochAfterMigration.offset(), offsetAndEpochAfterMigration.epoch()); - applyMigrationOperation("Finished migrating ZK data", state -> zkMigrationClient.setMigrationRecoveryState(newState)); - transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM); + applyMigrationOperation("Finished initial migration of ZK metadata to KRaft", state -> zkMigrationClient.setMigrationRecoveryState(newState)); + // Even though we just migrated everything, we still pass through the SYNC_KRAFT_TO_ZK state. This + // accomplishes two things: ensuring we have consistent metadata state between KRaft and ZK, and + // exercising the snapshot handling code in KRaftMigrationZkWriter. 
+ transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK); } catch (Throwable t) { zkRecordConsumer.abortMigration(); super.handleException(t); @@ -575,6 +590,37 @@ public class KRaftMigrationDriver implements MetadataPublisher { } } + static KRaftMigrationOperationConsumer countingOperationConsumer( + Map<String, Integer> dualWriteCounts, + BiConsumer<String, KRaftMigrationOperation> operationConsumer + ) { + return (opType, logMsg, operation) -> { + dualWriteCounts.compute(opType, (key, value) -> { + if (value == null) { + return 1; + } else { + return value + 1; + } + }); + operationConsumer.accept(logMsg, operation); + }; + } + + + class SyncKRaftMetadataEvent extends MigrationEvent { + @Override + public void run() throws Exception { + if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) { + log.info("Performing a full metadata sync from KRaft to ZK."); + Map<String, Integer> dualWriteCounts = new TreeMap<>(); + zkMetadataWriter.handleSnapshot(image, countingOperationConsumer( + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + log.info("Made the following ZK writes when reconciling with KRaft state: {}", dualWriteCounts); + transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM); + } + } + } + class SendRPCsToBrokersEvent extends MigrationEvent { @Override @@ -623,7 +669,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { KRaftMigrationDriver.this.image = image; String metadataType = isSnapshot ? "snapshot" : "delta"; - if (migrationState != MigrationDriverState.DUAL_WRITE) { + if (!migrationState.allowDualWrite()) { log.trace("Received metadata {}, but the controller is not in dual-write " + "mode. Ignoring the change to be replicated to Zookeeper", metadataType); completionHandler.accept(null); @@ -633,20 +679,34 @@ public class KRaftMigrationDriver implements MetadataPublisher { if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) { log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance); completionHandler.accept(null); + return; } + Map<String, Integer> dualWriteCounts = new TreeMap<>(); if (isSnapshot) { - zkMetadataWriter.handleSnapshot(image); + zkMetadataWriter.handleSnapshot(image, countingOperationConsumer( + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); } else { - zkMetadataWriter.handleDelta(prevImage, image, delta); + zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer( + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); } + if (dualWriteCounts.isEmpty()) { + log.trace("Did not make any ZK writes when handling KRaft {}", isSnapshot ? "snapshot" : "delta"); + } else { + log.debug("Made the following ZK writes when handling KRaft {}: {}", isSnapshot ? "snapshot" : "delta", dualWriteCounts); + } + + // Persist the offset of the metadata that was written to ZK + ZkMigrationLeadershipState zkStateAfterDualWrite = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch( + image.highestOffsetAndEpoch().offset(), image.highestOffsetAndEpoch().epoch()); + applyMigrationOperation("Updating ZK migration state after " + metadataType, + state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite)); // TODO: Unhappy path: Probably relinquish leadership and let new controller // retry the write? 
if (delta.topicsDelta() != null || delta.clusterDelta() != null) { log.trace("Sending RPCs to brokers for metadata {}.", metadataType); - propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, - migrationLeadershipState.zkControllerEpoch()); + propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch()); } else { log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType); } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationOperationConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationOperationConsumer.java new file mode 100644 index 00000000000..c7f322b6ea8 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationOperationConsumer.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.migration; + +@FunctionalInterface +public interface KRaftMigrationOperationConsumer { + void accept(String opType, String logMsg, KRaftMigrationOperation operation); +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java index 01dc782f9dc..3aadb77a71e 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java @@ -35,6 +35,8 @@ import org.apache.kafka.image.ConfigurationsDelta; import org.apache.kafka.image.ConfigurationsImage; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.ProducerIdsDelta; +import org.apache.kafka.image.ProducerIdsImage; import org.apache.kafka.image.ScramImage; import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicsDelta; @@ -42,6 +44,7 @@ import org.apache.kafka.image.TopicsImage; import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.ScramCredentialData; import org.apache.kafka.metadata.authorizer.StandardAcl; +import org.apache.kafka.server.common.ProducerIdsBlock; import java.util.ArrayList; import java.util.Collections; @@ -58,42 +61,56 @@ import java.util.function.Function; import java.util.stream.Collectors; public class KRaftMigrationZkWriter { + + private static final String UPDATE_PRODUCER_ID = "UpdateProducerId"; + private static final String CREATE_TOPIC = "CreateTopic"; + private static final String DELETE_TOPIC = "DeleteTopic"; + private static final String UPDATE_PARTITON = "UpdatePartition"; + private static final String UPDATE_BROKER_CONFIG = "UpdateBrokerConfig"; + private 
static final String DELETE_BROKER_CONFIG = "DeleteBrokerConfig"; + private static final String UPDATE_TOPIC_CONFIG = "UpdateTopicConfig"; + private static final String DELETE_TOPIC_CONFIG = "DeleteTopicConfig"; + private static final String UPDATE_CLIENT_QUOTA = "UpdateClientQuota"; + private static final String UPDATE_ACL = "UpdateAcl"; + private static final String DELETE_ACL = "DeleteAcl"; + + private final MigrationClient migrationClient; - private final BiConsumer<String, KRaftMigrationOperation> operationConsumer; public KRaftMigrationZkWriter( - MigrationClient migrationClient, - BiConsumer<String, KRaftMigrationOperation> operationConsumer + MigrationClient migrationClient ) { this.migrationClient = migrationClient; - this.operationConsumer = operationConsumer; } - public void handleSnapshot(MetadataImage image) { - handleTopicsSnapshot(image.topics()); - handleConfigsSnapshot(image.configs()); - handleClientQuotasSnapshot(image.clientQuotas(), image.scram()); - operationConsumer.accept("Setting next producer ID", migrationState -> - migrationClient.writeProducerId(image.producerIds().nextProducerId(), migrationState)); - handleAclsSnapshot(image.acls()); + public void handleSnapshot(MetadataImage image, KRaftMigrationOperationConsumer operationConsumer) { + handleTopicsSnapshot(image.topics(), operationConsumer); + handleConfigsSnapshot(image.configs(), operationConsumer); + handleClientQuotasSnapshot(image.clientQuotas(), image.scram(), operationConsumer); + handleProducerIdSnapshot(image.producerIds(), operationConsumer); + handleAclsSnapshot(image.acls(), operationConsumer); } - public void handleDelta(MetadataImage previousImage, MetadataImage image, MetadataDelta delta) { + public void handleDelta( + MetadataImage previousImage, + MetadataImage image, + MetadataDelta delta, + KRaftMigrationOperationConsumer operationConsumer + ) { if (delta.topicsDelta() != null) { - handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, delta.topicsDelta()); + handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, delta.topicsDelta(), operationConsumer); } if (delta.configsDelta() != null) { - handleConfigsDelta(image.configs(), delta.configsDelta()); + handleConfigsDelta(image.configs(), delta.configsDelta(), operationConsumer); } if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) { - handleClientQuotasDelta(image, delta); + handleClientQuotasDelta(image, delta, operationConsumer); } if (delta.producerIdsDelta() != null) { - operationConsumer.accept("Updating next producer ID", migrationState -> - migrationClient.writeProducerId(delta.producerIdsDelta().nextProducerId(), migrationState)); + handleProducerIdDelta(delta.producerIdsDelta(), operationConsumer); } if (delta.aclsDelta() != null) { - handleAclsDelta(image.acls(), delta.aclsDelta()); + handleAclsDelta(image.acls(), delta.aclsDelta(), operationConsumer); } } @@ -102,7 +119,7 @@ public class KRaftMigrationZkWriter { * in ZooKeeper to determine what has changed. Topic configs are not handled here since they exist in the * ConfigurationsImage. 
*/ - void handleTopicsSnapshot(TopicsImage topicsImage) { + void handleTopicsSnapshot(TopicsImage topicsImage, KRaftMigrationOperationConsumer operationConsumer) { Map<Uuid, String> deletedTopics = new HashMap<>(); Set<Uuid> createdTopics = new HashSet<>(topicsImage.topicsById().keySet()); Map<Uuid, Map<Integer, PartitionRegistration>> changedPartitions = new HashMap<>(); @@ -142,6 +159,7 @@ public class KRaftMigrationZkWriter { createdTopics.forEach(topicId -> { TopicImage topic = topicsImage.getTopic(topicId); operationConsumer.accept( + CREATE_TOPIC, "Create Topic " + topic.name() + ", ID " + topicId, migrationState -> migrationClient.topicClient().createTopic(topic.name(), topicId, topic.partitions(), migrationState) ); @@ -149,11 +167,13 @@ public class KRaftMigrationZkWriter { deletedTopics.forEach((topicId, topicName) -> { operationConsumer.accept( + DELETE_TOPIC, "Delete Topic " + topicName + ", ID " + topicId, migrationState -> migrationClient.topicClient().deleteTopic(topicName, migrationState) ); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); operationConsumer.accept( + UPDATE_TOPIC_CONFIG, "Updating Configs for Topic " + topicName + ", ID " + topicId, migrationState -> migrationClient.configClient().deleteConfigs(resource, migrationState) ); @@ -162,6 +182,7 @@ public class KRaftMigrationZkWriter { changedPartitions.forEach((topicId, paritionMap) -> { TopicImage topic = topicsImage.getTopic(topicId); operationConsumer.accept( + UPDATE_PARTITON, "Updating Partitions for Topic " + topic.name() + ", ID " + topicId, migrationState -> migrationClient.topicClient().updateTopicPartitions( Collections.singletonMap(topic.name(), paritionMap), @@ -169,16 +190,21 @@ public class KRaftMigrationZkWriter { }); } - void handleTopicsDelta(Function<Uuid, String> deletedTopicNameResolver, TopicsDelta topicsDelta) { + void handleTopicsDelta( + Function<Uuid, String> deletedTopicNameResolver, + TopicsDelta topicsDelta, + KRaftMigrationOperationConsumer operationConsumer + ) { topicsDelta.deletedTopicIds().forEach(topicId -> { String name = deletedTopicNameResolver.apply(topicId); - operationConsumer.accept("Deleting topic " + name + ", ID " + topicId, + operationConsumer.accept(DELETE_TOPIC, "Deleting topic " + name + ", ID " + topicId, migrationState -> migrationClient.topicClient().deleteTopic(name, migrationState)); }); topicsDelta.changedTopics().forEach((topicId, topicDelta) -> { if (topicsDelta.createdTopicIds().contains(topicId)) { operationConsumer.accept( + CREATE_TOPIC, "Create Topic " + topicDelta.name() + ", ID " + topicId, migrationState -> migrationClient.topicClient().createTopic( topicDelta.name(), @@ -187,6 +213,7 @@ public class KRaftMigrationZkWriter { migrationState)); } else { operationConsumer.accept( + UPDATE_PARTITON, "Updating Partitions for Topic " + topicDelta.name() + ", ID " + topicId, migrationState -> migrationClient.topicClient().updateTopicPartitions( Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), @@ -195,7 +222,15 @@ public class KRaftMigrationZkWriter { }); } - void handleConfigsSnapshot(ConfigurationsImage configsImage) { + private String brokerOrTopicOpType(ConfigResource resource, String brokerOp, String topicOp) { + if (resource.type().equals(ConfigResource.Type.BROKER)) { + return brokerOp; + } else { + return topicOp; + } + } + + void handleConfigsSnapshot(ConfigurationsImage configsImage, KRaftMigrationOperationConsumer operationConsumer) { Set<ConfigResource> newResources = new HashSet<>(); 
configsImage.resourceData().keySet().forEach(resource -> { if (EnumSet.of(ConfigResource.Type.BROKER, ConfigResource.Type.TOPIC).contains(resource.type())) { @@ -225,7 +260,8 @@ public class KRaftMigrationZkWriter { newResources.forEach(resource -> { Map<String, String> props = configsImage.configMapForResource(resource); if (!props.isEmpty()) { - operationConsumer.accept("Create configs for " + resource.type().name() + " " + resource.name(), + String opType = brokerOrTopicOpType(resource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG); + operationConsumer.accept(opType, "Create configs for " + resource.type().name() + " " + resource.name(), migrationState -> migrationClient.configClient().writeConfigs(resource, props, migrationState)); } }); @@ -233,10 +269,12 @@ public class KRaftMigrationZkWriter { resourcesToUpdate.forEach(resource -> { Map<String, String> props = configsImage.configMapForResource(resource); if (props.isEmpty()) { - operationConsumer.accept("Delete configs for " + resource.type().name() + " " + resource.name(), + String opType = brokerOrTopicOpType(resource, DELETE_BROKER_CONFIG, DELETE_TOPIC_CONFIG); + operationConsumer.accept(opType, "Delete configs for " + resource.type().name() + " " + resource.name(), migrationState -> migrationClient.configClient().deleteConfigs(resource, migrationState)); } else { - operationConsumer.accept("Update configs for " + resource.type().name() + " " + resource.name(), + String opType = brokerOrTopicOpType(resource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG); + operationConsumer.accept(opType, "Update configs for " + resource.type().name() + " " + resource.name(), migrationState -> migrationClient.configClient().writeConfigs(resource, props, migrationState)); } }); @@ -256,7 +294,7 @@ public class KRaftMigrationZkWriter { return userScramCredentialStrings; } - void handleClientQuotasSnapshot(ClientQuotasImage clientQuotasImage, ScramImage scramImage) { + void handleClientQuotasSnapshot(ClientQuotasImage clientQuotasImage, ScramImage scramImage, KRaftMigrationOperationConsumer opConsumer) { Set<ClientQuotaEntity> changedNonUserEntities = new HashSet<>(); Set<String> changedUsers = new HashSet<>(); @@ -308,8 +346,8 @@ public class KRaftMigrationZkWriter { }); changedNonUserEntities.forEach(entity -> { - Map<String, Double> quotaMap = clientQuotasImage.entities().get(entity).quotaMap(); - operationConsumer.accept("Update client quotas for " + entity, migrationState -> + Map<String, Double> quotaMap = clientQuotasImage.entities().getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap(); + opConsumer.accept(UPDATE_CLIENT_QUOTA, "Update client quotas for " + entity, migrationState -> migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, Collections.emptyMap(), migrationState)); }); @@ -318,27 +356,43 @@ public class KRaftMigrationZkWriter { Map<String, Double> quotaMap = clientQuotasImage.entities(). 
getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap(); Map<String, String> scramMap = getScramCredentialStringsForUser(scramImage, userName); - operationConsumer.accept("Update scram credentials for " + userName, migrationState -> + opConsumer.accept(UPDATE_CLIENT_QUOTA, "Update client quotas for " + userName, migrationState -> migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, scramMap, migrationState)); }); + } + void handleProducerIdSnapshot(ProducerIdsImage image, KRaftMigrationOperationConsumer operationConsumer) { + if (image.isEmpty()) { + // No producer IDs have been allocated, nothing to dual-write + return; + } + Optional<ProducerIdsBlock> zkProducerId = migrationClient.readProducerId(); + if (zkProducerId.isPresent()) { + if (zkProducerId.get().nextBlockFirstId() != image.nextProducerId()) { + operationConsumer.accept(UPDATE_PRODUCER_ID, "Setting next producer ID", migrationState -> + migrationClient.writeProducerId(image.nextProducerId(), migrationState)); + } + } else { + operationConsumer.accept(UPDATE_PRODUCER_ID, "Setting next producer ID", migrationState -> + migrationClient.writeProducerId(image.nextProducerId(), migrationState)); + } } - void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta) { + void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta, KRaftMigrationOperationConsumer operationConsumer) { Set<ConfigResource> updatedResources = configsDelta.changes().keySet(); updatedResources.forEach(configResource -> { Map<String, String> props = configsImage.configMapForResource(configResource); if (props.isEmpty()) { - operationConsumer.accept("Delete configs for " + configResource, migrationState -> + operationConsumer.accept("DeleteConfig", "Delete configs for " + configResource, migrationState -> migrationClient.configClient().deleteConfigs(configResource, migrationState)); } else { - operationConsumer.accept("Update configs for " + configResource, migrationState -> + operationConsumer.accept("UpdateConfig", "Update configs for " + configResource, migrationState -> migrationClient.configClient().writeConfigs(configResource, props, migrationState)); } }); } - void handleClientQuotasDelta(MetadataImage metadataImage, MetadataDelta metadataDelta) { + void handleClientQuotasDelta(MetadataImage metadataImage, MetadataDelta metadataDelta, KRaftMigrationOperationConsumer operationConsumer) { if ((metadataDelta.clientQuotasDelta() != null) || (metadataDelta.scramDelta() != null)) { // A list of users with scram or quota changes HashSet<String> users = new HashSet<>(); @@ -361,7 +415,7 @@ public class KRaftMigrationZkWriter { users.add(userName); } else { Map<String, Double> quotaMap = metadataImage.clientQuotas().entities().get(clientQuotaEntity).quotaMap(); - operationConsumer.accept("Updating client quota " + clientQuotaEntity, migrationState -> + operationConsumer.accept(UPDATE_CLIENT_QUOTA, "Updating client quota " + clientQuotaEntity, migrationState -> migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), quotaMap, Collections.emptyMap(), migrationState)); } }); @@ -373,22 +427,27 @@ public class KRaftMigrationZkWriter { ClientQuotaEntity clientQuotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName)); if ((metadataImage.clientQuotas() == null) || (metadataImage.clientQuotas().entities().get(clientQuotaEntity) == null)) { - operationConsumer.accept("Updating client quota " + clientQuotaEntity, migrationState -> 
+ operationConsumer.accept(UPDATE_CLIENT_QUOTA, "Updating scram credentials for " + clientQuotaEntity, migrationState -> migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), Collections.emptyMap(), userScramMap, migrationState)); } else { Map<String, Double> quotaMap = metadataImage.clientQuotas().entities().get(clientQuotaEntity).quotaMap(); - operationConsumer.accept("Updating client quota " + clientQuotaEntity, migrationState -> + operationConsumer.accept(UPDATE_CLIENT_QUOTA, "Updating client quota for " + clientQuotaEntity, migrationState -> migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState)); } }); } } + void handleProducerIdDelta(ProducerIdsDelta delta, KRaftMigrationOperationConsumer operationConsumer) { + operationConsumer.accept(UPDATE_PRODUCER_ID, "Setting next producer ID", migrationState -> + migrationClient.writeProducerId(delta.nextProducerId(), migrationState)); + } + private ResourcePattern resourcePatternFromAcl(StandardAcl acl) { return new ResourcePattern(acl.resourceType(), acl.resourceName(), acl.patternType()); } - void handleAclsSnapshot(AclsImage image) { + void handleAclsSnapshot(AclsImage image, KRaftMigrationOperationConsumer operationConsumer) { // Need to compare contents of image with all ACLs in ZK and issue updates Map<ResourcePattern, Set<AccessControlEntry>> allAclsInSnapshot = new HashMap<>(); @@ -417,24 +476,24 @@ public class KRaftMigrationZkWriter { newResources.forEach(resourcePattern -> { Set<AccessControlEntry> accessControlEntries = allAclsInSnapshot.get(resourcePattern); String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern; - operationConsumer.accept(name, migrationState -> + operationConsumer.accept(UPDATE_ACL, name, migrationState -> migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState)); }); resourcesToDelete.forEach(deletedResource -> { String name = "Deleting resource " + deletedResource + " which has no ACLs in snapshot"; - operationConsumer.accept(name, migrationState -> + operationConsumer.accept(DELETE_ACL, name, migrationState -> migrationClient.aclClient().deleteResource(deletedResource, migrationState)); }); changedResources.forEach((resourcePattern, accessControlEntries) -> { String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern; - operationConsumer.accept(name, migrationState -> + operationConsumer.accept(UPDATE_ACL, name, migrationState -> migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState)); }); } - void handleAclsDelta(AclsImage image, AclsDelta delta) { + void handleAclsDelta(AclsImage image, AclsDelta delta, KRaftMigrationOperationConsumer operationConsumer) { // Compute the resource patterns that were changed Set<ResourcePattern> resourcesWithChangedAcls = delta.changes().values() .stream() @@ -463,13 +522,13 @@ public class KRaftMigrationZkWriter { resourcesWithDeletedAcls.forEach(deletedResource -> { String name = "Deleting resource " + deletedResource + " which has no more ACLs"; - operationConsumer.accept(name, migrationState -> + operationConsumer.accept(DELETE_ACL, name, migrationState -> migrationClient.aclClient().deleteResource(deletedResource, migrationState)); }); aclsToWrite.forEach((resourcePattern, accessControlEntries) -> { String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern; - operationConsumer.accept(name, 
migrationState -> + operationConsumer.accept(UPDATE_ACL, name, migrationState -> migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState)); }); } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java index 286e261b2c6..9c9a6d9eee6 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java @@ -17,8 +17,10 @@ package org.apache.kafka.metadata.migration; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.ProducerIdsBlock; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.function.Consumer; @@ -74,6 +76,8 @@ public interface MigrationClient { AclMigrationClient aclClient(); + Optional<ProducerIdsBlock> readProducerId(); + ZkMigrationLeadershipState writeProducerId( long nextProducerId, ZkMigrationLeadershipState state diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java index fa871123ddd..2f354855640 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java @@ -44,17 +44,18 @@ public enum MigrationDriverState { WAIT_FOR_CONTROLLER_QUORUM(false), // Ensure all the quorum nodes are ready for migration. WAIT_FOR_BROKERS(false), // Wait for Zk brokers to be ready for migration. BECOME_CONTROLLER(false), // Become controller for the Zk Brokers. - ZK_MIGRATION(true), // The cluster has satisfied the migration criteria + ZK_MIGRATION(false), // The cluster has satisfied the migration criteria + SYNC_KRAFT_TO_ZK(false), // A full sync of metadata from KRaft to ZK. KRAFT_CONTROLLER_TO_BROKER_COMM(true), // First communication from Controller to send full RPCs to the Zk brokers. 
DUAL_WRITE(true); // The data has been migrated - private final boolean isActiveController; + private final boolean allowDualWrite; - MigrationDriverState(boolean isActiveController) { - this.isActiveController = isActiveController; + MigrationDriverState(boolean allowDualWrite) { + this.allowDualWrite = allowDualWrite; } - boolean isActiveController() { - return isActiveController; + boolean allowDualWrite() { + return allowDualWrite; } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java index 4a02235ce25..ad3af89acd2 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java @@ -130,7 +130,7 @@ public class ZkMigrationLeadershipState { return zkControllerEpochZkVersion; } - public boolean zkMigrationComplete() { + public boolean initialZkMigrationComplete() { return kraftMetadataOffset > 0; } @@ -149,7 +149,7 @@ public class ZkMigrationLeadershipState { return this.kraftControllerId != other.kraftControllerId || this.kraftControllerEpoch != other.kraftControllerEpoch || - (!other.zkMigrationComplete() && this.zkMigrationComplete()); + (!other.initialZkMigrationComplete() && this.initialZkMigrationComplete()); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java index 236a82f2b09..01910ab28b3 100644 --- a/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java @@ -39,9 +39,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @Timeout(value = 40) public class AclsImageTest { - final static AclsImage IMAGE1; + public final static AclsImage IMAGE1; - final static List<ApiMessageAndVersion> DELTA1_RECORDS; + public final static List<ApiMessageAndVersion> DELTA1_RECORDS; final static AclsDelta DELTA1; diff --git a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java index bdb2c9bedff..1f656baa2e6 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java @@ -40,9 +40,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @Timeout(value = 40) public class ClientQuotasImageTest { - final static ClientQuotasImage IMAGE1; + public final static ClientQuotasImage IMAGE1; - final static List<ApiMessageAndVersion> DELTA1_RECORDS; + public final static List<ApiMessageAndVersion> DELTA1_RECORDS; final static ClientQuotasDelta DELTA1; diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java index 0f9a761972b..429fd8d9aa2 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java @@ -38,9 +38,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @Timeout(value = 40) public class ConfigurationsImageTest { - final static ConfigurationsImage IMAGE1; + public final static ConfigurationsImage IMAGE1; - final static List<ApiMessageAndVersion> DELTA1_RECORDS; + public final static List<ApiMessageAndVersion> 
DELTA1_RECORDS; final static ConfigurationsDelta DELTA1; diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java index 155973b4a4f..a510bf1d855 100644 --- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java @@ -40,8 +40,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(value = 40) public class FeaturesImageTest { - final static FeaturesImage IMAGE1; - final static List<ApiMessageAndVersion> DELTA1_RECORDS; + public final static FeaturesImage IMAGE1; + public final static List<ApiMessageAndVersion> DELTA1_RECORDS; final static FeaturesDelta DELTA1; final static FeaturesImage IMAGE2; diff --git a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java index 126c2df2a6c..69695473d23 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java @@ -33,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @Timeout(value = 40) public class ProducerIdsImageTest { - final static ProducerIdsImage IMAGE1; + public final static ProducerIdsImage IMAGE1; final static List<ApiMessageAndVersion> DELTA1_RECORDS; diff --git a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java index 9fd7f2bb8b7..3400be47b38 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java @@ -44,9 +44,9 @@ import static org.junit.jupiter.api.Assertions.fail; @Timeout(value = 40) public class ScramImageTest { - final static ScramImage IMAGE1; + public final static ScramImage IMAGE1; - final static List<ApiMessageAndVersion> DELTA1_RECORDS; + public final static List<ApiMessageAndVersion> DELTA1_RECORDS; final static ScramDelta DELTA1; diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java index 42d831130a0..09170d68d7e 100644 --- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java @@ -90,7 +90,7 @@ public class TopicsImageTest { return map; } - private static final Uuid FOO_UUID = Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"); + public static final Uuid FOO_UUID = Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"); private static final Uuid BAR_UUID = Uuid.fromString("f62ptyETTjet8SL5ZeREiw"); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java index 8d4b70dc549..b96c17631e2 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java @@ -18,9 +18,11 @@ package org.apache.kafka.metadata.migration; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.ProducerIdsBlock; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -53,6 +55,11 @@ 
class CapturingMigrationClient implements MigrationClient { return this; } + public Builder setAclMigrationClient(AclMigrationClient aclMigrationClient) { + this.aclMigrationClient = aclMigrationClient; + return this; + } + public CapturingMigrationClient build() { return new CapturingMigrationClient( brokersInZk, @@ -124,6 +131,11 @@ class CapturingMigrationClient implements MigrationClient { return aclMigrationClient; } + @Override + public Optional<ProducerIdsBlock> readProducerId() { + return Optional.empty(); + } + @Override public ZkMigrationLeadershipState writeProducerId( long nextProducerId, diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index afbc952c288..3b13e065668 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -69,14 +69,13 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.kafka.image.TopicsImageTest.DELTA1_RECORDS; import static org.apache.kafka.image.TopicsImageTest.IMAGE1; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.NANOSECONDS; - public class KRaftMigrationDriverTest { List<Node> controllerNodes = Arrays.asList( new Node(4, "host4", 0), @@ -274,7 +273,8 @@ public class KRaftMigrationDriverTest { public void testMigrationWithClientException(boolean authException) throws Exception { CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator(); CountDownLatch claimLeaderAttempts = new CountDownLatch(3); - CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)), new CapturingTopicMigrationClient(), null, null) { + CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)), + new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient()) { @Override public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) { if (claimLeaderAttempts.getCount() == 0) { @@ -367,7 +367,7 @@ public class KRaftMigrationDriverTest { // Current apiVersions of node 6 has no zkMigrationReady set, should still stay at WAIT_FOR_CONTROLLER_QUORUM state apiVersions.update("6", NodeApiVersions.create()); - driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM); + assertEquals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM, driver.migrationState().get(1, TimeUnit.MINUTES)); // all controller nodes are zkMigrationReady, should be able to move to next state apiVersions.update("6", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); @@ -379,7 +379,8 @@ public class KRaftMigrationDriverTest { @Test public void testSkipWaitForBrokersInDualWrite() throws Exception { CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator(); - CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(), null, null, null); + 
CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(), + new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient()); MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration"); try (KRaftMigrationDriver driver = new KRaftMigrationDriver( 3000, @@ -420,6 +421,7 @@ public class KRaftMigrationDriverTest { interface TopicDualWriteVerifier { void verify( KRaftMigrationDriver driver, + CapturingMigrationClient migrationClient, CapturingTopicMigrationClient topicClient, CapturingConfigMigrationClient configClient ) throws Exception; } @@ -461,13 +463,13 @@ quorumFeatures, mockTime )) { - verifier.verify(driver, topicClient, configClient); + verifier.verify(driver, migrationClient, topicClient, configClient); } } @Test public void testTopicDualWriteSnapshot() throws Exception { - setupTopicDualWrite((driver, topicClient, configClient) -> { + setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> { MetadataImage image = new MetadataImage( MetadataProvenance.EMPTY, FeaturesImage.EMPTY, ClusterImage.EMPTY, IMAGE1, ConfigurationsImage.EMPTY, ClientQuotasImage.EMPTY, ProducerIdsImage.EMPTY, AclsImage.EMPTY, ScramImage.EMPTY); MetadataDelta delta = new MetadataDelta(image); @@ -519,7 +521,7 @@ @Test public void testTopicDualWriteDelta() throws Exception { - setupTopicDualWrite((driver, topicClient, configClient) -> { + setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> { MetadataImage image = new MetadataImage( MetadataProvenance.EMPTY, FeaturesImage.EMPTY, @@ -568,4 +570,62 @@ assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0)); }); } + + @Test + public void testControllerFailover() throws Exception { + setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> { + MetadataImage image = new MetadataImage( + MetadataProvenance.EMPTY, + FeaturesImage.EMPTY, + ClusterImage.EMPTY, + IMAGE1, + ConfigurationsImage.EMPTY, + ClientQuotasImage.EMPTY, + ProducerIdsImage.EMPTY, + AclsImage.EMPTY, + ScramImage.EMPTY); + MetadataDelta delta = new MetadataDelta(image); + + driver.start(); + delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); + delta.replay(zkBrokerRecord(0)); + delta.replay(zkBrokerRecord(1)); + delta.replay(zkBrokerRecord(2)); + delta.replay(zkBrokerRecord(3)); + delta.replay(zkBrokerRecord(4)); + delta.replay(zkBrokerRecord(5)); + MetadataProvenance provenance = new MetadataProvenance(100, 1, 1); + image = delta.apply(provenance); + + // Publish a delta making a different node the leader + LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3001), 1); + driver.onControllerChange(newLeader); + driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42)); + + // Fake a complete migration + migrationClient.setMigrationRecoveryState( + ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1)); + + // Modify topics in KRaft -- delete foo, modify bar, add baz + provenance = new MetadataProvenance(200, 1, 1); + delta = new MetadataDelta(image); + RecordTestUtils.replayAll(delta, DELTA1_RECORDS); + image = delta.apply(provenance); + + // Standby driver does not do anything with this delta besides remember the image + driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42)); + + // Standby becomes leader + newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1); + driver.onControllerChange(newLeader); + TestUtils.waitForCondition(() 
-> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), + ""); + assertEquals(1, topicClient.deletedTopics.size()); + assertEquals("foo", topicClient.deletedTopics.get(0)); + assertEquals(1, topicClient.createdTopics.size()); + assertEquals("baz", topicClient.createdTopics.get(0)); + assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0)); + assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0)); + }); + } } \ No newline at end of file diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java new file mode 100644 index 00000000000..d6001128087 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata.migration; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.image.AclsImage; +import org.apache.kafka.image.AclsImageTest; +import org.apache.kafka.image.ClientQuotasImage; +import org.apache.kafka.image.ClusterImage; +import org.apache.kafka.image.ConfigurationsImage; +import org.apache.kafka.image.ConfigurationsImageTest; +import org.apache.kafka.image.FeaturesImage; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.ProducerIdsImage; +import org.apache.kafka.image.ProducerIdsImageTest; +import org.apache.kafka.image.ScramImage; +import org.apache.kafka.image.TopicsImageTest; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class KRaftMigrationZkWriterTest { + /** + * If ZK is empty, ensure that the writer will sync all metadata from the MetadataImage to ZK + */ + @Test + public void testReconcileSnapshotEmptyZk() { + + // These test clients don't return any data in their iterates, so this simulates an empty ZK + CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient(); + CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient(); + CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient(); + CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder() + .setBrokersInZk(0) + .setTopicMigrationClient(topicClient) + .setConfigMigrationClient(configClient) + .setAclMigrationClient(aclClient) + .build(); + + 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java
new file mode 100644
index 00000000000..d6001128087
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.image.AclsImage;
+import org.apache.kafka.image.AclsImageTest;
+import org.apache.kafka.image.ClientQuotasImage;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.ConfigurationsImage;
+import org.apache.kafka.image.ConfigurationsImageTest;
+import org.apache.kafka.image.FeaturesImage;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.ProducerIdsImage;
+import org.apache.kafka.image.ProducerIdsImageTest;
+import org.apache.kafka.image.ScramImage;
+import org.apache.kafka.image.TopicsImageTest;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class KRaftMigrationZkWriterTest {
+    /**
+     * If ZK is empty, ensure that the writer will sync all metadata from the MetadataImage to ZK
+     */
+    @Test
+    public void testReconcileSnapshotEmptyZk() {
+
+        // These test clients don't return any data from their iterate methods, so this simulates an empty ZK
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .setConfigMigrationClient(configClient)
+            .setAclMigrationClient(aclClient)
+            .build();
+
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        MetadataImage image = new MetadataImage(
+            MetadataProvenance.EMPTY,
+            FeaturesImage.EMPTY,        // Features are not used in ZK mode, so we don't migrate or dual-write them
+            ClusterImage.EMPTY,         // Broker registrations are not dual-written
+            TopicsImageTest.IMAGE1,
+            ConfigurationsImageTest.IMAGE1,
+            ClientQuotasImage.EMPTY,    // TODO KAFKA-15017
+            ProducerIdsImageTest.IMAGE1,
+            AclsImageTest.IMAGE1,
+            ScramImage.EMPTY            // TODO KAFKA-15017
+        );
+
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleSnapshot(image, consumer);
+        assertEquals(2, opCounts.remove("CreateTopic"));
+        assertEquals(2, opCounts.remove("UpdateBrokerConfig"));
+        assertEquals(1, opCounts.remove("UpdateProducerId"));
+        assertEquals(4, opCounts.remove("UpdateAcl"));
+        assertEquals(0, opCounts.size());
+
+        assertEquals(2, topicClient.createdTopics.size());
+        assertTrue(topicClient.createdTopics.contains("foo"));
+        assertTrue(topicClient.createdTopics.contains("bar"));
+        assertEquals("bar", configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "0")).get("foo"));
+        assertEquals("quux", configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "0")).get("baz"));
+        assertEquals("foobaz", configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "1")).get("foobar"));
+        assertEquals(4, aclClient.updatedResources.size());
+    }
+
+    /**
+     * Only return one of the two topics in the ZK topic iterator, and ensure that the topic client creates the missing topic
+     */
+    @Test
+    public void testReconcileSnapshotTopics() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+                Map<Integer, List<Integer>> assignments = new HashMap<>();
+                assignments.put(0, Arrays.asList(2, 3, 4));
+                assignments.put(1, Arrays.asList(3, 4, 5));
+                assignments.put(2, Arrays.asList(2, 4, 5));
+                visitor.visitTopic("foo", TopicsImageTest.FOO_UUID, assignments);
+            }
+        };
+
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .setConfigMigrationClient(configClient)
+            .setAclMigrationClient(aclClient)
+            .build();
+
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        MetadataImage image = new MetadataImage(
+            MetadataProvenance.EMPTY,
+            FeaturesImage.EMPTY,
+            ClusterImage.EMPTY,
+            TopicsImageTest.IMAGE1,     // Two topics, foo and bar
+            ConfigurationsImage.EMPTY,
+            ClientQuotasImage.EMPTY,
+            ProducerIdsImage.EMPTY,
+            AclsImage.EMPTY,
+            ScramImage.EMPTY
+        );
+
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleSnapshot(image, consumer);
+        assertEquals(1, opCounts.remove("CreateTopic"));
+        assertEquals(0, opCounts.size());
+        assertEquals("bar", topicClient.createdTopics.get(0));
+    }
+}
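Both tests above exercise the same underlying idea: handleSnapshot compares what the MetadataImage says should exist against what the iterate methods find in ZK, then issues only the writes needed to converge. As a rough sketch under assumed names (reconcileTopics is not the real method), the topic portion reduces to two set differences:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Illustrative only: the real KRaftMigrationZkWriter walks full images, but the
// topic reconciliation boils down to two set differences like these.
final class ReconcileSketch {
    static void reconcileTopics(
        Set<String> topicsInKRaft,   // from the MetadataImage (source of truth)
        Set<String> topicsInZk,      // from iterating over ZK
        Consumer<String> createInZk,
        Consumer<String> deleteFromZk
    ) {
        Set<String> toCreate = new HashSet<>(topicsInKRaft);
        toCreate.removeAll(topicsInZk);      // in KRaft but missing from ZK
        Set<String> toDelete = new HashSet<>(topicsInZk);
        toDelete.removeAll(topicsInKRaft);   // in ZK but gone from KRaft
        toCreate.forEach(createInZk);
        toDelete.forEach(deleteFromZk);
    }

    public static void main(String[] args) {
        // Mirrors testReconcileSnapshotTopics: the image has foo and bar, ZK only
        // has foo, so the writer should emit a single CreateTopic for bar.
        reconcileTopics(
            new HashSet<>(Arrays.asList("foo", "bar")),
            new HashSet<>(Arrays.asList("foo")),
            t -> System.out.println("CreateTopic " + t),
            t -> System.out.println("DeleteTopic " + t));
    }
}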
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index c9d86e84b66..e580bbf47ca 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -203,6 +203,56 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         result = match.groups()[0]
         return result

+    def get_children(self, path, chroot=None):
+        """
+        Returns the names of the child znodes under 'path'
+        """
+        self._check_chroot(chroot)
+
+        chroot_path = ('' if chroot is None else chroot) + path
+
+        kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
+        cmd = "%s %s -server %s %s ls %s" % \
+              (kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
+               self.zkTlsConfigFileOption(True),
+               chroot_path)
+        self.logger.debug(cmd)
+
+        node = self.nodes[0]
+        result = None
+        for line in node.account.ssh_capture(cmd, allow_fail=True):
+            # loop through all lines in the output, but only hold on to the first match
+            if result is None:
+                match = re.match("^(\\[.+\\])$", line)
+                if match is not None:
+                    result = match.groups()[0]
+        if result is None:
+            return []
+        else:
+            return result.strip("[]").split(", ")
+
+    def delete(self, path, recursive, chroot=None):
+        """
+        Deletes the znode at 'path'; if 'recursive' is True, deletes the entire subtree
+        """
+        self._check_chroot(chroot)
+
+        chroot_path = ('' if chroot is None else chroot) + path
+
+        kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
+        if recursive:
+            op = "deleteall"
+        else:
+            op = "delete"
+        cmd = "%s %s -server %s %s %s %s" % \
+              (kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
+               self.zkTlsConfigFileOption(True),
+               op, chroot_path)
+        self.logger.debug(cmd)
+
+        node = self.nodes[0]
+        node.account.ssh_capture(cmd)
+
     def create(self, path, chroot=None, value=""):
         """
         Create a znode at the given path
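For reference, the two ducktape helpers above shell out to the ZooKeeper CLI via kafka-run-class.sh (the "ls", "delete", and "deleteall" commands) and scrape the output. Done directly against the ZooKeeper Java client, the same operations look roughly like the sketch below; the connect string and paths are placeholders, not values from this patch.

import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;

// What the ducktape helpers do via the CLI, expressed against the ZooKeeper client API.
public class ZkOpsSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> { });
        try {
            // Equivalent of get_children(path="/brokers/topics")
            List<String> topics = zk.getChildren("/brokers/topics", false);
            System.out.println(topics);

            // Equivalent of delete(path=..., recursive=True), i.e. "deleteall"
            ZKUtil.deleteRecursive(zk, "/brokers/topics/zk-topic-0");

            // Equivalent of delete(path=..., recursive=False), i.e. "delete"
            // zk.delete("/some/leaf", -1);  // -1 matches any znode version
        } catch (KeeperException e) {
            System.err.println("ZK operation failed: " + e.getMessage());
        } finally {
            zk.close();
        }
    }
}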
diff --git a/tests/kafkatest/tests/core/zookeeper_migration_test.py b/tests/kafkatest/tests/core/zookeeper_migration_test.py
index a1be092d302..1713d970f17 100644
--- a/tests/kafkatest/tests/core/zookeeper_migration_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_migration_test.py
@@ -50,7 +50,7 @@ class TestMigration(ProduceConsumeValidateTest):
         wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor,
                    timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")

-    def do_migration(self):
+    def do_migration(self, roll_controller=False, downgrade_to_zk=False):
         # Start up KRaft controller in migration mode
         remote_quorum = partial(ServiceQuorumInfo, isolated_kraft)
         controller = KafkaService(self.test_context, num_nodes=1, zk=self.zk, version=DEV_BRANCH,
@@ -78,7 +78,15 @@ class TestMigration(ProduceConsumeValidateTest):
             self.kafka.start_node(node)
             self.wait_until_rejoin()

-    def test_online_migration(self):
+        if roll_controller:
+            self.logger.info("Restarting KRaft quorum")
+            for node in controller.nodes:
+                controller.stop_node(node)
+                controller.start_node(node)
+
+    @parametrize(roll_controller=True)
+    @parametrize(roll_controller=False)
+    def test_online_migration(self, roll_controller):
         zk_quorum = partial(ServiceQuorumInfo, zk)
         self.zk = ZookeeperService(self.test_context, num_nodes=1, version=DEV_BRANCH)
         self.kafka = KafkaService(self.test_context,
@@ -116,7 +124,7 @@ class TestMigration(ProduceConsumeValidateTest):
                                         self.topic, consumer_timeout_ms=30000, message_validator=is_int,
                                         version=DEV_BRANCH)
-        self.run_produce_consume_validate(core_test_action=self.do_migration)
+        self.run_produce_consume_validate(core_test_action=partial(self.do_migration, roll_controller=roll_controller))
         self.kafka.stop()

     @parametrize(metadata_quorum=isolated_kraft)
@@ -163,9 +171,15 @@ class TestMigration(ProduceConsumeValidateTest):

         # Check the controller's logs for the error message about the migration state
         saw_expected_error = False
+        self.logger.info("Waiting for controller to crash")
+
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node, clean_shutdown=False)
+
+        for node in self.kafka.controller_quorum.nodes:
+            self.kafka.controller_quorum.stop_node(node, clean_shutdown=False)
+
         for node in self.kafka.controller_quorum.nodes:
-            wait_until(lambda: not self.kafka.controller_quorum.alive(node), timeout_sec=60,
-                       backoff_sec=1, err_msg="Controller did not halt in the expected amount of time")
             with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
                 monitor.offset = 0
                 try:
@@ -256,3 +270,98 @@ class TestMigration(ProduceConsumeValidateTest):
         assert saw_expected_log, "Did not see expected INFO log after upgrading from a 3.4 migration"

         self.kafka.stop()
+
+    def test_reconcile_kraft_to_zk(self):
+        """
+        Perform a migration and delete a topic directly from ZK. Ensure that the topic is added back
+        by KRaft during a failover. This exercises the snapshot reconciliation.
+        """
+        zk_quorum = partial(ServiceQuorumInfo, zk)
+        self.zk = ZookeeperService(self.test_context, num_nodes=1, version=DEV_BRANCH)
+        self.kafka = KafkaService(self.test_context,
+                                  num_nodes=3,
+                                  zk=self.zk,
+                                  version=DEV_BRANCH,
+                                  quorum_info_provider=zk_quorum,
+                                  allow_zk_with_kraft=True,
+                                  server_prop_overrides=[["zookeeper.metadata.migration.enable", "false"]])
+
+        remote_quorum = partial(ServiceQuorumInfo, isolated_kraft)
+        controller = KafkaService(self.test_context, num_nodes=1, zk=self.zk, version=DEV_BRANCH,
+                                  allow_zk_with_kraft=True,
+                                  isolated_kafka=self.kafka,
+                                  server_prop_overrides=[["zookeeper.connect", self.zk.connect_setting()],
+                                                         ["zookeeper.metadata.migration.enable", "true"]],
+                                  quorum_info_provider=remote_quorum)
+
+        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.interbroker_security_protocol = "PLAINTEXT"
+        self.zk.start()
+        self.logger.info("Pre-generating clusterId for ZK.")
+        cluster_id_json = """{"version": "1", "id": "%s"}""" % CLUSTER_ID
+        self.zk.create(path="/cluster")
+        self.zk.create(path="/cluster/id", value=cluster_id_json)
+        self.kafka.start()
+
+        topic_cfg = {
+            "topic": self.topic,
+            "partitions": self.partitions,
+            "replication-factor": self.replication_factor,
+            "configs": {"min.insync.replicas": 2}
+        }
+        self.kafka.create_topic(topic_cfg)
+
+        # Create topics in ZK mode
+        for i in range(10):
+            topic_cfg = {
+                "topic": f"zk-topic-{i}",
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+
+        controller.start()
+        self.kafka.reconfigure_zk_for_migration(controller)
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            self.kafka.start_node(node)
+            self.wait_until_rejoin()
+
+        # Check the controller's logs for the INFO message that we're done with migration
+        saw_expected_log = False
+        for node in self.kafka.controller_quorum.nodes:
+            with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
+                monitor.offset = 0
+                try:
+                    # Shouldn't have to wait too long to see this log message after startup
+                    monitor.wait_until(
+ "Finished initial migration of ZK metadata to KRaft", + timeout_sec=10.0, backoff_sec=.25, + err_msg="" + ) + saw_expected_log = True + break + except TimeoutError: + continue + + assert saw_expected_log, "Did not see expected INFO log after migration" + + # Manually delete a topic from ZK to simulate a missed dual-write + self.zk.delete(path="/brokers/topics/zk-topic-0", recursive=True) + + # Roll the controller nodes to force a failover, this causes a snapshot reconciliation + for node in controller.nodes: + controller.stop_node(node) + controller.start_node(node) + + def topic_in_zk(): + topics_in_zk = self.zk.get_children(path="/brokers/topics") + return "zk-topic-0" in topics_in_zk + + wait_until(topic_in_zk, timeout_sec=60, + backoff_sec=1, err_msg="Topic did not appear in ZK in time.") + + self.kafka.stop() + controller.stop() + self.zk.stop()
