This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 65f628fd87bb8dae8781bb2722100550982799fa
Author: Proven Provenzano <[email protected]>
AuthorDate: Wed May 31 15:42:00 2023 -0400

    KAFKA-15017 Fix snapshot load in dual write mode for ClientQuotas and SCRAM 
 (#13757)
    
    This patch fixes the case where a ClientQuota or SCRAM credential was added 
in KRaft, but not written back to ZK. This missed write only occurred when 
handling a KRaft snapshot. If the changed quota was processed in a metadata 
delta (which is the typical case), it would be written to ZK.
    
    Reviewers: David Arthur <[email protected]>
---
 .../zk/migration/ZkConfigMigrationClientTest.scala | 78 ++++++++++++++++++++--
 .../metadata/migration/KRaftMigrationZkWriter.java | 26 +++++++-
 2 files changed, 96 insertions(+), 8 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
index b9d86be25c4..f3152697e86 100644
--- 
a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
+++ 
b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
@@ -18,15 +18,21 @@ package kafka.zk.migration
 
 import kafka.server.{ConfigType, KafkaConfig, ZkAdminManager}
 import kafka.zk.{AdminZkClient, ZkMigrationClient}
+import org.apache.kafka.clients.admin.ScramMechanism
 import org.apache.kafka.common.config.internals.QuotaConfigs
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.metadata.ClientQuotaRecord
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
 import org.apache.kafka.common.metadata.ConfigRecord
+import org.apache.kafka.common.metadata.UserScramCredentialRecord
 import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
 import org.apache.kafka.image.{ClientQuotasDelta, ClientQuotasImage}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
 import org.apache.kafka.metadata.RecordTestUtils
+import org.apache.kafka.metadata.migration.KRaftMigrationZkWriter
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.util.MockRandom
@@ -39,6 +45,12 @@ import scala.collection.Map
 import scala.jdk.CollectionConverters._
 
 class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
+  def randomBuffer(random: MockRandom, length: Int): Array[Byte] = {
+    val buf = new Array[Byte](length)
+    random.nextBytes(buf)
+    buf
+  }
+
   @Test
   def testMigrationBrokerConfigs(): Unit = {
     val brokers = new java.util.ArrayList[Integer]()
@@ -235,12 +247,6 @@ class ZkConfigMigrationClientTest extends 
ZkMigrationTestHarness {
   def testScram(): Unit = {
     val random = new MockRandom()
 
-    def randomBuffer(random: MockRandom, length: Int): Array[Byte] = {
-      val buf = new Array[Byte](length)
-      random.nextBytes(buf)
-      buf
-    }
-
     val scramCredential = new ScramCredential(
       randomBuffer(random, 1024),
       randomBuffer(random, 1024),
@@ -259,4 +265,64 @@ class ZkConfigMigrationClientTest extends 
ZkMigrationTestHarness {
     assertEquals(1, batches.size())
     assertEquals(1, batches.get(0).size)
   }
+
+  @Test
+  def testScramAndQuotaChangesInSnapshot(): Unit = {
+    val random = new MockRandom()
+
+    val props = new Properties()
+    props.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "100000")
+    adminZkClient.changeConfigs(ConfigType.User, "user1", props)
+
+    // Create a SCRAM credential in ZooKeeper.
+    val aliceScramCredential = new ScramCredential(
+      randomBuffer(random, 1024),
+      randomBuffer(random, 1024),
+      randomBuffer(random, 1024),
+      4096)
+
+    val alicePropsInit = new Properties()
+    alicePropsInit.put("SCRAM-SHA-256", 
ScramCredentialUtils.credentialToString(aliceScramCredential))
+    adminZkClient.changeConfigs(ConfigType.User, "alice", alicePropsInit)
+
+    val delta = new MetadataDelta(MetadataImage.EMPTY)
+
+    // Create a new Quota for user2
+    val entityData = new 
EntityData().setEntityType("user").setEntityName("user2")
+    val clientQuotaRecord = new ClientQuotaRecord()
+      .setEntity(List(entityData).asJava)
+      .setKey("request_percentage")
+      .setValue(58.58)
+      .setRemove(false)
+    delta.replay(clientQuotaRecord)
+
+    // Create a new SCRAM credential for george
+    val scramCredentialRecord = new UserScramCredentialRecord()
+      .setName("george")
+      .setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
+      .setSalt(randomBuffer(random, 1024))
+      .setStoredKey(randomBuffer(random, 1024))
+      .setServerKey(randomBuffer(random, 1024))
+      .setIterations(8192)
+    delta.replay(scramCredentialRecord)
+
+    // The snapshot has a quota record for user2 but not user1, so loading it should delete user1's quota.
+    // Likewise it has a SCRAM credential for george but not alice, so alice's credential should be deleted.
+    val image = delta.apply(MetadataProvenance.EMPTY)
+
+    // Load the snapshot into ZooKeeper.
+    val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient,
+      (_, operation) => { migrationState = operation.apply(migrationState) })
+    kraftMigrationZkWriter.handleLoadSnapshot(image)
+
+    val user1Props = zkClient.getEntityConfigs(ConfigType.User, "user1")
+    assertEquals(0, user1Props.size())
+    val user2Props = zkClient.getEntityConfigs(ConfigType.User, "user2")
+    assertEquals(1, user2Props.size())
+
+    val georgeProps = zkClient.getEntityConfigs(ConfigType.User, "george")
+    assertEquals(1, georgeProps.size())
+    val aliceProps = zkClient.getEntityConfigs(ConfigType.User, "alice")
+    assertEquals(0, aliceProps.size())
+  }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index bec062f5c98..01dc782f9dc 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -50,6 +50,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiConsumer;
@@ -258,6 +259,27 @@ public class KRaftMigrationZkWriter {
     void handleClientQuotasSnapshot(ClientQuotasImage clientQuotasImage, 
ScramImage scramImage) {
         Set<ClientQuotaEntity> changedNonUserEntities = new HashSet<>();
         Set<String> changedUsers = new HashSet<>();
+
+        if (clientQuotasImage != null) {
+            for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : 
clientQuotasImage.entities().entrySet()) {
+                ClientQuotaEntity entity = entry.getKey();
+                if (entity.entries().containsKey(ClientQuotaEntity.USER) &&
+                    
!entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID)) {
+                    // Track regular user entities separately
+                    // There should only be 1 entry in the list of type 
ClientQuotaEntity.USER
+                    
changedUsers.add(entity.entries().get(ClientQuotaEntity.USER));
+                } else {
+                    changedNonUserEntities.add(entity);
+                }
+            }
+        }
+        if (scramImage != null) {
+            for (Entry<ScramMechanism, Map<String, ScramCredentialData>> 
mechanismEntry : scramImage.mechanisms().entrySet()) {
+                for (Entry<String, ScramCredentialData> userEntry : 
mechanismEntry.getValue().entrySet()) {
+                    changedUsers.add(userEntry.getKey());
+                }
+            }
+        }
         migrationClient.configClient().iterateClientQuotas(new 
ConfigMigrationClient.ClientQuotaVisitor() {
             @Override
             public void visitClientQuota(List<ClientQuotaRecord.EntityData> 
entityDataList, Map<String, Double> quotas) {
@@ -293,13 +315,13 @@ public class KRaftMigrationZkWriter {
 
         changedUsers.forEach(userName -> {
             ClientQuotaEntity entity = new 
ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName));
-            Map<String, Double> quotaMap = 
clientQuotasImage.entities().get(entity).quotaMap();
+            Map<String, Double> quotaMap = clientQuotasImage.entities().
+                getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap();
             Map<String, String> scramMap = 
getScramCredentialStringsForUser(scramImage, userName);
             operationConsumer.accept("Update scram credentials for " + 
userName, migrationState ->
                 
migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, 
scramMap, migrationState));
         });
 
-
     }
 
     void handleConfigsDelta(ConfigurationsImage configsImage, 
ConfigurationsDelta configsDelta) {

Reply via email to