This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new f5f8ff0d24 KAFKA-14214: Introduce read-write lock to 
StandardAuthorizer for consistent ACL reads. (#12628)
f5f8ff0d24 is described below

commit f5f8ff0d24ce9f564fe1d7225db7121f22d1b296
Author: Akhilesh C <[email protected]>
AuthorDate: Tue Sep 20 13:54:18 2022 -0700

    KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent 
ACL reads. (#12628)
    
    Fixes an issue with StandardAuthorizer#authorize that allowed inconsistent 
results. The underlying
    concurrent data structure (ConcurrentSkipListMap) had weak consistency 
guarantees. This meant
    that a concurrent update to the authorizer data could result in the 
authorize function processing
    ACL updates out of order.
    
    This patch replaces the concurrent data structures with regular non-thread 
safe equivalents and uses
    a read/write lock for thread safety and strong consistency.
    
    Reviewers: David Arthur <[email protected]>, Jason Gustafson 
<[email protected]>, Colin P. McCabe <[email protected]>, Luke Chen 
<[email protected]>
---
 .../metadata/authorizer/StandardAuthorizer.java    | 112 +++++++++++++++++----
 .../authorizer/StandardAuthorizerData.java         | 103 +++++++------------
 2 files changed, 129 insertions(+), 86 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
index 42f03367c2..197272a3e6 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
 import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
@@ -58,19 +59,34 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer {
      */
     private final CompletableFuture<Void> initialLoadFuture = new 
CompletableFuture<>();
 
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
     /**
-     * The current data. Can be read without a lock. Must be written while 
holding the object lock.
+     * The current data. We use a read-write lock to synchronize reads and 
writes to the data. We
+     * expect one writer and multiple readers accessing the ACL data, and we 
use the lock to make
+     * sure we have consistent reads when writer tries to change the data.
      */
     private volatile StandardAuthorizerData data = 
StandardAuthorizerData.createEmpty();
 
     @Override
-    public synchronized void setAclMutator(AclMutator aclMutator) {
-        this.data = data.copyWithNewAclMutator(aclMutator);
+    public void setAclMutator(AclMutator aclMutator) {
+        lock.writeLock().lock();
+        try {
+            this.data = data.copyWithNewAclMutator(aclMutator);
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     @Override
     public AclMutator aclMutatorOrException() {
-        AclMutator aclMutator = data.aclMutator;
+        AclMutator aclMutator;
+        lock.readLock().lock();
+        try {
+            aclMutator = data.aclMutator;
+        } finally {
+            lock.readLock().unlock();
+        }
         if (aclMutator == null) {
             throw new NotControllerException("The current node is not the 
active controller.");
         }
@@ -78,8 +94,13 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer {
     }
 
     @Override
-    public synchronized void completeInitialLoad() {
-        data = data.copyWithNewLoadingComplete(true);
+    public void completeInitialLoad() {
+        lock.writeLock().lock();
+        try {
+            data = data.copyWithNewLoadingComplete(true);
+        } finally {
+            lock.writeLock().unlock();
+        }
         data.log.info("Completed initial ACL load process.");
         initialLoadFuture.complete(null);
     }
@@ -97,17 +118,36 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer {
 
     @Override
     public void addAcl(Uuid id, StandardAcl acl) {
-        data.addAcl(id, acl);
+        lock.writeLock().lock();
+        try {
+            data.addAcl(id, acl);
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     @Override
     public void removeAcl(Uuid id) {
-        data.removeAcl(id);
+        lock.writeLock().lock();
+        try {
+            data.removeAcl(id);
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     @Override
-    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-        data = data.copyWithNewAcls(acls.entrySet());
+    public void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        StandardAuthorizerData newData = StandardAuthorizerData.createEmpty();
+        for (Map.Entry<Uuid, StandardAcl> entry : acls.entrySet()) {
+            newData.addAcl(entry.getKey(), entry.getValue());
+        }
+        lock.writeLock().lock();
+        try {
+            data = data.copyWithNewAcls(newData.getAclsByResource(), 
newData.getAclsById());
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     @Override
@@ -129,23 +169,40 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer {
     public List<AuthorizationResult> authorize(
             AuthorizableRequestContext requestContext,
             List<Action> actions) {
-        StandardAuthorizerData curData = data;
         List<AuthorizationResult> results = new ArrayList<>(actions.size());
-        for (Action action: actions) {
-            AuthorizationResult result = curData.authorize(requestContext, 
action);
-            results.add(result);
+        lock.readLock().lock();
+        try {
+            StandardAuthorizerData curData = data;
+            for (Action action : actions) {
+                AuthorizationResult result = curData.authorize(requestContext, 
action);
+                results.add(result);
+            }
+        } finally {
+            lock.readLock().unlock();
         }
         return results;
     }
 
     @Override
     public Iterable<AclBinding> acls(AclBindingFilter filter) {
-        return data.acls(filter);
+        lock.readLock().lock();
+        try {
+            // The Iterable returned here is consistent because it is created 
over a read-only
+            // copy of ACLs data.
+            return data.acls(filter);
+        } finally {
+            lock.readLock().unlock();
+        }
     }
 
     @Override
     public int aclCount() {
-        return data.aclCount();
+        lock.readLock().lock();
+        try {
+            return data.aclCount();
+        } finally {
+            lock.readLock().unlock();
+        }
     }
 
     @Override
@@ -156,7 +213,7 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer {
     }
 
     @Override
-    public synchronized void configure(Map<String, ?> configs) {
+    public void configure(Map<String, ?> configs) {
         Set<String> superUsers = getConfiguredSuperUsers(configs);
         AuthorizationResult defaultResult = getDefaultResult(configs);
         int nodeId;
@@ -165,17 +222,32 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer {
         } catch (Exception e) {
             nodeId = -1;
         }
-        this.data = data.copyWithNewConfig(nodeId, superUsers, defaultResult);
+        lock.writeLock().lock();
+        try {
+            data = data.copyWithNewConfig(nodeId, superUsers, defaultResult);
+        } finally {
+            lock.writeLock().unlock();
+        }
         this.data.log.info("set super.users={}, default result={}", 
String.join(",", superUsers), defaultResult);
     }
 
     // VisibleForTesting
     Set<String> superUsers()  {
-        return new HashSet<>(data.superUsers());
+        lock.readLock().lock();
+        try {
+            return new HashSet<>(data.superUsers());
+        } finally {
+            lock.readLock().unlock();
+        }
     }
 
     AuthorizationResult defaultResult() {
-        return data.defaultResult();
+        lock.readLock().lock();
+        try {
+            return data.defaultResult();
+        } finally {
+            lock.readLock().unlock();
+        }
     }
 
     static Set<String> getConfiguredSuperUsers(Map<String, ?> configs) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index d9ffd17562..c6e3b74a2a 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -36,16 +36,15 @@ import 
org.apache.kafka.server.authorizer.AuthorizationResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map.Entry;
+import java.util.List;
 import java.util.NavigableSet;
-import java.util.NoSuchElementException;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.TreeSet;
 
 import static org.apache.kafka.common.acl.AclOperation.ALL;
 import static org.apache.kafka.common.acl.AclOperation.ALTER;
@@ -64,7 +63,7 @@ import static 
org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
 /**
  * A class which encapsulates the configuration and the ACL data owned by 
StandardAuthorizer.
  *
- * The methods in this class support lockless concurrent access.
+ * The class is not thread-safe.
  */
 public class StandardAuthorizerData {
     /**
@@ -111,12 +110,12 @@ public class StandardAuthorizerData {
     /**
      * Contains all of the current ACLs sorted by (resource type, resource 
name).
      */
-    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
+    private final TreeSet<StandardAcl> aclsByResource;
 
     /**
      * Contains all of the current ACLs indexed by UUID.
      */
-    private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;
+    private final HashMap<Uuid, StandardAcl> aclsById;
 
     private static Logger createLogger(int nodeId) {
         return new LogContext("[StandardAuthorizer " + nodeId + "] 
").logger(StandardAuthorizerData.class);
@@ -132,7 +131,7 @@ public class StandardAuthorizerData {
             false,
             Collections.emptySet(),
             DENIED,
-            new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
+            new TreeSet<>(), new HashMap<>());
     }
 
     private StandardAuthorizerData(Logger log,
@@ -140,8 +139,8 @@ public class StandardAuthorizerData {
                                    boolean loadingComplete,
                                    Set<String> superUsers,
                                    AuthorizationResult defaultResult,
-                                   ConcurrentSkipListSet<StandardAcl> 
aclsByResource,
-                                   ConcurrentHashMap<Uuid, StandardAcl> 
aclsById) {
+                                   TreeSet<StandardAcl> aclsByResource,
+                                   HashMap<Uuid, StandardAcl> aclsById) {
         this.log = log;
         this.auditLog = auditLogger();
         this.aclMutator = aclMutator;
@@ -186,19 +185,17 @@ public class StandardAuthorizerData {
             aclsById);
     }
 
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, 
StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
+    StandardAuthorizerData copyWithNewAcls(TreeSet<StandardAcl> 
aclsByResource, HashMap<Uuid,
+        StandardAcl> aclsById) {
+        StandardAuthorizerData newData =  new StandardAuthorizerData(
             log,
             aclMutator,
             loadingComplete,
             superUsers,
             defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
+            aclsByResource,
+            aclsById);
+        log.info("Initialized with {} acl(s).", aclsById.size());
         return newData;
     }
 
@@ -529,55 +526,21 @@ public class StandardAuthorizerData {
         return acl.permissionType().equals(ALLOW) ? ALLOWED : DENIED;
     }
 
+    /**
+     * Creates a consistent Iterable on read-only copy of AclBindings data for 
the given filter.
+     *
+     * @param filter The filter constraining the AclBindings to be present in 
the Iterable.
+     * @return Iterable over AclBindings matching the filter.
+     */
     Iterable<AclBinding> acls(AclBindingFilter filter) {
-        return new AclIterable(filter);
-    }
-
-    class AclIterable implements Iterable<AclBinding> {
-        private final AclBindingFilter filter;
-
-        AclIterable(AclBindingFilter filter) {
-            this.filter = filter;
-        }
-
-        @Override
-        public Iterator<AclBinding> iterator() {
-            return new AclIterator(filter);
-        }
-    }
-
-    class AclIterator implements Iterator<AclBinding> {
-        private final AclBindingFilter filter;
-        private final Iterator<StandardAcl> iterator;
-        private AclBinding next;
-
-        AclIterator(AclBindingFilter filter) {
-            this.filter = filter;
-            this.iterator = aclsByResource.iterator();
-            this.next = null;
-        }
-
-        @Override
-        public boolean hasNext() {
-            while (next == null) {
-                if (!iterator.hasNext()) return false;
-                AclBinding binding = iterator.next().toBinding();
-                if (filter.matches(binding)) {
-                    next = binding;
-                }
+        List<AclBinding> aclBindingList = new ArrayList<>();
+        aclsByResource.forEach(acl -> {
+            AclBinding aclBinding = acl.toBinding();
+            if (filter.matches(aclBinding)) {
+                aclBindingList.add(aclBinding);
             }
-            return true;
-        }
-
-        @Override
-        public AclBinding next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            AclBinding result = next;
-            next = null;
-            return result;
-        }
+        });
+        return aclBindingList;
     }
 
     private interface MatchingRule {
@@ -654,4 +617,12 @@ public class StandardAuthorizerData {
             }
         }
     }
+
+    TreeSet<StandardAcl> getAclsByResource() {
+        return aclsByResource;
+    }
+
+    HashMap<Uuid, StandardAcl> getAclsById() {
+        return aclsById;
+    }
 }

Reply via email to