pprovenzano commented on code in PR #13628: URL: https://github.com/apache/kafka/pull/13628#discussion_r1179530134
########## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ########## @@ -568,19 +571,58 @@ public void run() throws Exception { }); } - // For configs and client quotas, we need to send all of the data to the ZK client since we persist - // everything for a given entity in a single ZK node. + // For configs and client quotas, we need to send all of the data to the ZK + // client since we persist everything for a given entity in a single ZK node. if (delta.configsDelta() != null) { delta.configsDelta().changes().forEach((configResource, configDelta) -> apply("Updating config resource " + configResource, migrationState -> zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState))); } - if (delta.clientQuotasDelta() != null) { - delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> { - Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap(); - apply("Updating client quota " + clientQuotaEntity, migrationState -> - zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, migrationState)); + if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) { + + // A list of users with scram or quota changes + HashSet<String> users = new HashSet<String>(); + + // Populate list with users with scram changes + if (delta.scramDelta() != null) { + delta.scramDelta().changes().forEach((scramMechanism, changes) -> { + changes.forEach((userName, changeOpt) -> users.add(userName)); + }); + } + + // Populate list with users with quota changes + // and apply quota changes to all non user quota changes + if (delta.clientQuotasDelta() != null) { + Map<String, String> scramMap = new HashMap<String, String>(); + delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> { + + if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) && + 
(!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) { Review Comment: That is my understanding. SCRAM records are only applied to principals and are not client-specific. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org