This is an automated email from the ASF dual-hosted git repository. davidarthur pushed a commit to branch 3.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 65f628fd87bb8dae8781bb2722100550982799fa Author: Proven Provenzano <[email protected]> AuthorDate: Wed May 31 15:42:00 2023 -0400 KAFKA-15017 Fix snapshot load in dual write mode for ClientQuotas and SCRAM (#13757) This patch fixes the case where a ClientQuota or SCRAM credential was added in KRaft, but not written back to ZK. This missed write only occurred when handling a KRaft snapshot. If the changed quota was processed in a metadata delta (which is the typical case), it would be written to ZK. Reviewers: David Arthur <[email protected]> --- .../zk/migration/ZkConfigMigrationClientTest.scala | 78 ++++++++++++++++++++-- .../metadata/migration/KRaftMigrationZkWriter.java | 26 +++++++- 2 files changed, 96 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala index b9d86be25c4..f3152697e86 100644 --- a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala @@ -18,15 +18,21 @@ package kafka.zk.migration import kafka.server.{ConfigType, KafkaConfig, ZkAdminManager} import kafka.zk.{AdminZkClient, ZkMigrationClient} +import org.apache.kafka.clients.admin.ScramMechanism import org.apache.kafka.common.config.internals.QuotaConfigs import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.metadata.ClientQuotaRecord +import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData import org.apache.kafka.common.metadata.ConfigRecord +import org.apache.kafka.common.metadata.UserScramCredentialRecord import org.apache.kafka.common.quota.ClientQuotaEntity import org.apache.kafka.common.security.scram.ScramCredential import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils import org.apache.kafka.image.{ClientQuotasDelta, ClientQuotasImage} +import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance} import org.apache.kafka.metadata.RecordTestUtils +import org.apache.kafka.metadata.migration.KRaftMigrationZkWriter import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.util.MockRandom @@ -39,6 +45,12 @@ import scala.collection.Map import scala.jdk.CollectionConverters._ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness { + def randomBuffer(random: MockRandom, length: Int): Array[Byte] = { + val buf = new Array[Byte](length) + random.nextBytes(buf) + buf + } + @Test def testMigrationBrokerConfigs(): Unit = { val brokers = new java.util.ArrayList[Integer]() @@ -235,12 +247,6 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness { def testScram(): Unit = { val random = new MockRandom() - def randomBuffer(random: MockRandom, length: Int): Array[Byte] = { - val buf = new Array[Byte](length) - random.nextBytes(buf) - buf - } - val scramCredential = new ScramCredential( randomBuffer(random, 1024), randomBuffer(random, 1024), @@ -259,4 +265,64 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness { assertEquals(1, batches.size()) assertEquals(1, batches.get(0).size) } + + @Test + def testScramAndQuotaChangesInSnapshot(): Unit = { + val random = new MockRandom() + + val props = new Properties() + props.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "100000") + adminZkClient.changeConfigs(ConfigType.User, "user1", props) + + // Create SCRAM records in Zookeeper. + val aliceScramCredential = new ScramCredential( + randomBuffer(random, 1024), + randomBuffer(random, 1024), + randomBuffer(random, 1024), + 4096) + + val alicePropsInit = new Properties() + alicePropsInit.put("SCRAM-SHA-256", ScramCredentialUtils.credentialToString(aliceScramCredential)) + adminZkClient.changeConfigs(ConfigType.User, "alice", alicePropsInit) + + val delta = new MetadataDelta(MetadataImage.EMPTY) + + // Create a new Quota for user2 + val entityData = new EntityData().setEntityType("user").setEntityName("user2") + val clientQuotaRecord = new ClientQuotaRecord() + .setEntity(List(entityData).asJava) + .setKey("request_percentage") + .setValue(58.58) + .setRemove(false) + delta.replay(clientQuotaRecord) + + // Create a new SCRAM credential for george + val scramCredentialRecord = new UserScramCredentialRecord() + .setName("george") + .setMechanism(ScramMechanism.SCRAM_SHA_256.`type`) + .setSalt(randomBuffer(random, 1024)) + .setStoredKey(randomBuffer(random, 1024)) + .setServerKey(randomBuffer(random, 1024)) + .setIterations(8192) + delta.replay(scramCredentialRecord) + + // Add Quota record for user2 but not user1 to delete user1 + // Add SCRAM record for george but not for alice to delete alice + val image = delta.apply(MetadataProvenance.EMPTY) + + // load snapshot to Zookeeper. + val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient, + (_, operation) => { migrationState = operation.apply(migrationState) }) + kraftMigrationZkWriter.handleLoadSnapshot(image) + + val user1Props = zkClient.getEntityConfigs(ConfigType.User, "user1") + assertEquals(0, user1Props.size()) + val user2Props = zkClient.getEntityConfigs(ConfigType.User, "user2") + assertEquals(1, user2Props.size()) + + val georgeProps = zkClient.getEntityConfigs(ConfigType.User, "george") + assertEquals(1, georgeProps.size()) + val aliceProps = zkClient.getEntityConfigs(ConfigType.User, "alice") + assertEquals(0, aliceProps.size()) + } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java index bec062f5c98..01dc782f9dc 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java @@ -50,6 +50,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.function.BiConsumer; @@ -258,6 +259,27 @@ public class KRaftMigrationZkWriter { void handleClientQuotasSnapshot(ClientQuotasImage clientQuotasImage, ScramImage scramImage) { Set<ClientQuotaEntity> changedNonUserEntities = new HashSet<>(); Set<String> changedUsers = new HashSet<>(); + + if (clientQuotasImage != null) { + for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : clientQuotasImage.entities().entrySet()) { + ClientQuotaEntity entity = entry.getKey(); + if (entity.entries().containsKey(ClientQuotaEntity.USER) && + !entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID)) { + // Track regular user entities separately + // There should only be 1 entry in the list of type ClientQuotaEntity.USER + changedUsers.add(entity.entries().get(ClientQuotaEntity.USER)); + } else { + changedNonUserEntities.add(entity); + } + } + } + if (scramImage != null) { + for (Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismEntry : scramImage.mechanisms().entrySet()) { + for (Entry<String, ScramCredentialData> userEntry : mechanismEntry.getValue().entrySet()) { + changedUsers.add(userEntry.getKey()); + } + } + } migrationClient.configClient().iterateClientQuotas(new ConfigMigrationClient.ClientQuotaVisitor() { @Override public void visitClientQuota(List<ClientQuotaRecord.EntityData> entityDataList, Map<String, Double> quotas) { @@ -293,13 +315,13 @@ public class KRaftMigrationZkWriter { changedUsers.forEach(userName -> { ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName)); - Map<String, Double> quotaMap = clientQuotasImage.entities().get(entity).quotaMap(); + Map<String, Double> quotaMap = clientQuotasImage.entities(). + getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap(); Map<String, String> scramMap = getScramCredentialStringsForUser(scramImage, userName); operationConsumer.accept("Update scram credentials for " + userName, migrationState -> migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, scramMap, migrationState)); }); - } void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta) {
