This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new f5f8ff0d24 KAFKA-14214: Introduce read-write lock to
StandardAuthorizer for consistent ACL reads. (#12628)
f5f8ff0d24 is described below
commit f5f8ff0d24ce9f564fe1d7225db7121f22d1b296
Author: Akhilesh C <[email protected]>
AuthorDate: Tue Sep 20 13:54:18 2022 -0700
KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent
ACL reads. (#12628)
Fixes an issue with StandardAuthorizer#authorize that allowed inconsistent
results. The underlying concurrent data structure (ConcurrentSkipListSet,
which is built on ConcurrentSkipListMap) had only weak consistency
guarantees. This meant that a concurrent update to the authorizer data could
result in the authorize function processing ACL updates out of order.
This patch replaces the concurrent data structures with regular
non-thread-safe equivalents and uses a read/write lock for thread safety and
strong consistency.
Reviewers: David Arthur <[email protected]>, Jason Gustafson
<[email protected]>, Colin P. McCabe <[email protected]>, Luke Chen
<[email protected]>
---
.../metadata/authorizer/StandardAuthorizer.java | 112 +++++++++++++++++----
.../authorizer/StandardAuthorizerData.java | 103 +++++++------------
2 files changed, 129 insertions(+), 86 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
index 42f03367c2..197272a3e6 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
@@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
@@ -58,19 +59,34 @@ public class StandardAuthorizer implements
ClusterMetadataAuthorizer {
*/
private final CompletableFuture<Void> initialLoadFuture = new
CompletableFuture<>();
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
/**
- * The current data. Can be read without a lock. Must be written while
holding the object lock.
+ * The current data. We use a read-write lock to synchronize reads and
writes to the data. We
+ * expect one writer and multiple readers accessing the ACL data, and we
use the lock to make
+ * sure we have consistent reads when writer tries to change the data.
*/
private volatile StandardAuthorizerData data =
StandardAuthorizerData.createEmpty();
@Override
- public synchronized void setAclMutator(AclMutator aclMutator) {
- this.data = data.copyWithNewAclMutator(aclMutator);
+ public void setAclMutator(AclMutator aclMutator) {
+ lock.writeLock().lock();
+ try {
+ this.data = data.copyWithNewAclMutator(aclMutator);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
public AclMutator aclMutatorOrException() {
- AclMutator aclMutator = data.aclMutator;
+ AclMutator aclMutator;
+ lock.readLock().lock();
+ try {
+ aclMutator = data.aclMutator;
+ } finally {
+ lock.readLock().unlock();
+ }
if (aclMutator == null) {
throw new NotControllerException("The current node is not the
active controller.");
}
@@ -78,8 +94,13 @@ public class StandardAuthorizer implements
ClusterMetadataAuthorizer {
}
@Override
- public synchronized void completeInitialLoad() {
- data = data.copyWithNewLoadingComplete(true);
+ public void completeInitialLoad() {
+ lock.writeLock().lock();
+ try {
+ data = data.copyWithNewLoadingComplete(true);
+ } finally {
+ lock.writeLock().unlock();
+ }
data.log.info("Completed initial ACL load process.");
initialLoadFuture.complete(null);
}
@@ -97,17 +118,36 @@ public class StandardAuthorizer implements
ClusterMetadataAuthorizer {
@Override
public void addAcl(Uuid id, StandardAcl acl) {
- data.addAcl(id, acl);
+ lock.writeLock().lock();
+ try {
+ data.addAcl(id, acl);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
public void removeAcl(Uuid id) {
- data.removeAcl(id);
+ lock.writeLock().lock();
+ try {
+ data.removeAcl(id);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
- public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
- data = data.copyWithNewAcls(acls.entrySet());
+ public void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+ StandardAuthorizerData newData = StandardAuthorizerData.createEmpty();
+ for (Map.Entry<Uuid, StandardAcl> entry : acls.entrySet()) {
+ newData.addAcl(entry.getKey(), entry.getValue());
+ }
+ lock.writeLock().lock();
+ try {
+ data = data.copyWithNewAcls(newData.getAclsByResource(),
newData.getAclsById());
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
@@ -129,23 +169,40 @@ public class StandardAuthorizer implements
ClusterMetadataAuthorizer {
public List<AuthorizationResult> authorize(
AuthorizableRequestContext requestContext,
List<Action> actions) {
- StandardAuthorizerData curData = data;
List<AuthorizationResult> results = new ArrayList<>(actions.size());
- for (Action action: actions) {
- AuthorizationResult result = curData.authorize(requestContext,
action);
- results.add(result);
+ lock.readLock().lock();
+ try {
+ StandardAuthorizerData curData = data;
+ for (Action action : actions) {
+ AuthorizationResult result = curData.authorize(requestContext,
action);
+ results.add(result);
+ }
+ } finally {
+ lock.readLock().unlock();
}
return results;
}
@Override
public Iterable<AclBinding> acls(AclBindingFilter filter) {
- return data.acls(filter);
+ lock.readLock().lock();
+ try {
+ // The Iterable returned here is consistent because it is created
over a read-only
+ // copy of ACLs data.
+ return data.acls(filter);
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public int aclCount() {
- return data.aclCount();
+ lock.readLock().lock();
+ try {
+ return data.aclCount();
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
@@ -156,7 +213,7 @@ public class StandardAuthorizer implements
ClusterMetadataAuthorizer {
}
@Override
- public synchronized void configure(Map<String, ?> configs) {
+ public void configure(Map<String, ?> configs) {
Set<String> superUsers = getConfiguredSuperUsers(configs);
AuthorizationResult defaultResult = getDefaultResult(configs);
int nodeId;
@@ -165,17 +222,32 @@ public class StandardAuthorizer implements
ClusterMetadataAuthorizer {
} catch (Exception e) {
nodeId = -1;
}
- this.data = data.copyWithNewConfig(nodeId, superUsers, defaultResult);
+ lock.writeLock().lock();
+ try {
+ data = data.copyWithNewConfig(nodeId, superUsers, defaultResult);
+ } finally {
+ lock.writeLock().unlock();
+ }
this.data.log.info("set super.users={}, default result={}",
String.join(",", superUsers), defaultResult);
}
// VisibleForTesting
Set<String> superUsers() {
- return new HashSet<>(data.superUsers());
+ lock.readLock().lock();
+ try {
+ return new HashSet<>(data.superUsers());
+ } finally {
+ lock.readLock().unlock();
+ }
}
AuthorizationResult defaultResult() {
- return data.defaultResult();
+ lock.readLock().lock();
+ try {
+ return data.defaultResult();
+ } finally {
+ lock.readLock().unlock();
+ }
}
static Set<String> getConfiguredSuperUsers(Map<String, ?> configs) {
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
index d9ffd17562..c6e3b74a2a 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
@@ -36,16 +36,15 @@ import
org.apache.kafka.server.authorizer.AuthorizationResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map.Entry;
+import java.util.List;
import java.util.NavigableSet;
-import java.util.NoSuchElementException;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.TreeSet;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.ALTER;
@@ -64,7 +63,7 @@ import static
org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
/**
* A class which encapsulates the configuration and the ACL data owned by
StandardAuthorizer.
*
- * The methods in this class support lockless concurrent access.
+ * The class is not thread-safe.
*/
public class StandardAuthorizerData {
/**
@@ -111,12 +110,12 @@ public class StandardAuthorizerData {
/**
* Contains all of the current ACLs sorted by (resource type, resource
name).
*/
- private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
+ private final TreeSet<StandardAcl> aclsByResource;
/**
* Contains all of the current ACLs indexed by UUID.
*/
- private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;
+ private final HashMap<Uuid, StandardAcl> aclsById;
private static Logger createLogger(int nodeId) {
return new LogContext("[StandardAuthorizer " + nodeId + "]
").logger(StandardAuthorizerData.class);
@@ -132,7 +131,7 @@ public class StandardAuthorizerData {
false,
Collections.emptySet(),
DENIED,
- new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
+ new TreeSet<>(), new HashMap<>());
}
private StandardAuthorizerData(Logger log,
@@ -140,8 +139,8 @@ public class StandardAuthorizerData {
boolean loadingComplete,
Set<String> superUsers,
AuthorizationResult defaultResult,
- ConcurrentSkipListSet<StandardAcl>
aclsByResource,
- ConcurrentHashMap<Uuid, StandardAcl>
aclsById) {
+ TreeSet<StandardAcl> aclsByResource,
+ HashMap<Uuid, StandardAcl> aclsById) {
this.log = log;
this.auditLog = auditLogger();
this.aclMutator = aclMutator;
@@ -186,19 +185,17 @@ public class StandardAuthorizerData {
aclsById);
}
- StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid,
StandardAcl>> aclEntries) {
- StandardAuthorizerData newData = new StandardAuthorizerData(
+ StandardAuthorizerData copyWithNewAcls(TreeSet<StandardAcl>
aclsByResource, HashMap<Uuid,
+ StandardAcl> aclsById) {
+ StandardAuthorizerData newData = new StandardAuthorizerData(
log,
aclMutator,
loadingComplete,
superUsers,
defaultRule.result,
- new ConcurrentSkipListSet<>(),
- new ConcurrentHashMap<>());
- for (Entry<Uuid, StandardAcl> entry : aclEntries) {
- newData.addAcl(entry.getKey(), entry.getValue());
- }
- log.info("Applied {} acl(s) from image.", aclEntries.size());
+ aclsByResource,
+ aclsById);
+ log.info("Initialized with {} acl(s).", aclsById.size());
return newData;
}
@@ -529,55 +526,21 @@ public class StandardAuthorizerData {
return acl.permissionType().equals(ALLOW) ? ALLOWED : DENIED;
}
+ /**
+ * Creates a consistent Iterable on read-only copy of AclBindings data for
the given filter.
+ *
+ * @param filter The filter constraining the AclBindings to be present in
the Iterable.
+ * @return Iterable over AclBindings matching the filter.
+ */
Iterable<AclBinding> acls(AclBindingFilter filter) {
- return new AclIterable(filter);
- }
-
- class AclIterable implements Iterable<AclBinding> {
- private final AclBindingFilter filter;
-
- AclIterable(AclBindingFilter filter) {
- this.filter = filter;
- }
-
- @Override
- public Iterator<AclBinding> iterator() {
- return new AclIterator(filter);
- }
- }
-
- class AclIterator implements Iterator<AclBinding> {
- private final AclBindingFilter filter;
- private final Iterator<StandardAcl> iterator;
- private AclBinding next;
-
- AclIterator(AclBindingFilter filter) {
- this.filter = filter;
- this.iterator = aclsByResource.iterator();
- this.next = null;
- }
-
- @Override
- public boolean hasNext() {
- while (next == null) {
- if (!iterator.hasNext()) return false;
- AclBinding binding = iterator.next().toBinding();
- if (filter.matches(binding)) {
- next = binding;
- }
+ List<AclBinding> aclBindingList = new ArrayList<>();
+ aclsByResource.forEach(acl -> {
+ AclBinding aclBinding = acl.toBinding();
+ if (filter.matches(aclBinding)) {
+ aclBindingList.add(aclBinding);
}
- return true;
- }
-
- @Override
- public AclBinding next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- AclBinding result = next;
- next = null;
- return result;
- }
+ });
+ return aclBindingList;
}
private interface MatchingRule {
@@ -654,4 +617,12 @@ public class StandardAuthorizerData {
}
}
}
+
+ TreeSet<StandardAcl> getAclsByResource() {
+ return aclsByResource;
+ }
+
+ HashMap<Uuid, StandardAcl> getAclsById() {
+ return aclsById;
+ }
}