mumrah commented on code in PR #13628:
URL: https://github.com/apache/kafka/pull/13628#discussion_r1179299560


##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -211,6 +213,20 @@ class ZkMigrationClient(
       adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, 
props) =>
         val entity = new 
EntityData().setEntityType(entityType).setEntityName(name)
         val batch = new util.ArrayList[ApiMessageAndVersion]()
+        ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { 
mechanism =>
+          val propertyValue = props.getProperty(mechanism.mechanismName)
+          if (propertyValue != null) {

Review Comment:
   Should we log something if we get a null value here?



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -568,19 +571,58 @@ public void run() throws Exception {
                     });
                 }
 
-                // For configs and client quotas, we need to send all of the 
data to the ZK client since we persist
-                // everything for a given entity in a single ZK node.
+                // For configs and client quotas, we need to send all of the 
data to the ZK
+                // client since we persist everything for a given entity in a 
single ZK node.
                 if (delta.configsDelta() != null) {
                     delta.configsDelta().changes().forEach((configResource, 
configDelta) ->
                         apply("Updating config resource " + configResource, 
migrationState ->
                             zkMigrationClient.writeConfigs(configResource, 
image.configs().configMapForResource(configResource), migrationState)));
                 }
 
-                if (delta.clientQuotasDelta() != null) {
-                    
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, 
clientQuotaDelta) -> {
-                        Map<String, Double> quotaMap = 
image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        apply("Updating client quota " + clientQuotaEntity, 
migrationState ->
-                            
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, 
migrationState));
+                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() 
!= null)) {
+
+                    // A list of users with scram or quota changes
+                    HashSet<String> users = new HashSet<String>();
+
+                    // Populate list with users with scram changes
+                    if (delta.scramDelta() != null) {
+                        delta.scramDelta().changes().forEach((scramMechanism, 
changes) -> {
+                            changes.forEach((userName, changeOpt) -> 
users.add(userName));
+                        });
+                    }
+
+                    // Populate list with users with quota changes 
+                    // and apply quota changes to all non user quota changes
+                    if (delta.clientQuotasDelta() != null) {
+                        Map<String, String> scramMap = new HashMap<String, 
String>();
+                        
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, 
clientQuotaDelta) -> {
+
+                            if 
((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                                
(!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {
+                                String userName = 
clientQuotaEntity.entries().get(ClientQuotaEntity.USER);
+                                // Add clientQuotaEntity to list to process at 
the end
+                                users.add(userName);
+                            } else {
+                                Map<String, Double> quotaMap = 
image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+                                apply("Updating client quota " + 
clientQuotaEntity, migrationState -> 
+                                    
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, 
scramMap, migrationState));
+                            }
+                        });
+                    }
+                    // Updateuser scram and quota data for each user with 
changes in either.

Review Comment:
   typo: Update user



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -568,19 +571,58 @@ public void run() throws Exception {
                     });
                 }
 
-                // For configs and client quotas, we need to send all of the 
data to the ZK client since we persist
-                // everything for a given entity in a single ZK node.
+                // For configs and client quotas, we need to send all of the 
data to the ZK
+                // client since we persist everything for a given entity in a 
single ZK node.
                 if (delta.configsDelta() != null) {
                     delta.configsDelta().changes().forEach((configResource, 
configDelta) ->
                         apply("Updating config resource " + configResource, 
migrationState ->
                             zkMigrationClient.writeConfigs(configResource, 
image.configs().configMapForResource(configResource), migrationState)));
                 }
 
-                if (delta.clientQuotasDelta() != null) {
-                    
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, 
clientQuotaDelta) -> {
-                        Map<String, Double> quotaMap = 
image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        apply("Updating client quota " + clientQuotaEntity, 
migrationState ->
-                            
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, 
migrationState));
+                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() 
!= null)) {
+
+                    // A list of users with scram or quota changes
+                    HashSet<String> users = new HashSet<String>();
+
+                    // Populate list with users with scram changes
+                    if (delta.scramDelta() != null) {
+                        delta.scramDelta().changes().forEach((scramMechanism, 
changes) -> {
+                            changes.forEach((userName, changeOpt) -> 
users.add(userName));
+                        });
+                    }
+
+                    // Populate list with users with quota changes 
+                    // and apply quota changes to all non user quota changes
+                    if (delta.clientQuotasDelta() != null) {
+                        Map<String, String> scramMap = new HashMap<String, 
String>();
+                        
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, 
clientQuotaDelta) -> {
+
+                            if 
((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                                
(!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {
+                                String userName = 
clientQuotaEntity.entries().get(ClientQuotaEntity.USER);
+                                // Add clientQuotaEntity to list to process at 
the end
+                                users.add(userName);
+                            } else {
+                                Map<String, Double> quotaMap = 
image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+                                apply("Updating client quota " + 
clientQuotaEntity, migrationState -> 
+                                    
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, 
scramMap, migrationState));
+                            }
+                        });
+                    }
+                    // Updateuser scram and quota data for each user with 
changes in either.
+                    users.forEach(userName -> {
+                        Map<String, String> userscramMap = 
getScramCredentialStringsForUser(userName);

Review Comment:
   nit: userScramMap



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -568,19 +571,58 @@ public void run() throws Exception {
                     });
                 }
 
-                // For configs and client quotas, we need to send all of the 
data to the ZK client since we persist
-                // everything for a given entity in a single ZK node.
+                // For configs and client quotas, we need to send all of the 
data to the ZK
+                // client since we persist everything for a given entity in a 
single ZK node.
                 if (delta.configsDelta() != null) {
                     delta.configsDelta().changes().forEach((configResource, 
configDelta) ->
                         apply("Updating config resource " + configResource, 
migrationState ->
                             zkMigrationClient.writeConfigs(configResource, 
image.configs().configMapForResource(configResource), migrationState)));
                 }
 
-                if (delta.clientQuotasDelta() != null) {
-                    
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, 
clientQuotaDelta) -> {
-                        Map<String, Double> quotaMap = 
image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        apply("Updating client quota " + clientQuotaEntity, 
migrationState ->
-                            
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, 
migrationState));
+                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() 
!= null)) {
+
+                    // A list of users with scram or quota changes
+                    HashSet<String> users = new HashSet<String>();
+
+                    // Populate list with users with scram changes
+                    if (delta.scramDelta() != null) {
+                        delta.scramDelta().changes().forEach((scramMechanism, 
changes) -> {
+                            changes.forEach((userName, changeOpt) -> 
users.add(userName));
+                        });
+                    }
+
+                    // Populate list with users with quota changes 
+                    // and apply quota changes to all non user quota changes
+                    if (delta.clientQuotasDelta() != null) {
+                        Map<String, String> scramMap = new HashMap<String, 
String>();
+                        
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, 
clientQuotaDelta) -> {
+
+                            if 
((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                                
(!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {

Review Comment:
   Just so I understand -- we exclude entities like 
`/user/userA/client/clientX` because we don't store SCRAM credentials for 
user+client entities? We only store SCRAM for plain user entities?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to