pprovenzano commented on code in PR #13628:
URL: https://github.com/apache/kafka/pull/13628#discussion_r1179530134


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -568,19 +571,58 @@ public void run() throws Exception {
                     });
                 }
 
-                // For configs and client quotas, we need to send all of the data to the ZK client since we persist
-                // everything for a given entity in a single ZK node.
+                // For configs and client quotas, we need to send all of the data to the ZK
+                // client since we persist everything for a given entity in a single ZK node.
                 if (delta.configsDelta() != null) {
                     delta.configsDelta().changes().forEach((configResource, configDelta) ->
                         apply("Updating config resource " + configResource, migrationState ->
                             zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
                 }
 
-                if (delta.clientQuotasDelta() != null) {
-                    delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
-                        Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        apply("Updating client quota " + clientQuotaEntity, migrationState ->
-                            zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState));
+                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
+
+                    // A list of users with scram or quota changes
+                    HashSet<String> users = new HashSet<String>();
+
+                    // Populate list with users with scram changes
+                    if (delta.scramDelta() != null) {
+                        delta.scramDelta().changes().forEach((scramMechanism, changes) -> {
+                            changes.forEach((userName, changeOpt) -> users.add(userName));
+                        });
+                    }
+
+                    // Populate list with users with quota changes
+                    // and apply quota changes to all non user quota changes
+                    if (delta.clientQuotasDelta() != null) {
+                        Map<String, String> scramMap = new HashMap<String, String>();
+                        delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
+
+                            if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                                (!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {

Review Comment:
   That is my understanding. SCRAM records apply only to user principals and
are not client-specific.
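
   To make the condition in the diff concrete, here is a minimal, self-contained sketch of the "user only" check. It is an illustration and not code from the PR: `UserOnlyQuotaCheck`, `isUserOnly`, and `usersToRewrite` are hypothetical names, and the two string constants are local copies of the values of `ClientQuotaEntity.USER` and `ClientQuotaEntity.CLIENT_ID` so the example compiles without Kafka on the classpath.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of KRaftMigrationDriver.
class UserOnlyQuotaCheck {
    // Assumption: local copies of ClientQuotaEntity.USER and ClientQuotaEntity.CLIENT_ID.
    static final String USER = "user";
    static final String CLIENT_ID = "client-id";

    // True when the entity names a user but no client id, i.e. it is the
    // kind of entity that a SCRAM credential can also be attached to.
    static boolean isUserOnly(Map<String, String> entries) {
        return entries.containsKey(USER) && !entries.containsKey(CLIENT_ID);
    }

    // Collects the user names of all user-only entities among the changed ones.
    static Set<String> usersToRewrite(List<Map<String, String>> changedEntities) {
        Set<String> users = new LinkedHashSet<>();
        for (Map<String, String> entries : changedEntities) {
            if (isUserOnly(entries)) {
                users.add(entries.get(USER));
            }
        }
        return users;
    }

    public static void main(String[] args) {
        List<Map<String, String>> changed = List.of(
            Map.of(USER, "alice"),                  // user only: collected
            Map.of(USER, "bob", CLIENT_ID, "app"),  // user + client id: skipped
            Map.of(CLIENT_ID, "app"));              // client id only: skipped
        System.out.println(usersToRewrite(changed)); // prints [alice]
    }
}
```

   Under this reading, a quota entity that carries both a user and a client id is treated as an ordinary quota change, while a user-only entity is grouped with the SCRAM changes for the same user name.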


