rondagostino commented on code in PR #12578:
URL: https://github.com/apache/kafka/pull/12578#discussion_r960958343


##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -248,33 +249,45 @@ private void incrementalAlterConfigResource(ConfigResource configResource,
     }
 
     private ApiError validateAlterConfig(ConfigResource configResource,
-                                         List<ApiMessageAndVersion> newRecords,
+                                         List<ApiMessageAndVersion> recordsExplicitlyAltered,
+                                         List<ApiMessageAndVersion> recordsImplicitlyDeleted,
                                          boolean newlyCreatedResource) {
         Map<String, String> allConfigs = new HashMap<>();
-        Map<String, String> alteredConfigs = new HashMap<>();
+        Map<String, String> alteredConfigsForAlterConfigPolicyCheck = new HashMap<>();
         TimelineHashMap<String, String> existingConfigs = configData.get(configResource);
         if (existingConfigs != null) allConfigs.putAll(existingConfigs);
-        for (ApiMessageAndVersion newRecord : newRecords) {
+        for (ApiMessageAndVersion newRecord : recordsExplicitlyAltered) {
             ConfigRecord configRecord = (ConfigRecord) newRecord.message();
             if (configRecord.value() == null) {
                 allConfigs.remove(configRecord.name());
             } else {
                 allConfigs.put(configRecord.name(), configRecord.value());
             }
-            alteredConfigs.put(configRecord.name(), configRecord.value());
+            alteredConfigsForAlterConfigPolicyCheck.put(configRecord.name(), configRecord.value());
+        }
+        for (ApiMessageAndVersion recordImplicitlyDeleted : recordsImplicitlyDeleted) {
+            ConfigRecord configRecord = (ConfigRecord) recordImplicitlyDeleted.message();
+            allConfigs.remove(configRecord.name());
+            // As per KAFKA-14039, do not include implicit deletions caused by using the legacy AlterConfigs API
+            // in the list passed to the policy in order to maintain backwards compatibility

Review Comment:
   The loop a few lines above calls `allConfigs.remove(configRecord.name());`
first and then `alteredConfigsForAlterConfigPolicyCheck.put(configRecord.name(),
configRecord.value());`. If we mirror that order here, the comment sits exactly
where the `alteredConfigsForAlterConfigPolicyCheck.put()` call would otherwise occur.
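
To make the ordering point concrete, here is a minimal standalone sketch of the two loops. It is not the Kafka code: `Change`, `Result`, and `collect` are hypothetical stand-ins for `ConfigRecord` and `validateAlterConfig`, and plain `HashMap`s replace the timeline data structures. It only illustrates that the second loop has the same shape as the first, with the comment taking the place of the omitted `put()`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AlterConfigOrderingSketch {
    // Hypothetical stand-in for ConfigRecord: a null value means "delete this key".
    static final class Change {
        final String name;
        final String value;

        Change(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    // The two views built during validation (names are illustrative only).
    static final class Result {
        final Map<String, String> allConfigs = new HashMap<>();
        final Map<String, String> alteredForPolicyCheck = new HashMap<>();
    }

    static Result collect(Map<String, String> existing,
                          List<Change> explicitlyAltered,
                          List<Change> implicitlyDeleted) {
        Result result = new Result();
        if (existing != null) result.allConfigs.putAll(existing);

        for (Change change : explicitlyAltered) {
            // Step 1: update the full view of the resource's configs.
            if (change.value == null) {
                result.allConfigs.remove(change.name);
            } else {
                result.allConfigs.put(change.name, change.value);
            }
            // Step 2: record the change for the policy check.
            result.alteredForPolicyCheck.put(change.name, change.value);
        }

        for (Change change : implicitlyDeleted) {
            // Step 1: same as above, the key disappears from the full view.
            result.allConfigs.remove(change.name);
            // Step 2 is intentionally absent: implicit deletions are not
            // reported to the policy, so the explanatory comment sits where
            // alteredForPolicyCheck.put(...) would otherwise be.
        }
        return result;
    }
}
```

Under these assumptions, calling `collect` with an existing key that is implicitly deleted removes it from `allConfigs` but leaves `alteredForPolicyCheck` without an entry for it, whereas an explicit deletion would show up there with a `null` value.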
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
