This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ae3a6ed KAKFA-10619: Idempotent producer will get authorized once it
has a WRITE access to at least one topic (KIP-679) (#9485)
ae3a6ed is described below
commit ae3a6ed990f91708686d27c6023bac050c422248
Author: Cheng Tan <[email protected]>
AuthorDate: Fri Dec 18 10:08:46 2020 -0800
KAKFA-10619: Idempotent producer will get authorized once it has a WRITE
access to at least one topic (KIP-679) (#9485)
Includes:
- New API to authorize by resource type
- Default implementation for the method that supports super users and ACLs
- Optimized implementation in AclAuthorizer that supports ACLs, super users
and allow.everyone.if.no.acl.found
- Benchmarks and tests
- InitProducerIdRequest authorized for Cluster:IdempotentWrite or WRITE to
any topic, ProduceRequest authorized only for topic even if idempotent
Reviewers: Lucas Bradstreet <[email protected]>, Rajini Sivaram
<[email protected]>
---
checkstyle/suppressions.xml | 6 +-
.../apache/kafka/common/utils/SecurityUtils.java | 30 ++
.../apache/kafka/server/authorizer/Authorizer.java | 155 ++++++++-
.../kafka/security/authorizer/AclAuthorizer.scala | 164 ++++++++-
.../security/authorizer/AuthorizerWrapper.scala | 57 +++-
core/src/main/scala/kafka/server/KafkaApis.scala | 16 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 108 +++++-
.../security/authorizer/AclAuthorizerTest.scala | 68 ++--
.../AuthorizerInterfaceDefaultTest.scala | 94 ++++++
.../authorizer/AuthorizerWrapperTest.scala | 106 ++++++
.../security/authorizer/BaseAuthorizerTest.scala | 375 +++++++++++++++++++++
.../kafka/jmh/acl/AclAuthorizerBenchmark.java | 125 +++++--
12 files changed, 1201 insertions(+), 103 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7fb20ec..69df37d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -67,13 +67,13 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
<suppress checks="CyclomaticComplexity"
-
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory).java"/>
+
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer).java"/>
<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
<suppress checks="NPathComplexity"
-
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient).java"/>
+
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer).java"/>
<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
files="CoordinatorClient.java"/>
@@ -100,7 +100,7 @@
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
<suppress checks="NPathComplexity"
- files="MemoryRecordsTest|MetricsTest|TestSslUtils"/>
+
files="MemoryRecordsTest|MetricsTest|TestSslUtils|AclAuthorizerBenchmark"/>
<suppress
checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
b/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
index 12defdc..88a4cfc 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.utils;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.SecurityProviderCreator;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -146,4 +148,32 @@ public class SecurityUtils {
}
return builder.toString();
}
+
+ public static void authorizeByResourceTypeCheckArgs(AclOperation op,
+ ResourceType type) {
+ if (type == ResourceType.ANY) {
+ throw new IllegalArgumentException(
+ "Must specify a non-filter resource type for
authorizeByResourceType");
+ }
+
+ if (type == ResourceType.UNKNOWN) {
+ throw new IllegalArgumentException(
+ "Unknown resource type");
+ }
+
+ if (op == AclOperation.ANY) {
+ throw new IllegalArgumentException(
+ "Must specify a non-filter operation type for
authorizeByResourceType");
+ }
+
+ if (op == AclOperation.UNKNOWN) {
+ throw new IllegalArgumentException(
+ "Unknown operation type");
+ }
+ }
+
+ public static boolean denyAll(ResourcePattern pattern) {
+ return pattern.patternType() == PatternType.LITERAL
+ && pattern.name().equals(ResourcePattern.WILDCARD_RESOURCE);
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
index 1865e7e..17348a7 100644
--- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
+++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
@@ -17,16 +17,29 @@
package org.apache.kafka.server.authorizer;
-import java.io.Closeable;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletionStage;
-
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.SecurityUtils;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
/**
*
@@ -139,4 +152,136 @@ public interface Authorizer extends Configurable,
Closeable {
* @return Iterator for ACL bindings, which may be populated lazily.
*/
Iterable<AclBinding> acls(AclBindingFilter filter);
+
+ /**
+ * Check if the caller is authorized to perform theĀ given ACL operation on
at least one
+ * resource of the given type.
+ *
+ * Custom authorizer implementations should consider overriding this
default implementation because:
+ * 1. The default implementation iterates all AclBindings multiple times,
without any caching
+ * by principal, host, operation, permission types, and resource types.
More efficient
+ * implementations may be added in custom authorizers that directly
access cached entries.
+ * 2. The default implementation cannot integrate with any audit logging
included in the
+ * authorizer implementation.
+ * 3. The default implementation does not support any custom authorizer
configs or other access
+ * rules apart from ACLs.
+ *
+ * @param requestContext Request context including request resourceType,
security protocol and listener name
+ * @param op The ACL operation to check
+ * @param resourceType The resource type to check
+ * @return Return {@link AuthorizationResult#ALLOWED} if the
caller is authorized
+ * to perform the given ACL operation on at least
one resource of the
+ * given type. Return {@link
AuthorizationResult#DENIED} otherwise.
+ */
+ default AuthorizationResult
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation
op, ResourceType resourceType) {
+ SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+ // Check a hard-coded name to ensure that super users are granted
+ // access regardless of DENY ACLs.
+ if (authorize(requestContext, Collections.singletonList(new Action(
+ op, new ResourcePattern(resourceType, "hardcode",
PatternType.LITERAL),
+ 0, true, false)))
+ .get(0) == AuthorizationResult.ALLOWED) {
+ return AuthorizationResult.ALLOWED;
+ }
+
+ // Filter out all the resource pattern corresponding to the
RequestContext,
+ // AclOperation, and ResourceType
+ ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+ resourceType, null, PatternType.ANY);
+ AclBindingFilter aclFilter = new AclBindingFilter(
+ resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+ EnumMap<PatternType, Set<String>> denyPatterns =
+ new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+ put(PatternType.LITERAL, new HashSet<>());
+ put(PatternType.PREFIXED, new HashSet<>());
+ }};
+ EnumMap<PatternType, Set<String>> allowPatterns =
+ new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+ put(PatternType.LITERAL, new HashSet<>());
+ put(PatternType.PREFIXED, new HashSet<>());
+ }};
+
+ boolean hasWildCardAllow = false;
+
+ KafkaPrincipal principal = new KafkaPrincipal(
+ requestContext.principal().getPrincipalType(),
+ requestContext.principal().getName());
+ String hostAddr = requestContext.clientAddress().getHostAddress();
+
+ for (AclBinding binding : acls(aclFilter)) {
+ if (!binding.entry().host().equals(hostAddr) &&
!binding.entry().host().equals("*"))
+ continue;
+
+ if
(!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
+ && !binding.entry().principal().equals("User:*"))
+ continue;
+
+ if (binding.entry().operation() != op
+ && binding.entry().operation() != AclOperation.ALL)
+ continue;
+
+ if (binding.entry().permissionType() == AclPermissionType.DENY) {
+ switch (binding.pattern().patternType()) {
+ case LITERAL:
+ // If wildcard deny exists, return deny directly
+ if
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
+ return AuthorizationResult.DENIED;
+
denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
+ break;
+ case PREFIXED:
+
denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
+ break;
+ default:
+ }
+ continue;
+ }
+
+ if (binding.entry().permissionType() != AclPermissionType.ALLOW)
+ continue;
+
+ switch (binding.pattern().patternType()) {
+ case LITERAL:
+ if
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
+ hasWildCardAllow = true;
+ continue;
+ }
+
allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
+ break;
+ case PREFIXED:
+
allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
+ break;
+ default:
+ }
+ }
+
+ if (hasWildCardAllow) {
+ return AuthorizationResult.ALLOWED;
+ }
+
+ // For any literal allowed, if there's no dominant literal and prefix
denied, return allow.
+ // For any prefix allowed, if there's no dominant prefix denied,
return allow.
+ for (Map.Entry<PatternType, Set<String>> entry :
allowPatterns.entrySet()) {
+ for (String allowStr : entry.getValue()) {
+ if (entry.getKey() == PatternType.LITERAL
+ &&
denyPatterns.get(PatternType.LITERAL).contains(allowStr))
+ continue;
+ StringBuilder sb = new StringBuilder();
+ boolean hasDominatedDeny = false;
+ for (char ch : allowStr.toCharArray()) {
+ sb.append(ch);
+ if
(denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) {
+ hasDominatedDeny = true;
+ break;
+ }
+ }
+ if (!hasDominatedDeny)
+ return AuthorizationResult.ALLOWED;
+ }
+ }
+
+ return AuthorizationResult.DENIED;
+ }
+
}
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index 0a60e51..475baef 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -42,7 +42,7 @@ import org.apache.zookeeper.client.ZKClientConfig
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
-import scala.collection.{Seq, mutable}
+import scala.collection.{Seq, immutable, mutable}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Random, Success, Try}
@@ -130,6 +130,11 @@ class AclAuthorizer extends Authorizer with Logging {
@volatile
private var aclCache = new
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new
ResourceOrdering)
+
+ @volatile
+ private var resourceCache = new
scala.collection.immutable.HashMap[ResourceTypeKey,
+ scala.collection.immutable.HashSet[String]]()
+
private val lock = new Object()
// The maximum number of times we should try to update the resource acls in
zookeeper before failing;
@@ -304,6 +309,130 @@ class AclAuthorizer extends Authorizer with Logging {
if (zkClient != null) zkClient.close()
}
+ override def authorizeByResourceType(requestContext:
AuthorizableRequestContext,
+ op: AclOperation,
+ resourceType: ResourceType):
AuthorizationResult = {
+ SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+ val principal = new KafkaPrincipal(
+ requestContext.principal().getPrincipalType,
+ requestContext.principal().getName)
+
+ if (isSuperUser(principal))
+ return AuthorizationResult.ALLOWED
+
+ val resourceSnapshot = resourceCache
+ val principalStr = principal.toString
+ val host = requestContext.clientAddress().getHostAddress
+ val action = new Action(op, new ResourcePattern(resourceType, "NONE",
PatternType.UNKNOWN), 0, true, true)
+
+ val denyLiterals = matchingResources(
+ resourceSnapshot, principalStr, host, op, AclPermissionType.DENY,
resourceType, PatternType.LITERAL)
+
+ if (denyAll(denyLiterals)) {
+ logAuditMessage(requestContext, action, authorized = false)
+ return AuthorizationResult.DENIED
+ }
+
+ if (shouldAllowEveryoneIfNoAclIsFound) {
+ logAuditMessage(requestContext, action, authorized = true)
+ return AuthorizationResult.ALLOWED
+ }
+
+ val denyPrefixes = matchingResources(
+ resourceSnapshot, principalStr, host, op, AclPermissionType.DENY,
resourceType, PatternType.PREFIXED)
+
+ if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+ if (hasMatchingResources(resourceSnapshot, principalStr, host, op,
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+ || hasMatchingResources(resourceSnapshot, principalStr, host, op,
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+ logAuditMessage(requestContext, action, authorized = true)
+ return AuthorizationResult.ALLOWED
+ } else {
+ logAuditMessage(requestContext, action, authorized = false)
+ return AuthorizationResult.DENIED
+ }
+ }
+
+ val allowLiterals = matchingResources(
+ resourceSnapshot, principalStr, host, op, AclPermissionType.ALLOW,
resourceType, PatternType.LITERAL)
+ val allowPrefixes = matchingResources(
+ resourceSnapshot, principalStr, host, op, AclPermissionType.ALLOW,
resourceType, PatternType.PREFIXED)
+
+ if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+ logAuditMessage(requestContext, action, authorized = true)
+ return AuthorizationResult.ALLOWED
+ }
+
+ logAuditMessage(requestContext, action, authorized = false)
+ AuthorizationResult.DENIED
+ }
+
+ private def matchingResources(resourceSnapshot:
immutable.Map[ResourceTypeKey, immutable.Set[String]],
+ principal: String, host: String, op:
AclOperation, permission: AclPermissionType,
+ resourceType: ResourceType, patternType:
PatternType): ArrayBuffer[Set[String]] = {
+ val matched = ArrayBuffer[immutable.Set[String]]()
+ for (p <- Set(principal, AclEntry.WildcardPrincipalString);
+ h <- Set(host, AclEntry.WildcardHost);
+ o <- Set(op, AclOperation.ALL)) {
+ val resourceTypeKey = ResourceTypeKey(
+ new AccessControlEntry(p, h, o, permission), resourceType, patternType)
+ resourceSnapshot.get(resourceTypeKey) match {
+ case Some(resources) => matched += resources
+ case None =>
+ }
+ }
+ matched
+ }
+
+ private def hasMatchingResources(resourceSnapshot:
immutable.Map[ResourceTypeKey, immutable.Set[String]],
+ principal: String, host: String, op:
AclOperation, permission: AclPermissionType,
+ resourceType: ResourceType, patternType:
PatternType): Boolean = {
+ for (p <- Set(principal, AclEntry.WildcardPrincipalString);
+ h <- Set(host, AclEntry.WildcardHost);
+ o <- Set(op, AclOperation.ALL)) {
+ val resourceTypeKey = ResourceTypeKey(
+ new AccessControlEntry(p, h, o, permission), resourceType,
patternType)
+ if (resourceSnapshot.contains(resourceTypeKey))
+ return true
+ }
+ false
+ }
+
+ private def denyAll(denyLiterals: ArrayBuffer[immutable.Set[String]]):
Boolean =
+ denyLiterals.exists(_.contains(ResourcePattern.WILDCARD_RESOURCE))
+
+
+ private def allowAny(allowLiterals: ArrayBuffer[immutable.Set[String]],
allowPrefixes: ArrayBuffer[immutable.Set[String]],
+ denyLiterals: ArrayBuffer[immutable.Set[String]],
denyPrefixes: ArrayBuffer[immutable.Set[String]]): Boolean = {
+ (allowPrefixes.exists(_.exists(prefix => allowPrefix(prefix,
denyPrefixes)))
+ || allowLiterals.exists(_.exists(literal => allowLiteral(literal,
denyLiterals, denyPrefixes))))
+ }
+
+ private def allowLiteral(literalName: String, denyLiterals:
ArrayBuffer[immutable.Set[String]],
+ denyPrefixes: ArrayBuffer[immutable.Set[String]]):
Boolean = {
+ literalName match {
+ case ResourcePattern.WILDCARD_RESOURCE => true
+ case _ => !denyLiterals.exists(_.contains(literalName)) &&
!hasDominantPrefixedDeny(literalName, denyPrefixes)
+ }
+ }
+
+ private def allowPrefix(prefixName: String,
+ denyPrefixes: ArrayBuffer[immutable.Set[String]]):
Boolean = {
+ !hasDominantPrefixedDeny(prefixName, denyPrefixes)
+ }
+
+ private def hasDominantPrefixedDeny(resourceName: String, denyPrefixes:
ArrayBuffer[immutable.Set[String]]): Boolean = {
+ val sb = new StringBuilder
+ for (ch <- resourceName.toCharArray) {
+ sb.append(ch)
+ if (denyPrefixes.exists(p => p.contains(sb.toString()))) {
+ return true
+ }
+ }
+ false
+ }
+
+
private def authorizeAction(requestContext: AuthorizableRequestContext,
action: Action): AuthorizationResult = {
val resource = action.resourcePattern
if (resource.patternType != PatternType.LITERAL) {
@@ -547,7 +676,34 @@ class AclAuthorizer extends Authorizer with Logging {
zkClient.getVersionedAclsForResource(resource)
}
- private def updateCache(resource: ResourcePattern, versionedAcls:
VersionedAcls): Unit = {
+ // Visible for benchmark
+ def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls):
Unit = {
+ val currentAces: Set[AccessControlEntry] =
aclCache.get(resource).map(_.acls.map(_.ace)).getOrElse(Set.empty)
+ val newAces: Set[AccessControlEntry] = versionedAcls.acls.map(aclEntry =>
aclEntry.ace)
+ val acesToAdd = newAces.diff(currentAces)
+ val acesToRemove = currentAces.diff(newAces)
+
+ acesToAdd.foreach(ace => {
+ val resourceTypeKey = ResourceTypeKey(ace, resource.resourceType(),
resource.patternType())
+ resourceCache.get(resourceTypeKey) match {
+ case Some(resources) => resourceCache += (resourceTypeKey ->
(resources + resource.name()))
+ case None => resourceCache += (resourceTypeKey ->
immutable.HashSet(resource.name()))
+ }
+ })
+ acesToRemove.foreach(ace => {
+ val resourceTypeKey = ResourceTypeKey(ace, resource.resourceType(),
resource.patternType())
+ resourceCache.get(resourceTypeKey) match {
+ case Some(resources) =>
+ val newResources = resources - resource.name()
+ if (newResources.isEmpty) {
+ resourceCache -= resourceTypeKey
+ } else {
+ resourceCache += (resourceTypeKey -> newResources)
+ }
+ case None =>
+ }
+ })
+
if (versionedAcls.acls.nonEmpty) {
aclCache = aclCache.updated(resource, versionedAcls)
} else {
@@ -582,4 +738,8 @@ class AclAuthorizer extends Authorizer with Logging {
processAclChangeNotification(resource)
}
}
+
+ private case class ResourceTypeKey(ace: AccessControlEntry,
+ resourceType: ResourceType,
+ patternType: PatternType)
}
diff --git
a/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
b/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
index 9a8bf9d..cc25fce 100644
--- a/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
@@ -21,21 +21,23 @@ import java.util.concurrent.{CompletableFuture,
CompletionStage}
import java.{lang, util}
import kafka.network.RequestChannel.Session
-import kafka.security.auth.{Acl, Operation, PermissionType, Resource,
ResourceType}
+import kafka.security.auth.{Acl, Operation, PermissionType, Resource,
SimpleAclAuthorizer, ResourceType => ResourceTypeLegacy}
import kafka.security.authorizer.AuthorizerWrapper._
import org.apache.kafka.common.Endpoint
-import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding,
AclBindingFilter}
+import org.apache.kafka.common.acl._
import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ApiError
-import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourcePatternFilter, ResourceType}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.SecurityUtils
import org.apache.kafka.common.utils.SecurityUtils.parseKafkaPrincipal
import
org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext,
AuthorizerServerInfo, _}
-import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, immutable, mutable}
+import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
@@ -43,7 +45,7 @@ object AuthorizerWrapper {
def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError,
(Resource, Acl)] = {
(for {
- resourceType <-
Try(ResourceType.fromJava(filter.patternFilter.resourceType))
+ resourceType <-
Try(ResourceTypeLegacy.fromJava(filter.patternFilter.resourceType))
principal <- Try(parseKafkaPrincipal(filter.entryFilter.principal))
operation <- Try(Operation.fromJava(filter.entryFilter.operation))
permissionType <-
Try(PermissionType.fromJava(filter.entryFilter.permissionType))
@@ -71,15 +73,20 @@ object AuthorizerWrapper {
}
def convertToResource(resourcePattern: ResourcePattern): Resource = {
- Resource(ResourceType.fromJava(resourcePattern.resourceType),
resourcePattern.name, resourcePattern.patternType)
+ Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType),
resourcePattern.name, resourcePattern.patternType)
}
}
@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
class AuthorizerWrapper(private[kafka] val baseAuthorizer:
kafka.security.auth.Authorizer) extends Authorizer {
+ var shouldAllowEveryoneIfNoAclIsFound = false
+
override def configure(configs: util.Map[String, _]): Unit = {
baseAuthorizer.configure(configs)
+ shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get(
+
AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
+ && baseAuthorizer.isInstanceOf[SimpleAclAuthorizer])
}
override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _
<: CompletionStage[Void]] = {
@@ -175,4 +182,42 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer:
kafka.security.auth.A
override def close(): Unit = {
baseAuthorizer.close()
}
+
+ override def authorizeByResourceType(requestContext:
AuthorizableRequestContext,
+ op: AclOperation,
+ resourceType: ResourceType):
AuthorizationResult = {
+ SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+ if (super.authorizeByResourceType(requestContext, op, resourceType) ==
AuthorizationResult.ALLOWED)
+ AuthorizationResult.ALLOWED
+ else if (denyAllResource(requestContext, op, resourceType) ||
!shouldAllowEveryoneIfNoAclIsFound)
+ AuthorizationResult.DENIED
+ else
+ AuthorizationResult.ALLOWED
+ }
+
+ private def denyAllResource(requestContext: AuthorizableRequestContext,
+ op: AclOperation,
+ resourceType: ResourceType): Boolean = {
+ val resourceTypeFilter = new ResourcePatternFilter(
+ resourceType, Resource.WildCardResource, PatternType.LITERAL)
+ val principal = new KafkaPrincipal(
+ requestContext.principal.getPrincipalType,
requestContext.principal.getName).toString
+ val host = requestContext.clientAddress().getHostAddress
+ val entryFilter = new AccessControlEntryFilter(null, null, op,
AclPermissionType.DENY)
+ val entryFilterAllOp = new AccessControlEntryFilter(null, null,
AclOperation.ALL, AclPermissionType.DENY)
+ val aclFilter = new AclBindingFilter(resourceTypeFilter, entryFilter)
+ val aclFilterAllOp = new AclBindingFilter(resourceTypeFilter,
entryFilterAllOp)
+
+ (acls(aclFilter).asScala.exists(b => principalHostMatch(b.entry(),
principal, host))
+ || acls(aclFilterAllOp).asScala.exists(b =>
principalHostMatch(b.entry(), principal, host)))
+ }
+
+ private def principalHostMatch(ace: AccessControlEntry,
+ principal: String,
+ host: String): Boolean = {
+ ((ace.host() == AclEntry.WildcardHost || ace.host() == host)
+ && (ace.principal() == AclEntry.WildcardPrincipalString ||
ace.principal() == principal))
+ }
+
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index dcb098b..f4758a6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -581,11 +581,6 @@ class KafkaApis(val requestChannel: RequestChannel,
sendErrorResponseMaybeThrottle(request,
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
return
}
- // Note that authorization to a transactionalId implies ProducerId
authorization
-
- } else if (hasIdempotentRecords && !authorize(request.context,
IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
- sendErrorResponseMaybeThrottle(request,
Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
- return
}
val unauthorizedTopicResponses = mutable.Map[TopicPartition,
PartitionResponse]()
@@ -2128,7 +2123,8 @@ class KafkaApis(val requestChannel: RequestChannel,
sendErrorResponseMaybeThrottle(request,
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
return
}
- } else if (!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER,
CLUSTER_NAME)) {
+ } else if (!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER,
CLUSTER_NAME, true, false)
+ && !authorizeByResourceType(request.context, AclOperation.WRITE,
ResourceType.TOPIC)) {
sendErrorResponseMaybeThrottle(request,
Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
return
}
@@ -3296,6 +3292,14 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+ private def authorizeByResourceType(requestContext: RequestContext,
+ operation: AclOperation,
+ resourceType: ResourceType): Boolean = {
+ authorizer.forall { authZ =>
+ authZ.authorizeByResourceType(requestContext, operation, resourceType)
== AuthorizationResult.ALLOWED
+ }
+ }
+
// private package for testing
private[server] def filterByAuthorized[T](requestContext: RequestContext,
operation: AclOperation,
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e1ee87a..7dd12e4 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.consumer._
import
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl.AclPermissionType.ALLOW
+import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.acl.{AccessControlEntry,
AccessControlEntryFilter, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
@@ -52,7 +52,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords,
RecordBatch, Records, SimpleRecord}
import org.apache.kafka.common.requests._
-import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.{PatternType, Resource,
ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
@@ -1612,14 +1612,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest
{
@Test
def testIdempotentProducerNoIdempotentWriteAclInInitProducerId(): Unit = {
createTopic(topic)
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, READ, ALLOW)), topicResource)
+ shouldIdempotentProducerFailInInitProducerId(true)
+ }
- addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, WRITE, ALLOW)), topicResource)
+ def shouldIdempotentProducerFailInInitProducerId(expectAuthException:
Boolean): Unit = {
val producer = buildIdempotentProducer()
try {
// the InitProducerId is sent asynchronously, so we expect the error
either in the callback
// or raised from send itself
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"hi".getBytes)).get()
- fail("Should have raised ClusterAuthorizationException")
+ if (expectAuthException)
+ fail("Should have raised ClusterAuthorizationException")
} catch {
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
@@ -1628,7 +1632,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// the second time, the call to send itself should fail (the producer
becomes unusable
// if no producerId can be obtained)
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"hi".getBytes)).get()
- fail("Should have raised ClusterAuthorizationException")
+ if (expectAuthException)
+ fail("Should have raised ClusterAuthorizationException")
} catch {
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
@@ -1638,18 +1643,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest
{
@Test
def testIdempotentProducerNoIdempotentWriteAclInProduce(): Unit = {
createTopic(topic)
-
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, WRITE, ALLOW)), topicResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, IDEMPOTENT_WRITE, ALLOW)), clusterResource)
+ idempotentProducerShouldFailInProduce(() => removeAllClientAcls())
+ }
+ def idempotentProducerShouldFailInProduce(removeAclIdempotenceRequired: ()
=> Unit): Unit = {
val producer = buildIdempotentProducer()
// first send should be fine since we have permission to get a ProducerId
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"hi".getBytes)).get()
// revoke the IdempotentWrite permission
- removeAllClientAcls()
- addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, WRITE, ALLOW)), topicResource)
+ removeAclIdempotenceRequired()
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, DESCRIBE, ALLOW)), topicResource)
try {
// the send should now fail with a cluster auth error
@@ -1657,7 +1664,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
fail("Should have raised ClusterAuthorizationException")
} catch {
case e: ExecutionException =>
- assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
+ assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException])
}
try {
// the second time, the call to send itself should fail (the producer
becomes unusable
@@ -1666,7 +1673,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
fail("Should have raised ClusterAuthorizationException")
} catch {
case e: ExecutionException =>
- assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
+ assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException])
}
}
@@ -1792,6 +1799,87 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
assertFalse("Cluster id not returned", response.clusterId.isEmpty)
}
+ @Test
+ def testAuthorizeByResourceTypeMultipleAddAndRemove(): Unit = {
+ createTopic(topic)
+
+ for (_ <- 1 to 3) {
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, DESCRIBE, ALLOW)), topicResource)
+ shouldIdempotentProducerFailInInitProducerId(true)
+
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, WRITE, ALLOW)), topicResource)
+ shouldIdempotentProducerFailInInitProducerId(false)
+
+ removeAllClientAcls()
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, DESCRIBE, ALLOW)), topicResource)
+ shouldIdempotentProducerFailInInitProducerId(true)
+ }
+ }
+
+ @Test
+ def testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow():
Unit = {
+ createTopic(topic)
+ createTopic("topic-2")
+ createTopic("to")
+
+ val unrelatedPrincipalString = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "unrelated").toString
+ val unrelatedTopicResource = new ResourcePattern(TOPIC, "topic-2", LITERAL)
+ val unrelatedGroupResource = new ResourcePattern(GROUP, "to", PREFIXED)
+
+ val acl1 = new AccessControlEntry(clientPrincipalString, WildcardHost,
READ, DENY)
+ val acl2 = new AccessControlEntry(unrelatedPrincipalString, WildcardHost,
READ, DENY)
+ val acl3 = new AccessControlEntry(clientPrincipalString, WildcardHost,
WRITE, DENY)
+ val acl4 = new AccessControlEntry(clientPrincipalString, WildcardHost,
WRITE, ALLOW)
+ val acl5 = new AccessControlEntry(clientPrincipalString, WildcardHost,
DESCRIBE, ALLOW)
+
+ addAndVerifyAcls(Set(acl1, acl4, acl5), topicResource)
+ addAndVerifyAcls(Set(acl2, acl3), unrelatedTopicResource)
+ addAndVerifyAcls(Set(acl2, acl3), unrelatedGroupResource)
+ shouldIdempotentProducerFailInInitProducerId(false)
+ }
+
+ @Test
+ def testAuthorizeByResourceTypeDenyTakesPrecedence(): Unit = {
+ createTopic(topic)
+ val allowWriteAce = new AccessControlEntry(clientPrincipalString,
WildcardHost, WRITE, ALLOW)
+ addAndVerifyAcls(Set(allowWriteAce), topicResource)
+ shouldIdempotentProducerFailInInitProducerId(false)
+
+ val denyWriteAce = new AccessControlEntry(clientPrincipalString,
WildcardHost, WRITE, DENY)
+ addAndVerifyAcls(Set(denyWriteAce), topicResource)
+ shouldIdempotentProducerFailInInitProducerId(true)
+ }
+
+ @Test
+ def testAuthorizeByResourceTypeWildcardResourceDenyDominate(): Unit = {
+ createTopic(topic)
+ val wildcard = new ResourcePattern(TOPIC,
ResourcePattern.WILDCARD_RESOURCE, LITERAL)
+ val prefixed = new ResourcePattern(TOPIC, "t", PREFIXED)
+ val literal = new ResourcePattern(TOPIC, topic, LITERAL)
+ val allowWriteAce = new AccessControlEntry(clientPrincipalString,
WildcardHost, WRITE, ALLOW)
+ val denyWriteAce = new AccessControlEntry(clientPrincipalString,
WildcardHost, WRITE, DENY)
+
+ addAndVerifyAcls(Set(allowWriteAce), prefixed)
+ addAndVerifyAcls(Set(allowWriteAce), literal)
+ shouldIdempotentProducerFailInInitProducerId(false)
+
+ addAndVerifyAcls(Set(denyWriteAce), wildcard)
+ shouldIdempotentProducerFailInInitProducerId(true)
+ }
+
+ @Test
+ def testAuthorizeByResourceTypePrefixedResourceDenyDominate(): Unit = {
+ createTopic(topic)
+ val prefixed = new ResourcePattern(TOPIC, topic.substring(0, 1), PREFIXED)
+ val literal = new ResourcePattern(TOPIC, topic, LITERAL)
+ val allowWriteAce = new AccessControlEntry(clientPrincipalString,
WildcardHost, WRITE, ALLOW)
+ val denyWriteAce = new AccessControlEntry(clientPrincipalString,
WildcardHost, WRITE, DENY)
+
+ addAndVerifyAcls(Set(denyWriteAce), prefixed)
+ addAndVerifyAcls(Set(allowWriteAce), literal)
+ shouldIdempotentProducerFailInInitProducerId(true)
+ }
+
def removeAllClientAcls(): Unit = {
val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString,
null, AclOperation.ANY, AclPermissionType.ANY)
diff --git
a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
index a5c57b6..dff935f 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
@@ -34,16 +34,13 @@ import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.errors.{ApiException,
UnsupportedVersionException}
-import org.apache.kafka.common.network.ClientInformation
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.PatternType.{LITERAL, MATCH, PREFIXED}
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.authorizer._
import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils}
import org.junit.Assert._
@@ -53,7 +50,7 @@ import org.scalatest.Assertions.intercept
import scala.jdk.CollectionConverters._
import scala.collection.mutable
-class AclAuthorizerTest extends ZooKeeperTestHarness {
+class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
private val allowReadAcl = new AccessControlEntry(WildcardPrincipalString,
WildcardHost, READ, ALLOW)
private val allowWriteAcl = new AccessControlEntry(WildcardPrincipalString,
WildcardHost, WRITE, ALLOW)
@@ -66,18 +63,13 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
private val aclAuthorizer = new AclAuthorizer
private val aclAuthorizer2 = new AclAuthorizer
- private var resource: ResourcePattern = _
- private val superUsers = "User:superuser1; User:superuser2"
- private val username = "alice"
- private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
username)
- private val requestContext = newRequestContext(principal,
InetAddress.getByName("192.168.0.1"))
- private var config: KafkaConfig = _
- private var zooKeeperClient: ZooKeeperClient = _
class CustomPrincipal(principalType: String, name: String) extends
KafkaPrincipal(principalType, name) {
override def equals(o: scala.Any): Boolean = false
}
+ override def authorizer: Authorizer = aclAuthorizer
+
@Before
override def setUp(): Unit = {
super.setUp()
@@ -988,6 +980,26 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
}
}
+ @Test
+ def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = {
+ val props = TestUtils.createBrokerConfig(1, zkConnect)
+ props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
+
+ val cfg = KafkaConfig.fromProps(props)
+ val aclAuthorizer = new AclAuthorizer
+ try {
+ aclAuthorizer.configure(cfg.originals)
+ assertTrue("If allow.everyone.if.no.acl.found = true, " +
+ "caller should have read access to at least one topic",
+ authorizeByResourceType(aclAuthorizer, requestContext, READ,
resource.resourceType()))
+ assertTrue("If allow.everyone.if.no.acl.found = true, " +
+ "caller should have write access to at least one topic",
+ authorizeByResourceType(aclAuthorizer, requestContext, WRITE,
resource.resourceType()))
+ } finally {
+ aclAuthorizer.close()
+ }
+ }
+
private def givenAuthorizerWithProtocolVersion(protocolVersion:
Option[ApiVersion]): Unit = {
aclAuthorizer.close()
@@ -1033,41 +1045,11 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
acls
}
- private def newRequestContext(principal: KafkaPrincipal, clientAddress:
InetAddress, apiKey: ApiKeys = ApiKeys.PRODUCE): RequestContext = {
- val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
- val header = new RequestHeader(apiKey, 2, "", 1) //ApiKeys apiKey, short
version, String clientId, int correlation
- new RequestContext(header, "", clientAddress, principal,
ListenerName.forSecurityProtocol(securityProtocol),
- securityProtocol, ClientInformation.EMPTY, false)
- }
-
private def authorize(authorizer: AclAuthorizer, requestContext:
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
val action = new Action(operation, resource, 1, true, true)
authorizer.authorize(requestContext, List(action).asJava).asScala.head ==
AuthorizationResult.ALLOWED
}
- private def addAcls(authorizer: AclAuthorizer, aces:
Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = {
- val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) }
- authorizer.createAcls(requestContext, bindings.toList.asJava).asScala
- .map(_.toCompletableFuture.get)
- .foreach { result => result.exception.ifPresent { e => throw e } }
- }
-
- private def removeAcls(authorizer: AclAuthorizer, aces:
Set[AccessControlEntry], resourcePattern: ResourcePattern): Boolean = {
- val bindings = if (aces.isEmpty)
- Set(new AclBindingFilter(resourcePattern.toFilter,
AccessControlEntryFilter.ANY) )
- else
- aces.map { ace => new AclBinding(resourcePattern, ace).toFilter }
- authorizer.deleteAcls(requestContext, bindings.toList.asJava).asScala
- .map(_.toCompletableFuture.get)
- .forall { result =>
- result.exception.ifPresent { e => throw e }
- result.aclBindingDeleteResults.forEach { r =>
- r.exception.ifPresent { e => throw e }
- }
- !result.aclBindingDeleteResults.isEmpty
- }
- }
-
private def getAcls(authorizer: AclAuthorizer, resourcePattern:
ResourcePattern): Set[AccessControlEntry] = {
val acls = authorizer.acls(new AclBindingFilter(resourcePattern.toFilter,
AccessControlEntryFilter.ANY)).asScala.toSet
acls.map(_.entry)
diff --git
a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
new file mode 100644
index 0000000..11cb9c7
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.util.concurrent.CompletionStage
+import java.{lang, util}
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.{After, Before}
+
+class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with
BaseAuthorizerTest {
+
+ private val interfaceDefaultAuthorizer = new DelegateAuthorizer
+
+ override def authorizer: Authorizer = interfaceDefaultAuthorizer
+
+ @Before
+ override def setUp(): Unit = {
+ super.setUp()
+
+ // Increase maxUpdateRetries to avoid transient failures
+ interfaceDefaultAuthorizer.authorizer.maxUpdateRetries = Int.MaxValue
+
+ val props = TestUtils.createBrokerConfig(0, zkConnect)
+ props.put(AclAuthorizer.SuperUsersProp, superUsers)
+
+ config = KafkaConfig.fromProps(props)
+ interfaceDefaultAuthorizer.authorizer.configure(config.originals)
+
+ zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests,
+ Time.SYSTEM, "kafka.test", "AuthorizerInterfaceDefaultTest")
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ interfaceDefaultAuthorizer.close()
+ zooKeeperClient.close()
+ super.tearDown()
+ }
+
+ class DelegateAuthorizer extends Authorizer {
+ val authorizer = new AclAuthorizer
+
+ override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _
<: CompletionStage[Void]] = {
+ authorizer.start(serverInfo)
+ }
+
+ override def authorize(requestContext: AuthorizableRequestContext,
actions: util.List[Action]): util.List[AuthorizationResult] = {
+ authorizer.authorize(requestContext, actions)
+ }
+
+ override def createAcls(requestContext: AuthorizableRequestContext,
aclBindings: util.List[AclBinding]): util.List[_ <:
CompletionStage[AclCreateResult]] = {
+ authorizer.createAcls(requestContext, aclBindings)
+ }
+
+ override def deleteAcls(requestContext: AuthorizableRequestContext,
aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <:
CompletionStage[AclDeleteResult]] = {
+ authorizer.deleteAcls(requestContext, aclBindingFilters)
+ }
+
+ override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
+ authorizer.acls(filter)
+ }
+
+ override def configure(configs: util.Map[String, _]): Unit = {
+ authorizer.configure(configs)
+ }
+
+ override def close(): Unit = {
+ authorizer.close()
+ }
+ }
+
+}
diff --git
a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
new file mode 100644
index 0000000..ec8ca08
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.util.UUID
+
+import kafka.security.auth.SimpleAclAuthorizer
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl.AclOperation._
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.common.resource.ResourceType._
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.annotation.nowarn
+
+class AuthorizerWrapperTest extends ZooKeeperTestHarness with
BaseAuthorizerTest {
+ @nowarn("cat=deprecation")
+ private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new
SimpleAclAuthorizer)
+ @nowarn("cat=deprecation")
+ private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new
SimpleAclAuthorizer)
+
+ override def authorizer: Authorizer = wrappedSimpleAuthorizer
+
+ @Before
+ @nowarn("cat=deprecation")
+ override def setUp(): Unit = {
+ super.setUp()
+
+ val props = TestUtils.createBrokerConfig(0, zkConnect)
+
+ props.put(AclAuthorizer.SuperUsersProp, superUsers)
+ config = KafkaConfig.fromProps(props)
+ wrappedSimpleAuthorizer.configure(config.originals)
+
+ props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
+ config = KafkaConfig.fromProps(props)
+ wrappedSimpleAuthorizerAllowEveryone.configure(config.originals)
+
+ resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
+ zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests,
+ Time.SYSTEM, "kafka.test", "AuthorizerWrapperTest")
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ val authorizers = Seq(wrappedSimpleAuthorizer,
wrappedSimpleAuthorizerAllowEveryone)
+ authorizers.foreach(a => {
+ a.close()
+ })
+ zooKeeperClient.close()
+ super.tearDown()
+ }
+
+ @Test
+ def testAuthorizeByResourceTypeEnableAllowEveryOne(): Unit = {
+ testAuthorizeByResourceTypeEnableAllowEveryOne(wrappedSimpleAuthorizer)
+ }
+
+ private def testAuthorizeByResourceTypeEnableAllowEveryOne(authorizer:
Authorizer): Unit = {
+ assertTrue("If allow.everyone.if.no.acl.found = true, " +
+ "caller should have read access to at least one topic",
+ authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone,
requestContext, READ, resource.resourceType()))
+ val allUser = AclEntry.WildcardPrincipalString
+ val allHost = AclEntry.WildcardHost
+ val denyAll = new AccessControlEntry(allUser, allHost, ALL,
AclPermissionType.DENY)
+ val wildcardResource = new ResourcePattern(resource.resourceType(),
AclEntry.WildcardResource, LITERAL)
+
+ addAcls(wrappedSimpleAuthorizerAllowEveryone, Set(denyAll), resource)
+ assertTrue("Should still allow since the deny only apply on the specific
resource",
+ authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone,
requestContext, READ, resource.resourceType()))
+
+ addAcls(wrappedSimpleAuthorizerAllowEveryone, Set(denyAll),
wildcardResource)
+ assertFalse("When an ACL binding which can deny all users and hosts
exists, " +
+ "even if allow.everyone.if.no.acl.found = true, caller shouldn't have
read access any topic",
+ authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone,
requestContext, READ, resource.resourceType()))
+ }
+
+ @Test
+ def testAuthorizeByResourceTypeDisableAllowEveryoneOverride(): Unit = {
+ assertFalse ("If allow.everyone.if.no.acl.found = false, " +
+ "caller shouldn't have read access to any topic",
+ authorizeByResourceType(wrappedSimpleAuthorizer, requestContext, READ,
resource.resourceType()))
+ }
+}
diff --git
a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala
b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala
new file mode 100644
index 0000000..001fd24
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security.authorizer
+
+import java.net.InetAddress
+import java.util.UUID
+
+import kafka.security.authorizer.AclEntry.{WildcardHost,
WildcardPrincipalString}
+import kafka.server.KafkaConfig
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl.AclOperation.{ALL, READ, WRITE}
+import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
+import org.apache.kafka.common.acl.{AccessControlEntry,
AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation}
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
+import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
+import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC,
TRANSACTIONAL_ID}
+import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.server.authorizer.{AuthorizationResult, Authorizer}
+import org.junit.Assert.{assertFalse, assertTrue}
+import org.junit.Test
+
+import scala.jdk.CollectionConverters._
+
+trait BaseAuthorizerTest {
+
+ def authorizer: Authorizer
+
+ val superUsers = "User:superuser1; User:superuser2"
+ val username = "alice"
+ val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+ val requestContext: RequestContext = newRequestContext(principal,
InetAddress.getByName("192.168.0.1"))
+ val superUserName = "superuser1"
+ var config: KafkaConfig = _
+ var zooKeeperClient: ZooKeeperClient = _
+ var resource: ResourcePattern = _
+
+ @Test
+ def testAuthorizeByResourceTypeMultipleAddAndRemove(): Unit = {
+ val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+ val host1 = InetAddress.getByName("192.168.1.1")
+ val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(),
LITERAL)
+ val denyRead = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, DENY)
+ val allowRead = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, ALLOW)
+ val u1h1Context = newRequestContext(user1, host1)
+
+ for (_ <- 1 to 10) {
+ assertFalse("User1 from host1 should not have READ access to any topic
when no ACL exists",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(allowRead), resource1)
+ assertTrue("User1 from host1 now should have READ access to at least one
topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+
+ for (_ <- 1 to 10) {
+ addAcls(authorizer, Set(denyRead), resource1)
+ assertFalse("User1 from host1 now should not have READ access to any
topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+
+ removeAcls(authorizer, Set(denyRead), resource1)
+ addAcls(authorizer, Set(allowRead), resource1)
+ assertTrue("User1 from host1 now should have READ access to at least
one topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+ }
+
+ removeAcls(authorizer, Set(allowRead), resource1)
+ assertFalse("User1 from host1 now should not have READ access to any
topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+ }
+ }
+
+ @Test
+ def testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow():
Unit = {
+ val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+ val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2")
+ val host1 = InetAddress.getByName("192.168.1.1")
+ val host2 = InetAddress.getByName("192.168.1.2")
+ val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(),
LITERAL)
+ val resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(),
LITERAL)
+ val resource3 = new ResourcePattern(GROUP, "s", PREFIXED)
+
+ val acl1 = new AccessControlEntry(user1.toString, host1.getHostAddress,
READ, DENY)
+ val acl2 = new AccessControlEntry(user2.toString, host1.getHostAddress,
READ, DENY)
+ val acl3 = new AccessControlEntry(user1.toString, host2.getHostAddress,
WRITE, DENY)
+ val acl4 = new AccessControlEntry(user1.toString, host2.getHostAddress,
READ, DENY)
+ val acl5 = new AccessControlEntry(user1.toString, host2.getHostAddress,
READ, DENY)
+ val acl6 = new AccessControlEntry(user2.toString, host2.getHostAddress,
READ, DENY)
+ val acl7 = new AccessControlEntry(user1.toString, host2.getHostAddress,
READ, ALLOW)
+
+ addAcls(authorizer, Set(acl1, acl2, acl3, acl6, acl7), resource1)
+ addAcls(authorizer, Set(acl4), resource2)
+ addAcls(authorizer, Set(acl5), resource3)
+
+ val u1h1Context = newRequestContext(user1, host1)
+ val u1h2Context = newRequestContext(user1, host2)
+
+ assertFalse("User1 from host1 should not have READ access to any topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+ assertFalse("User1 from host2 should not have READ access to any consumer
group",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.GROUP))
+ assertFalse("User1 from host2 should not have READ access to any topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TRANSACTIONAL_ID))
+ assertFalse("User1 from host2 should not have READ access to any topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.CLUSTER))
+ assertTrue("User1 from host2 should have READ access to at least one
topic",
+ authorizeByResourceType(authorizer, u1h2Context, READ,
ResourceType.TOPIC))
+ }
+
+ @Test
+ def testAuthorizeByResourceTypeDenyTakesPrecedence(): Unit = {
+ val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+ val host1 = InetAddress.getByName("192.168.1.1")
+ val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(),
LITERAL)
+
+ val u1h1Context = newRequestContext(user1, host1)
+ val acl1 = new AccessControlEntry(user1.toString, host1.getHostAddress,
WRITE, ALLOW)
+ val acl2 = new AccessControlEntry(user1.toString, host1.getHostAddress,
WRITE, DENY)
+
+ addAcls(authorizer, Set(acl1), resource1)
+ assertTrue("User1 from host1 should have WRITE access to at least one
topic",
+ authorizeByResourceType(authorizer, u1h1Context, WRITE,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(acl2), resource1)
+ assertFalse("User1 from host1 should not have WRITE access to any topic",
+ authorizeByResourceType(authorizer, u1h1Context, WRITE,
ResourceType.TOPIC))
+ }
+
+ @Test
+ def testAuthorizeByResourceTypePrefixedResourceDenyDominate(): Unit = {
+ val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+ val host1 = InetAddress.getByName("192.168.1.1")
+ val a = new ResourcePattern(GROUP, "a", PREFIXED)
+ val ab = new ResourcePattern(GROUP, "ab", PREFIXED)
+ val abc = new ResourcePattern(GROUP, "abc", PREFIXED)
+ val abcd = new ResourcePattern(GROUP, "abcd", PREFIXED)
+ val abcde = new ResourcePattern(GROUP, "abcde", PREFIXED)
+
+ val u1h1Context = newRequestContext(user1, host1)
+ val allowAce = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, ALLOW)
+ val denyAce = new AccessControlEntry(user1.toString, host1.getHostAddress,
READ, DENY)
+
+ addAcls(authorizer, Set(allowAce), abcde)
+ assertTrue("User1 from host1 should have READ access to at least one
group",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.GROUP))
+
+ addAcls(authorizer, Set(denyAce), abcd)
+ assertFalse("User1 from host1 now should not have READ access to any
group",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.GROUP))
+
+ addAcls(authorizer, Set(allowAce), abc)
+ assertTrue("User1 from host1 now should have READ access to any group",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.GROUP))
+
+ addAcls(authorizer, Set(denyAce), a)
+ assertFalse("User1 from host1 now should not have READ access to any
group",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.GROUP))
+
+ addAcls(authorizer, Set(allowAce), ab)
+ assertFalse("User1 from host1 still should not have READ access to any
group",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.GROUP))
+ }
+
+ @Test
+ def testAuthorizeByResourceTypeWildcardResourceDenyDominate(): Unit = {
+ val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+ val host1 = InetAddress.getByName("192.168.1.1")
+ val wildcard = new ResourcePattern(GROUP,
ResourcePattern.WILDCARD_RESOURCE, LITERAL)
+ val prefixed = new ResourcePattern(GROUP, "hello", PREFIXED)
+ val literal = new ResourcePattern(GROUP, "aloha", LITERAL)
+
+ val u1h1Context = newRequestContext(user1, host1)
+ val allowAce = new AccessControlEntry(user1.toString,
host1.getHostAddress, WRITE, ALLOW)
+ val denyAce = new AccessControlEntry(user1.toString, host1.getHostAddress,
WRITE, DENY)
+
+ addAcls(authorizer, Set(allowAce), prefixed)
+ assertTrue("User1 from host1 should have WRITE access to at least one
group",
+ authorizeByResourceType(authorizer, u1h1Context, WRITE,
ResourceType.GROUP))
+
+ addAcls(authorizer, Set(denyAce), wildcard)
+ assertFalse("User1 from host1 now should not have WRITE access to any
group",
+ authorizeByResourceType(authorizer, u1h1Context, WRITE,
ResourceType.GROUP))
+
+ addAcls(authorizer, Set(allowAce), wildcard)
+ assertFalse("User1 from host1 still should not have WRITE access to any
group",
+ authorizeByResourceType(authorizer, u1h1Context, WRITE,
ResourceType.GROUP))
+
+ addAcls(authorizer, Set(allowAce), literal)
+ assertFalse("User1 from host1 still should not have WRITE access to any
group",
+ authorizeByResourceType(authorizer, u1h1Context, WRITE,
ResourceType.GROUP))
+ }
+
+ @Test
+ def testAuthorizeByResourceTypeWithAllOperationAce(): Unit = {
+ val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+ val host1 = InetAddress.getByName("192.168.1.1")
+ val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(),
LITERAL)
+ val denyAll = new AccessControlEntry(user1.toString, host1.getHostAddress,
ALL, DENY)
+ val allowAll = new AccessControlEntry(user1.toString,
host1.getHostAddress, ALL, ALLOW)
+ val denyWrite = new AccessControlEntry(user1.toString,
host1.getHostAddress, WRITE, DENY)
+ val u1h1Context = newRequestContext(user1, host1)
+
+ assertFalse("User1 from host1 should not have READ access to any topic
when no ACL exists",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(denyWrite, allowAll), resource1)
+ assertTrue("User1 from host1 now should have READ access to at least one
topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(denyAll), resource1)
+ assertFalse("User1 from host1 now should not have READ access to any
topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+ }
+
+ @Test
+ def testAuthorizeByResourceTypeWithAllHostAce(): Unit = {
+ val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+ val host1 = InetAddress.getByName("192.168.1.1")
+ val host2 = InetAddress.getByName("192.168.1.2")
+ val allHost = AclEntry.WildcardHost
+ val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(),
LITERAL)
+ val resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(),
LITERAL)
+ val allowHost1 = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, ALLOW)
+ val denyHost1 = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, DENY)
+ val denyAllHost = new AccessControlEntry(user1.toString, allHost, READ,
DENY)
+ val allowAllHost = new AccessControlEntry(user1.toString, allHost, READ,
ALLOW)
+ val u1h1Context = newRequestContext(user1, host1)
+ val u1h2Context = newRequestContext(user1, host2)
+
+ assertFalse("User1 from host1 should not have READ access to any topic
when no ACL exists",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(allowHost1), resource1)
+ assertTrue("User1 from host1 should now have READ access to at least one
topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(denyAllHost), resource1)
+ assertFalse("User1 from host1 now shouldn't have READ access to any topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(denyHost1), resource2)
+ assertFalse("User1 from host1 still should not have READ access to any
topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+ assertFalse("User1 from host2 should not have READ access to any topic",
+ authorizeByResourceType(authorizer, u1h2Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(allowAllHost), resource2)
+ assertTrue("User1 from host2 should now have READ access to at least one
topic",
+ authorizeByResourceType(authorizer, u1h2Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(denyAllHost), resource2)
+ assertFalse("User1 from host2 now shouldn't have READ access to any topic",
+ authorizeByResourceType(authorizer, u1h2Context, READ,
ResourceType.TOPIC))
+ }
+
+ @Test
+ def testAuthorizeByResourceTypeWithAllPrincipalAce(): Unit = {
+ val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+ val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2")
+ val allUser = AclEntry.WildcardPrincipalString
+ val host1 = InetAddress.getByName("192.168.1.1")
+ val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(),
LITERAL)
+ val resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(),
LITERAL)
+ val allowUser1 = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, ALLOW)
+ val denyUser1 = new AccessControlEntry(user1.toString,
host1.getHostAddress, READ, DENY)
+ val denyAllUser = new AccessControlEntry(allUser, host1.getHostAddress,
READ, DENY)
+ val allowAllUser = new AccessControlEntry(allUser, host1.getHostAddress,
READ, ALLOW)
+ val u1h1Context = newRequestContext(user1, host1)
+ val u2h1Context = newRequestContext(user2, host1)
+
+ assertFalse("User1 from host1 should not have READ access to any topic
when no ACL exists",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(allowUser1), resource1)
+ assertTrue("User1 from host1 should now have READ access to at least one
topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(denyAllUser), resource1)
+ assertFalse("User1 from host1 now shouldn't have READ access to any topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(denyUser1), resource2)
+ assertFalse("User1 from host1 still should not have READ access to any
topic",
+ authorizeByResourceType(authorizer, u1h1Context, READ,
ResourceType.TOPIC))
+ assertFalse("User2 from host1 should not have READ access to any topic",
+ authorizeByResourceType(authorizer, u2h1Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(allowAllUser), resource2)
+ assertTrue("User2 from host1 should now have READ access to at least one
topic",
+ authorizeByResourceType(authorizer, u2h1Context, READ,
ResourceType.TOPIC))
+
+ addAcls(authorizer, Set(denyAllUser), resource2)
+ assertFalse("User2 from host1 now shouldn't have READ access to any topic",
+ authorizeByResourceType(authorizer, u2h1Context, READ,
ResourceType.TOPIC))
+ }
+
+ @Test
+ def testAuthorzeByResourceTypeSuperUserHasAccess(): Unit = {
+ val denyAllAce = new AccessControlEntry(WildcardPrincipalString,
WildcardHost, AclOperation.ALL, DENY)
+ val superUser1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
superUserName)
+ val host1 = InetAddress.getByName("192.0.4.4")
+ val allTopicsResource = new ResourcePattern(TOPIC, WILDCARD_RESOURCE,
LITERAL)
+ val clusterResource = new ResourcePattern(CLUSTER, WILDCARD_RESOURCE,
LITERAL)
+ val groupResource = new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL)
+ val transactionIdResource = new ResourcePattern(TRANSACTIONAL_ID,
WILDCARD_RESOURCE, LITERAL)
+
+ addAcls(authorizer, Set(denyAllAce), allTopicsResource)
+ addAcls(authorizer, Set(denyAllAce), clusterResource)
+ addAcls(authorizer, Set(denyAllAce), groupResource)
+ addAcls(authorizer, Set(denyAllAce), transactionIdResource)
+
+ val superUserContext = newRequestContext(superUser1, host1)
+
+ assertTrue("superuser always has access, no matter what acls.",
+ authorizeByResourceType(authorizer, superUserContext, READ,
ResourceType.TOPIC))
+ assertTrue("superuser always has access, no matter what acls.",
+ authorizeByResourceType(authorizer, superUserContext, READ,
ResourceType.CLUSTER))
+ assertTrue("superuser always has access, no matter what acls.",
+ authorizeByResourceType(authorizer, superUserContext, READ,
ResourceType.GROUP))
+ assertTrue("superuser always has access, no matter what acls.",
+ authorizeByResourceType(authorizer, superUserContext, READ,
ResourceType.TRANSACTIONAL_ID))
+ }
+
+ def newRequestContext(principal: KafkaPrincipal, clientAddress: InetAddress,
apiKey: ApiKeys = ApiKeys.PRODUCE): RequestContext = {
+ val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+ val header = new RequestHeader(apiKey, 2, "", 1) //ApiKeys apiKey, short
version, String clientId, int correlation
+ new RequestContext(header, "", clientAddress, principal,
ListenerName.forSecurityProtocol(securityProtocol),
+ securityProtocol, ClientInformation.EMPTY, false)
+ }
+
+ def authorizeByResourceType(authorizer: Authorizer, requestContext:
RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean
= {
+ authorizer.authorizeByResourceType(requestContext, operation,
resourceType) == AuthorizationResult.ALLOWED
+ }
+
+ def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry],
resourcePattern: ResourcePattern): Unit = {
+ val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) }
+ authorizer.createAcls(requestContext, bindings.toList.asJava).asScala
+ .map(_.toCompletableFuture.get)
+ .foreach { result => result.exception.ifPresent { e => throw e } }
+ }
+
+ def removeAcls(authorizer: Authorizer, aces: Set[AccessControlEntry],
resourcePattern: ResourcePattern): Boolean = {
+ val bindings = if (aces.isEmpty)
+ Set(new AclBindingFilter(resourcePattern.toFilter,
AccessControlEntryFilter.ANY) )
+ else
+ aces.map { ace => new AclBinding(resourcePattern, ace).toFilter }
+ authorizer.deleteAcls(requestContext, bindings.toList.asJava).asScala
+ .map(_.toCompletableFuture.get)
+ .forall { result =>
+ result.exception.ifPresent { e => throw e }
+ result.aclBindingDeleteResults.forEach { r =>
+ r.exception.ifPresent { e => throw e }
+ }
+ !result.aclBindingDeleteResults.isEmpty
+ }
+ }
+
+}
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
index 060ff3a..65aa2a1 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
@@ -49,9 +49,7 @@ import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import scala.collection.JavaConverters;
-import scala.collection.immutable.TreeMap;
-import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
@@ -59,7 +57,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@@ -75,18 +75,27 @@ public class AclAuthorizerBenchmark {
@Param({"10", "50"})
private int aclCount;
+ @Param({"0", "20", "50", "90", "99", "99.9", "99.99", "100"})
+ private double denyPercentage;
+
private final int hostPreCount = 1000;
private final String resourceNamePrefix = "foo-bar35_resource-";
-
private final AclAuthorizer aclAuthorizer = new AclAuthorizer();
private final KafkaPrincipal principal = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
private List<Action> actions = new ArrayList<>();
- private RequestContext context;
+ private RequestContext authorizeContext;
+ private RequestContext authorizeByResourceTypeContext;
+ private String authorizeByResourceTypeHostName = "127.0.0.2";
+
+ private HashMap<ResourcePattern, AclAuthorizer.VersionedAcls> aclToUpdate
= new HashMap<>();
+
+ Random rand = new Random(System.currentTimeMillis());
+ double eps = 1e-9;
@Setup(Level.Trial)
public void setup() throws Exception {
- setFieldValue(aclAuthorizer,
AclAuthorizer.class.getDeclaredField("aclCache").getName(),
- prepareAclCache());
+ prepareAclCache();
+ prepareAclToUpdate();
// By adding `-95` to the resource name prefix, we cause the
`TreeMap.from/to` call to return
// most map entries. In such cases, we rely on the filtering based on
`String.startsWith`
// to return the matching ACLs. Using a more efficient data structure
(e.g. a prefix
@@ -94,18 +103,15 @@ public class AclAuthorizerBenchmark {
actions = Collections.singletonList(new Action(AclOperation.WRITE,
new ResourcePattern(ResourceType.TOPIC, resourceNamePrefix + 95,
PatternType.LITERAL),
1, true, true));
- context = new RequestContext(new RequestHeader(ApiKeys.PRODUCE,
Integer.valueOf(1).shortValue(),
- "someclient", 1), "1", InetAddress.getLocalHost(),
KafkaPrincipal.ANONYMOUS,
+ authorizeContext = new RequestContext(new
RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(),
+ "someclient", 1), "1", InetAddress.getByName("127.0.0.1"),
principal,
+ ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT,
ClientInformation.EMPTY, false);
+ authorizeByResourceTypeContext = new RequestContext(new
RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(),
+ "someclient", 1), "1",
InetAddress.getByName(authorizeByResourceTypeHostName), principal,
ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT,
ClientInformation.EMPTY, false);
}
- private void setFieldValue(Object obj, String fieldName, Object value)
throws Exception {
- Field field = obj.getClass().getDeclaredField(fieldName);
- field.setAccessible(true);
- field.set(obj, value);
- }
-
- private TreeMap<ResourcePattern, VersionedAcls> prepareAclCache() {
+ private void prepareAclCache() {
Map<ResourcePattern, Set<AclEntry>> aclEntries = new HashMap<>();
for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
ResourcePattern resource = new ResourcePattern(
@@ -116,9 +122,20 @@ public class AclAuthorizerBenchmark {
Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k ->
new HashSet<>());
for (int aclId = 0; aclId < aclCount; aclId++) {
- AccessControlEntry ace = new
AccessControlEntry(principal.toString() + aclId,
- "*", AclOperation.READ, AclPermissionType.ALLOW);
- entries.add(new AclEntry(ace));
+ // The principal in the request context we are using
+ // is principal.toString without any suffix
+ String principalName = principal.toString() + (aclId == 0 ? ""
: aclId);
+ AccessControlEntry allowAce = new AccessControlEntry(
+ principalName, "*", AclOperation.READ,
AclPermissionType.ALLOW);
+
+ entries.add(new AclEntry(allowAce));
+
+ if (shouldDeny()) {
+ // dominantly deny the resource
+ AccessControlEntry denyAce = new AccessControlEntry(
+ principalName, "*", AclOperation.READ,
AclPermissionType.DENY);
+ entries.add(new AclEntry(denyAce));
+ }
}
}
@@ -126,9 +143,16 @@ public class AclAuthorizerBenchmark {
PatternType.PREFIXED);
Set<AclEntry> entriesPrefix =
aclEntries.computeIfAbsent(resourcePrefix, k -> new HashSet<>());
for (int hostId = 0; hostId < hostPreCount; hostId++) {
- AccessControlEntry ace = new
AccessControlEntry(principal.toString(), "127.0.0." + hostId,
+ AccessControlEntry allowAce = new
AccessControlEntry(principal.toString(), "127.0.0." + hostId,
AclOperation.READ, AclPermissionType.ALLOW);
- entriesPrefix.add(new AclEntry(ace));
+ entriesPrefix.add(new AclEntry(allowAce));
+
+ if (shouldDeny()) {
+ // dominantly deny the resource
+ AccessControlEntry denyAce = new
AccessControlEntry(principal.toString(), "127.0.0." + hostId,
+ AclOperation.READ, AclPermissionType.DENY);
+ entriesPrefix.add(new AclEntry(denyAce));
+ }
}
ResourcePattern resourceWildcard = new
ResourcePattern(ResourceType.TOPIC, ResourcePattern.WILDCARD_RESOURCE,
@@ -136,18 +160,50 @@ public class AclAuthorizerBenchmark {
Set<AclEntry> entriesWildcard =
aclEntries.computeIfAbsent(resourceWildcard, k -> new HashSet<>());
// get dynamic entries number for wildcard acl
for (int hostId = 0; hostId < resourceCount / 10; hostId++) {
- AccessControlEntry ace = new
AccessControlEntry(principal.toString(), "127.0.0." + hostId,
+ String hostName = "127.0.0" + hostId;
+ // AuthorizeByResourceType is optimizing the wildcard deny case.
+ // If we didn't skip the host, we would end up having a biased
short runtime.
+ if (hostName.equals(authorizeByResourceTypeHostName)) {
+ continue;
+ }
+
+ AccessControlEntry allowAce = new
AccessControlEntry(principal.toString(), hostName,
AclOperation.READ, AclPermissionType.ALLOW);
- entriesWildcard.add(new AclEntry(ace));
+ entriesWildcard.add(new AclEntry(allowAce));
+ if (shouldDeny()) {
+ AccessControlEntry denyAce = new
AccessControlEntry(principal.toString(), hostName,
+ AclOperation.READ, AclPermissionType.DENY);
+ entriesWildcard.add(new AclEntry(denyAce));
+ }
+ }
+
+ for (Map.Entry<ResourcePattern, Set<AclEntry>> entryMap :
aclEntries.entrySet()) {
+ aclAuthorizer.updateCache(entryMap.getKey(),
+ new
VersionedAcls(JavaConverters.asScalaSetConverter(entryMap.getValue()).asScala().toSet(),
1));
}
+ }
- TreeMap<ResourcePattern, VersionedAcls> aclCache = new TreeMap<>(new
AclAuthorizer.ResourceOrdering());
- for (Map.Entry<ResourcePattern, Set<AclEntry>> entry :
aclEntries.entrySet()) {
- aclCache = aclCache.updated(entry.getKey(),
- new
VersionedAcls(JavaConverters.asScalaSetConverter(entry.getValue()).asScala().toSet(),
1));
+ private void prepareAclToUpdate() {
+ scala.collection.mutable.Set<AclEntry> entries = new
scala.collection.mutable.HashSet<>();
+ for (int i = 0; i < resourceCount; i++) {
+ scala.collection.immutable.Set<AclEntry> immutable = new
scala.collection.immutable.HashSet<>();
+ for (int j = 0; j < aclCount; j++) {
+ entries.add(new AclEntry(new AccessControlEntry(
+ principal.toString(), "127.0.0" + j, AclOperation.WRITE,
AclPermissionType.ALLOW)));
+ immutable = entries.toSet();
+ }
+ aclToUpdate.put(
+ new ResourcePattern(ResourceType.TOPIC,
randomResourceName(resourceNamePrefix), PatternType.LITERAL),
+ new AclAuthorizer.VersionedAcls(immutable, i));
}
+ }
- return aclCache;
+ private String randomResourceName(String prefix) {
+ return prefix + UUID.randomUUID().toString().substring(0, 5);
+ }
+
+ private Boolean shouldDeny() {
+ return rand.nextDouble() * 100.0 - eps < denyPercentage;
}
@TearDown(Level.Trial)
@@ -162,6 +218,19 @@ public class AclAuthorizerBenchmark {
@Benchmark
public void testAuthorizer() {
- aclAuthorizer.authorize(context, actions);
+ aclAuthorizer.authorize(authorizeContext, actions);
+ }
+
+ @Benchmark
+ public void testAuthorizeByResourceType() {
+ aclAuthorizer.authorizeByResourceType(authorizeByResourceTypeContext,
AclOperation.READ, ResourceType.TOPIC);
+ }
+
+ @Benchmark
+ public void testUpdateCache() {
+ AclAuthorizer aclAuthorizer = new AclAuthorizer();
+ for (Map.Entry<ResourcePattern, VersionedAcls> e :
aclToUpdate.entrySet()) {
+ aclAuthorizer.updateCache(e.getKey(), e.getValue());
+ }
}
}