mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971306933


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize(
         return results;
     }
 
+    @Override
+    public AuthorizationResult authorizeByResourceType(

Review Comment:
   If I understand correctly, this implementation was added to take advantage 
of the new binary search approach in the ACL array. IOW, an optimization over 
the default `authorizeByResourceType` impl?



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java:
##########
@@ -76,14 +78,15 @@ public interface ClusterMetadataAuthorizer extends 
Authorizer {
     void loadSnapshot(Map<Uuid, StandardAcl> acls);
 
     /**
-     * Add a new ACL. Any ACL with the same ID will be replaced.
-     */
-    void addAcl(Uuid id, StandardAcl acl);
-
-    /**
-     * Remove the ACL with the given ID.
+     * Add or remove ACLs.
+     *
+     * @param newAcls           The ACLs to add.
+     * @param removedAclIds     The ACL IDs to remove.
      */
-    void removeAcl(Uuid id);
+    void applyAclChanges(

Review Comment:
   We should document that this method does not expect duplicates or allow 
replacing ACL by ID. 



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
 
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, 
StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
     }
 
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-                    " to aclsByResource");
-            }
-            log.trace("Added ACL {}: {}", id, acl);
-        } catch (Throwable e) {
-            log.error("addAcl error", e);
-            throw e;
-        }
+    StandardAuthorizerData copyWithAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        return copyWithNewAcls(acls, newAclEntries, removedAclIds);
     }
 
-    void removeAcl(Uuid id) {
-        try {
-            StandardAcl acl = aclsById.remove(id);
-            if (acl == null) {
-                throw new RuntimeException("ID " + id + " not found in 
aclsById.");
+    StandardAuthorizerData copyWithNewAcls(
+        StandardAclWithId[] existingAcls,
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        int newSize = existingAcls.length + newAclEntries.size() - 
removedAclIds.size();
+        StandardAclWithId[] newAcls = new StandardAclWithId[newSize];
+        int numRemoved = 0, j = 0;
+        for (int i = 0; i < existingAcls.length; i++) {
+            StandardAclWithId aclWithId = existingAcls[i];
+            if (removedAclIds.contains(aclWithId.id())) {
+                numRemoved++;
+            } else {
+                newAcls[j++] = aclWithId;
             }
-            if (!aclsByResource.remove(acl)) {
-                throw new RuntimeException("Unable to remove the ACL with ID " 
+ id +
-                    " from aclsByResource");
+        }
+        if (numRemoved < removedAclIds.size()) {
+            throw new RuntimeException("Only located " + numRemoved + " out of 
" +
+                removedAclIds.size() + " removed ACL ID(s). removedAclIds = " +
+                removedAclIds.stream().map(a -> 
a.toString()).collect(Collectors.joining(", ")));
+        }
+        if (!newAclEntries.isEmpty()) {
+            int i = 0;
+            for (Entry<Uuid, StandardAcl> entry : newAclEntries) {
+                newAcls[existingAcls.length + i] = new 
StandardAclWithId(entry.getKey(), entry.getValue());

Review Comment:
   Should this index be offset by the number we removed?



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
 
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, 
StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
     }
 
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-                    " to aclsByResource");
-            }

Review Comment:
   Got it. Looking at AclControlManager, we should not generate a record for an 
existing ACL (as determined by the hash set in AclControlManager). 
   
   The replace-by-ID case via `ClusterMetadataAuthorizer#addAcl` confused me 
since I don't see how a caller could pass in the same ID for a new ACL. The doc 
used to say "Add a new ACL. Any ACL with the same ID will be replaced." 
Anyways, it's gone now with this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to