This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 57eb5fd7dc3 KAFKA-14587: Move AclCommand to tools (#17880)
57eb5fd7dc3 is described below
commit 57eb5fd7dc3d4a16b0c50255e0e5e7f2dd265cc6
Author: Mickael Maison <[email protected]>
AuthorDate: Sat Dec 14 20:05:46 2024 +0100
KAFKA-14587: Move AclCommand to tools (#17880)
Reviewers: Chia-Ping Tsai <[email protected]>
---
bin/kafka-acls.sh | 2 +-
bin/windows/kafka-acls.bat | 2 +-
checkstyle/import-control.xml | 2 +
checkstyle/suppressions.xml | 4 +-
.../apache/kafka/server/authorizer/Authorizer.java | 6 +-
core/src/main/scala/kafka/admin/AclCommand.scala | 510 -----------------
.../kafka/api/EndToEndAuthorizationTest.scala | 2 +-
.../java/org/apache/kafka/tools/AclCommand.java | 605 +++++++++++++++++++++
.../org/apache/kafka/tools}/AclCommandTest.java | 322 +++++------
9 files changed, 750 insertions(+), 705 deletions(-)
diff --git a/bin/kafka-acls.sh b/bin/kafka-acls.sh
index 8fa65542e10..ffbb1e19810 100755
--- a/bin/kafka-acls.sh
+++ b/bin/kafka-acls.sh
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-exec $(dirname $0)/kafka-run-class.sh kafka.admin.AclCommand "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.AclCommand "$@"
diff --git a/bin/windows/kafka-acls.bat b/bin/windows/kafka-acls.bat
index 8f0be85c045..12c4a9a69a7 100644
--- a/bin/windows/kafka-acls.bat
+++ b/bin/windows/kafka-acls.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
-"%~dp0kafka-run-class.bat" kafka.admin.AclCommand %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.AclCommand %*
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 0b9e7dd717b..e3382fb3db3 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -290,6 +290,8 @@
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="kafka.admin" />
<allow pkg="kafka.server" />
+ <allow pkg="org.apache.kafka.metadata.authorizer" />
+ <allow pkg="org.apache.kafka.security.authorizer" />
<allow pkg="org.apache.kafka.storage.internals" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.common" />
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2f8ff2d84c2..2d05fac1e70 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -269,11 +269,11 @@
<suppress checks="ClassDataAbstractionCoupling"
files="VerifiableConsumer.java"/>
<suppress checks="CyclomaticComplexity"
-
files="(ConsoleConsumer|DefaultMessageFormatter|StreamsResetter|ProducerPerformance|Agent).java"/>
+
files="(AclCommand|ConsoleConsumer|DefaultMessageFormatter|StreamsResetter|ProducerPerformance|Agent).java"/>
<suppress checks="BooleanExpressionComplexity"
files="(StreamsResetter|DefaultMessageFormatter).java"/>
<suppress checks="NPathComplexity"
-
files="(DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader).java"/>
+
files="(AclCommand|DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader).java"/>
<suppress checks="ImportControl"
files="SignalLogger.java"/>
<suppress checks="IllegalImport"
diff --git
a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
index 58241c51ea0..6dcc80f1e1d 100644
--- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
+++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
@@ -112,8 +112,7 @@ public interface Authorizer extends Configurable, Closeable
{
* to process the update synchronously on the request thread.
*
* @param requestContext Request context if the ACL is being created by a
broker to handle
- * a client request to create ACLs. This may be null if ACLs are
created directly in ZooKeeper
- * using AclCommand.
+ * a client request to create ACLs.
* @param aclBindings ACL bindings to create
*
* @return Create result for each ACL binding in the same order as in the
input list. Each result
@@ -131,8 +130,7 @@ public interface Authorizer extends Configurable, Closeable
{
* Refer to the authorizer implementation docs for details on concurrent
update guarantees.
*
* @param requestContext Request context if the ACL is being deleted by a
broker to handle
- * a client request to delete ACLs. This may be null if ACLs are
deleted directly in ZooKeeper
- * using AclCommand.
+ * a client request to delete ACLs.
* @param aclBindingFilters Filters to match ACL bindings that are to be
deleted
*
* @return Delete result for each filter in the same order as in the input
list.
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala
b/core/src/main/scala/kafka/admin/AclCommand.scala
deleted file mode 100644
index e83ffe30ec6..00000000000
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ /dev/null
@@ -1,510 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import java.util.Properties
-import joptsimple._
-import joptsimple.util.EnumConverter
-import kafka.utils._
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.{Exit, Utils, SecurityUtils =>
JSecurityUtils}
-import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable
-import scala.io.StdIn
-
-object AclCommand extends Logging {
-
- private val ClusterResourceFilter = new
ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME,
PatternType.LITERAL)
-
- private val Newline = scala.util.Properties.lineSeparator
-
- def main(args: Array[String]): Unit = {
-
- val opts = new AclCommandOptions(args)
-
- CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to manage
acls on kafka.")
-
- opts.checkArgs()
-
- val aclCommandService = new AdminClientService(opts)
-
- try {
- if (opts.options.has(opts.addOpt))
- aclCommandService.addAcls()
- else if (opts.options.has(opts.removeOpt))
- aclCommandService.removeAcls()
- else if (opts.options.has(opts.listOpt))
- aclCommandService.listAcls()
- } catch {
- case e: Throwable =>
- println(s"Error while executing ACL command: ${e.getMessage}")
- println(Utils.stackTrace(e))
- Exit.exit(1)
- }
- }
-
- private class AdminClientService(val opts: AclCommandOptions) extends
Logging {
-
- private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit):
Unit = {
- val props = if (opts.options.has(opts.commandConfigOpt))
- Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
- else
- new Properties()
-
- if (opts.options.has(opts.bootstrapServerOpt)) {
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
opts.options.valueOf(opts.bootstrapServerOpt))
- } else {
- props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG,
opts.options.valueOf(opts.bootstrapControllerOpt))
- }
- val adminClient = Admin.create(props)
-
- try {
- f(adminClient)
- } finally {
- adminClient.close()
- }
- }
-
- def addAcls(): Unit = {
- val resourceToAcl = getResourceToAcls(opts)
- withAdminClient(opts) { adminClient =>
- for ((resource, acls) <- resourceToAcl) {
- println(s"Adding ACLs for resource `$resource`: $Newline
${acls.map("\t" + _).mkString(Newline)} $Newline")
- val aclBindings = acls.map(acl => new AclBinding(resource,
acl)).asJavaCollection
- adminClient.createAcls(aclBindings).all().get()
- }
- }
- }
-
- def removeAcls(): Unit = {
- withAdminClient(opts) { adminClient =>
- val filterToAcl = getResourceFilterToAcls(opts)
-
- for ((filter, acls) <- filterToAcl) {
- if (acls.isEmpty) {
- if (confirmAction(opts, s"Are you sure you want to delete all ACLs
for resource filter `$filter`? (y/n)"))
- removeAcls(adminClient, acls, filter)
- } else {
- if (confirmAction(opts, s"Are you sure you want to remove ACLs:
$Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter
`$filter`? (y/n)"))
- removeAcls(adminClient, acls, filter)
- }
- }
- }
- }
-
- def listAcls(): Unit = {
- withAdminClient(opts) { adminClient =>
- listAcls(adminClient)
- }
- }
-
- private def listAcls(adminClient: Admin): Unit = {
- val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
- val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
- val resourceToAcls = getAcls(adminClient, filters)
-
- if (listPrincipals.isEmpty) {
- printResourceAcls(resourceToAcls)
- } else {
- listPrincipals.foreach{principal =>
- println(s"ACLs for principal `$principal`")
- val filteredResourceToAcls = resourceToAcls.map { case (resource,
acls) =>
- resource -> acls.filter(acl =>
principal.toString.equals(acl.principal))
- }.filter { case (_, acls) => acls.nonEmpty }
- printResourceAcls(filteredResourceToAcls)
- }
- }
- }
-
- private def printResourceAcls(resourceToAcls: Map[ResourcePattern,
Set[AccessControlEntry]]): Unit = {
- for ((resource, acls) <- resourceToAcls)
- println(s"Current ACLs for resource `$resource`: $Newline
${acls.map("\t" + _).mkString(Newline)} $Newline")
- }
-
- private def removeAcls(adminClient: Admin, acls: Set[AccessControlEntry],
filter: ResourcePatternFilter): Unit = {
- if (acls.isEmpty)
- adminClient.deleteAcls(List(new AclBindingFilter(filter,
AccessControlEntryFilter.ANY)).asJava).all().get()
- else {
- val aclBindingFilters = acls.map(acl => new AclBindingFilter(filter,
acl.toFilter)).toList.asJava
- adminClient.deleteAcls(aclBindingFilters).all().get()
- }
- }
-
- private def getAcls(adminClient: Admin, filters:
Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
- val aclBindings =
- if (filters.isEmpty)
adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList
- else {
- val results = for (filter <- filters) yield {
- adminClient.describeAcls(new AclBindingFilter(filter,
AccessControlEntryFilter.ANY)).values().get().asScala.toList
- }
- results.reduceLeft(_ ++ _)
- }
-
- val resourceToAcls = mutable.Map[ResourcePattern,
Set[AccessControlEntry]]().withDefaultValue(Set())
-
- aclBindings.foreach(aclBinding => resourceToAcls(aclBinding.pattern()) =
resourceToAcls(aclBinding.pattern()) + aclBinding.entry())
- resourceToAcls.toMap
- }
- }
-
- private def getResourceToAcls(opts: AclCommandOptions): Map[ResourcePattern,
Set[AccessControlEntry]] = {
- val patternType = opts.options.valueOf(opts.resourcePatternType)
- if (!patternType.isSpecific)
- CommandLineUtils.printUsageAndExit(opts.parser, s"A
'--resource-pattern-type' value of '$patternType' is not valid when adding
acls.")
-
- val resourceToAcl = getResourceFilterToAcls(opts).map {
- case (filter, acls) =>
- new ResourcePattern(filter.resourceType(), filter.name(),
filter.patternType()) -> acls
- }
-
- if (resourceToAcl.values.exists(_.isEmpty))
- CommandLineUtils.printUsageAndExit(opts.parser, "You must specify one
of: --allow-principal, --deny-principal when trying to add ACLs.")
-
- resourceToAcl
- }
-
- private def getResourceFilterToAcls(opts: AclCommandOptions):
Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
- var resourceToAcls = Map.empty[ResourcePatternFilter,
Set[AccessControlEntry]]
-
- //if none of the --producer or --consumer options are specified , just
construct ACLs from CLI options.
- if (!opts.options.has(opts.producerOpt) &&
!opts.options.has(opts.consumerOpt)) {
- resourceToAcls ++= getCliResourceFilterToAcls(opts)
- }
-
- //users are allowed to specify both --producer and --consumer options in a
single command.
- if (opts.options.has(opts.producerOpt))
- resourceToAcls ++= getProducerResourceFilterToAcls(opts)
-
- if (opts.options.has(opts.consumerOpt))
- resourceToAcls ++= getConsumerResourceFilterToAcls(opts).map { case (k,
v) => k -> (v ++ resourceToAcls.getOrElse(k, Set.empty[AccessControlEntry])) }
-
- validateOperation(opts, resourceToAcls)
-
- resourceToAcls
- }
-
- private def getProducerResourceFilterToAcls(opts: AclCommandOptions):
Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
- val filters = getResourceFilter(opts)
-
- val topics = filters.filter(_.resourceType == JResourceType.TOPIC)
- val transactionalIds = filters.filter(_.resourceType ==
JResourceType.TRANSACTIONAL_ID)
- val enableIdempotence = opts.options.has(opts.idempotentOpt)
-
- val topicAcls = getAcl(opts, Set(WRITE, DESCRIBE, CREATE))
- val transactionalIdAcls = getAcl(opts, Set(WRITE, DESCRIBE))
-
- //Write, Describe, Create permission on topics, Write, Describe on
transactionalIds
- topics.map(_ -> topicAcls).toMap ++
- transactionalIds.map(_ -> transactionalIdAcls).toMap ++
- (if (enableIdempotence)
- Map(ClusterResourceFilter -> getAcl(opts, Set(IDEMPOTENT_WRITE)))
- else
- Map.empty)
- }
-
- private def getConsumerResourceFilterToAcls(opts: AclCommandOptions):
Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
- val filters = getResourceFilter(opts)
-
- val topics = filters.filter(_.resourceType == JResourceType.TOPIC)
- val groups = filters.filter(_.resourceType == JResourceType.GROUP)
-
- //Read, Describe on topic, Read on consumerGroup
-
- val acls = getAcl(opts, Set(READ, DESCRIBE))
-
- topics.map(_ -> acls).toMap[ResourcePatternFilter,
Set[AccessControlEntry]] ++
- groups.map(_ -> getAcl(opts, Set(READ))).toMap[ResourcePatternFilter,
Set[AccessControlEntry]]
- }
-
- private def getCliResourceFilterToAcls(opts: AclCommandOptions):
Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
- val acls = getAcl(opts)
- val filters = getResourceFilter(opts)
- filters.map(_ -> acls).toMap
- }
-
- private def getAcl(opts: AclCommandOptions, operations: Set[AclOperation]):
Set[AccessControlEntry] = {
- val allowedPrincipals = getPrincipals(opts, opts.allowPrincipalsOpt)
-
- val deniedPrincipals = getPrincipals(opts, opts.denyPrincipalsOpt)
-
- val allowedHosts = getHosts(opts, opts.allowHostsOpt,
opts.allowPrincipalsOpt)
-
- val deniedHosts = getHosts(opts, opts.denyHostsOpt, opts.denyPrincipalsOpt)
-
- val acls = new collection.mutable.HashSet[AccessControlEntry]
- if (allowedHosts.nonEmpty && allowedPrincipals.nonEmpty)
- acls ++= getAcls(allowedPrincipals, ALLOW, operations, allowedHosts)
-
- if (deniedHosts.nonEmpty && deniedPrincipals.nonEmpty)
- acls ++= getAcls(deniedPrincipals, DENY, operations, deniedHosts)
-
- acls.toSet
- }
-
- private def getAcl(opts: AclCommandOptions): Set[AccessControlEntry] = {
- val operations = opts.options.valuesOf(opts.operationsOpt).asScala
- .map(operation => JSecurityUtils.operation(operation.trim)).toSet
- getAcl(opts, operations)
- }
-
- def getAcls(principals: Set[KafkaPrincipal], permissionType:
AclPermissionType, operations: Set[AclOperation],
- hosts: Set[String]): Set[AccessControlEntry] = {
- for {
- principal <- principals
- operation <- operations
- host <- hosts
- } yield new AccessControlEntry(principal.toString, host, operation,
permissionType)
- }
-
- private def getHosts(opts: AclCommandOptions, hostOptionSpec:
OptionSpec[String],
- principalOptionSpec: OptionSpec[String]): Set[String] =
{
- if (opts.options.has(hostOptionSpec))
- opts.options.valuesOf(hostOptionSpec).asScala.map(_.trim).toSet
- else if (opts.options.has(principalOptionSpec))
- Set[String](AclEntry.WILDCARD_HOST)
- else
- Set.empty[String]
- }
-
- private def getPrincipals(opts: AclCommandOptions, principalOptionSpec:
OptionSpec[String]): Set[KafkaPrincipal] = {
- if (opts.options.has(principalOptionSpec))
- opts.options.valuesOf(principalOptionSpec).asScala.map(s =>
JSecurityUtils.parseKafkaPrincipal(s.trim)).toSet
- else
- Set.empty[KafkaPrincipal]
- }
-
- private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound:
Boolean = true): Set[ResourcePatternFilter] = {
- val patternType = opts.options.valueOf(opts.resourcePatternType)
-
- var resourceFilters = Set.empty[ResourcePatternFilter]
- if (opts.options.has(opts.topicOpt))
- opts.options.valuesOf(opts.topicOpt).forEach(topic => resourceFilters +=
new ResourcePatternFilter(JResourceType.TOPIC, topic.trim, patternType))
-
- if (patternType == PatternType.LITERAL &&
(opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
- resourceFilters += ClusterResourceFilter
-
- if (opts.options.has(opts.groupOpt))
- opts.options.valuesOf(opts.groupOpt).forEach(group => resourceFilters +=
new ResourcePatternFilter(JResourceType.GROUP, group.trim, patternType))
-
- if (opts.options.has(opts.transactionalIdOpt))
- opts.options.valuesOf(opts.transactionalIdOpt).forEach(transactionalId =>
- resourceFilters += new
ResourcePatternFilter(JResourceType.TRANSACTIONAL_ID, transactionalId,
patternType))
-
- if (opts.options.has(opts.delegationTokenOpt))
- opts.options.valuesOf(opts.delegationTokenOpt).forEach(token =>
resourceFilters += new ResourcePatternFilter(JResourceType.DELEGATION_TOKEN,
token.trim, patternType))
-
- if (opts.options.has(opts.userPrincipalOpt))
- opts.options.valuesOf(opts.userPrincipalOpt).forEach(user =>
resourceFilters += new ResourcePatternFilter(JResourceType.USER, user.trim,
patternType))
-
- if (resourceFilters.isEmpty && dieIfNoResourceFound)
- CommandLineUtils.printUsageAndExit(opts.parser, "You must provide at
least one resource: --topic <topic> or --cluster or --group <group> or
--delegation-token <Delegation Token ID>")
-
- resourceFilters
- }
-
- private def confirmAction(opts: AclCommandOptions, msg: String): Boolean = {
- if (opts.options.has(opts.forceOpt))
- return true
- println(msg)
- StdIn.readLine().equalsIgnoreCase("y")
- }
-
- private def validateOperation(opts: AclCommandOptions, resourceToAcls:
Map[ResourcePatternFilter, Set[AccessControlEntry]]): Unit = {
- for ((resource, acls) <- resourceToAcls) {
- val validOps =
AclEntry.supportedOperations(resource.resourceType).asScala.toSet +
AclOperation.ALL
- if ((acls.map(_.operation) -- validOps).nonEmpty)
- CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType
${resource.resourceType} only supports operations
${validOps.map(JSecurityUtils.operationName).mkString(", ")}")
- }
- }
-
- class AclCommandOptions(args: Array[String]) extends
CommandDefaultOptions(args) {
- val CommandConfigDoc = "A property file containing configs to be passed to
Admin Client."
-
- val bootstrapServerOpt: OptionSpec[String] =
parser.accepts("bootstrap-server", "A list of host/port pairs to use for
establishing the connection to the Kafka cluster." +
- " This list should be in the form host1:port1,host2:port2,... This
config is required for acl management using admin client API.")
- .withRequiredArg
- .describedAs("server to connect to")
- .ofType(classOf[String])
-
- val bootstrapControllerOpt: OptionSpec[String] =
parser.accepts("bootstrap-controller", "A list of host/port pairs to use for
establishing the connection to the Kafka cluster." +
- " This list should be in the form host1:port1,host2:port2,... This
config is required for acl management using admin client API.")
- .withRequiredArg
- .describedAs("controller to connect to")
- .ofType(classOf[String])
-
- val commandConfigOpt: OptionSpec[String] =
parser.accepts("command-config", CommandConfigDoc)
- .withOptionalArg()
- .describedAs("command-config")
- .ofType(classOf[String])
-
- val topicOpt: OptionSpec[String] = parser.accepts("topic", "topic to which
ACLs should be added or removed. " +
- "A value of '*' indicates ACL should apply to all topics.")
- .withRequiredArg
- .describedAs("topic")
- .ofType(classOf[String])
-
- val clusterOpt: OptionSpecBuilder = parser.accepts("cluster", "Add/Remove
cluster ACLs.")
- val groupOpt: OptionSpec[String] = parser.accepts("group", "Consumer Group
to which the ACLs should be added or removed. " +
- "A value of '*' indicates the ACLs should apply to all groups.")
- .withRequiredArg
- .describedAs("group")
- .ofType(classOf[String])
-
- val transactionalIdOpt: OptionSpec[String] =
parser.accepts("transactional-id", "The transactionalId to which ACLs should " +
- "be added or removed. A value of '*' indicates the ACLs should apply to
all transactionalIds.")
- .withRequiredArg
- .describedAs("transactional-id")
- .ofType(classOf[String])
-
- val idempotentOpt: OptionSpecBuilder = parser.accepts("idempotent",
"Enable idempotence for the producer. This should be " +
- "used in combination with the --producer option. Note that idempotence
is enabled automatically if " +
- "the producer is authorized to a particular transactional-id.")
-
- val delegationTokenOpt: OptionSpec[String] =
parser.accepts("delegation-token", "Delegation token to which ACLs should be
added or removed. " +
- "A value of '*' indicates ACL should apply to all tokens.")
- .withRequiredArg
- .describedAs("delegation-token")
- .ofType(classOf[String])
-
- val resourcePatternType: OptionSpec[PatternType] =
parser.accepts("resource-pattern-type", "The type of the resource pattern or
pattern filter. " +
- "When adding acls, this should be a specific pattern type, e.g.
'literal' or 'prefixed'. " +
- "When listing or removing acls, a specific pattern type can be used to
list or remove acls from specific resource patterns, " +
- "or use the filter values of 'any' or 'match', where 'any' will match
any pattern type, but will match the resource name exactly, " +
- "where as 'match' will perform pattern matching to list or remove all
acls that affect the supplied resource(s). " +
- "WARNING: 'match', when used in combination with the '--remove' switch,
should be used with care.")
- .withRequiredArg()
- .ofType(classOf[String])
- .withValuesConvertedBy(new PatternTypeConverter())
- .defaultsTo(PatternType.LITERAL)
-
- val addOpt: OptionSpecBuilder = parser.accepts("add", "Indicates you are
trying to add ACLs.")
- val removeOpt: OptionSpecBuilder = parser.accepts("remove", "Indicates you
are trying to remove ACLs.")
- val listOpt: OptionSpecBuilder = parser.accepts("list", "List ACLs for the
specified resource, use --topic <topic> or --group <group> or --cluster to
specify a resource.")
-
- val operationsOpt: OptionSpec[String] = parser.accepts("operation",
"Operation that is being allowed or denied. Valid operation names are: " +
Newline +
- AclEntry.ACL_OPERATIONS.asScala.map("\t" +
JSecurityUtils.operationName(_)).mkString(Newline) + Newline)
- .withRequiredArg
- .ofType(classOf[String])
- .defaultsTo(JSecurityUtils.operationName(AclOperation.ALL))
-
- val allowPrincipalsOpt: OptionSpec[String] =
parser.accepts("allow-principal", "principal is in principalType:name format." +
- " Note that principalType must be supported by the Authorizer being
used." +
- " For example, User:'*' is the wild card indicating all users.")
- .withRequiredArg
- .describedAs("allow-principal")
- .ofType(classOf[String])
-
- val denyPrincipalsOpt: OptionSpec[String] =
parser.accepts("deny-principal", "principal is in principalType:name format. " +
- "By default anyone not added through --allow-principal is denied access.
" +
- "You only need to use this option as negation to already allowed set. " +
- "Note that principalType must be supported by the Authorizer being used.
" +
- "For example if you wanted to allow access to all users in the system
but not test-user you can define an ACL that " +
- "allows access to User:'*' and specify
--deny-principal=User:[email protected]. " +
- "AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES.")
- .withRequiredArg
- .describedAs("deny-principal")
- .ofType(classOf[String])
-
- val listPrincipalsOpt: OptionSpec[String] = parser.accepts("principal",
"List ACLs for the specified principal. principal is in principalType:name
format." +
- " Note that principalType must be supported by the Authorizer being
used. Multiple --principal option can be passed.")
- .withOptionalArg()
- .describedAs("principal")
- .ofType(classOf[String])
-
- val allowHostsOpt: OptionSpec[String] = parser.accepts("allow-host", "Host
from which principals listed in --allow-principal will have access. " +
- "If you have specified --allow-principal then the default for this
option will be set to '*' which allows access from all hosts.")
- .withRequiredArg
- .describedAs("allow-host")
- .ofType(classOf[String])
-
- val denyHostsOpt: OptionSpec[String] = parser.accepts("deny-host", "Host
from which principals listed in --deny-principal will be denied access. " +
- "If you have specified --deny-principal then the default for this option
will be set to '*' which denies access from all hosts.")
- .withRequiredArg
- .describedAs("deny-host")
- .ofType(classOf[String])
-
- val producerOpt: OptionSpecBuilder = parser.accepts("producer",
"Convenience option to add/remove ACLs for producer role. " +
- "This will generate ACLs that allows WRITE,DESCRIBE and CREATE on
topic.")
-
- val consumerOpt: OptionSpecBuilder = parser.accepts("consumer",
"Convenience option to add/remove ACLs for consumer role. " +
- "This will generate ACLs that allows READ,DESCRIBE on topic and READ on
group.")
-
- val forceOpt: OptionSpecBuilder = parser.accepts("force", "Assume Yes to
all queries and do not prompt.")
-
- val userPrincipalOpt: OptionSpec[String] =
parser.accepts("user-principal", "Specifies a user principal as a resource in
relation with the operation. For instance " +
- "one could grant CreateTokens or DescribeTokens permission on a given
user principal.")
- .withRequiredArg()
- .describedAs("user-principal")
- .ofType(classOf[String])
-
- options = parser.parse(args: _*)
-
- def checkArgs(): Unit = {
- if (options.has(bootstrapServerOpt) &&
options.has(bootstrapControllerOpt))
- CommandLineUtils.printUsageAndExit(parser, "Only one of
--bootstrap-server or --bootstrap-controller must be specified")
-
- if (!options.has(bootstrapServerOpt) &&
!options.has(bootstrapControllerOpt))
- CommandLineUtils.printUsageAndExit(parser, "One of --bootstrap-server
or --bootstrap-controller must be specified")
-
- val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
- if (actions != 1)
- CommandLineUtils.printUsageAndExit(parser, "Command must include
exactly one action: --list, --add, --remove. ")
-
- CommandLineUtils.checkInvalidArgs(parser, options, listOpt, producerOpt,
consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt)
-
- //when --producer or --consumer is specified , user should not specify
operations as they are inferred and we also disallow --deny-principals and
--deny-hosts.
- CommandLineUtils.checkInvalidArgs(parser, options, producerOpt,
operationsOpt, denyPrincipalsOpt, denyHostsOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt,
operationsOpt, denyPrincipalsOpt, denyHostsOpt)
-
- if (options.has(listPrincipalsOpt) && !options.has(listOpt))
- CommandLineUtils.printUsageAndExit(parser, "The --principal option is
only available if --list is set")
-
- if (options.has(producerOpt) && !options.has(topicOpt))
- CommandLineUtils.printUsageAndExit(parser, "With --producer you must
specify a --topic")
-
- if (options.has(idempotentOpt) && !options.has(producerOpt))
- CommandLineUtils.printUsageAndExit(parser, "The --idempotent option is
only available if --producer is set")
-
- if (options.has(consumerOpt) && (!options.has(topicOpt) ||
!options.has(groupOpt) || (!options.has(producerOpt) &&
(options.has(clusterOpt) || options.has(transactionalIdOpt)))))
- CommandLineUtils.printUsageAndExit(parser, "With --consumer you must
specify a --topic and a --group and no --cluster or --transactional-id option
should be specified.")
- }
- }
-}
-
-class PatternTypeConverter extends
EnumConverter[PatternType](classOf[PatternType]) {
-
- override def convert(value: String): PatternType = {
- val patternType = super.convert(value)
- if (patternType.isUnknown)
- throw new ValueConversionException("Unknown resource-pattern-type: " +
value)
-
- patternType
- }
-
- override def valuePattern: String = PatternType.values
- .filter(_ != PatternType.UNKNOWN)
- .mkString("|")
-}
diff --git
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 9560f060d0f..fe9336a04f4 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -58,7 +58,7 @@ import scala.jdk.CollectionConverters._
* brokers.
*
* To start brokers we need to set a cluster ACL, which happens optionally in
KafkaServerTestHarness.
- * The remaining ACLs to enable access to producers and consumers are set
here. To set ACLs, we use AclCommand directly.
+ * The remaining ACLs to enable access to producers and consumers are set
here.
*
* Finally, we rely on SaslSetup to bootstrap and setup Kerberos. We don't use
* SaslTestHarness here directly because it extends QuorumTestHarness, and we
diff --git a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
new file mode 100644
index 00000000000..d54970ad042
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.security.authorizer.AclEntry;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.ValueConversionException;
+import joptsimple.util.EnumConverter;
+
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.acl.AclPermissionType.DENY;
+
+public class AclCommand {
+
+ private static final ResourcePatternFilter CLUSTER_RESOURCE_FILTER =
+ new ResourcePatternFilter(ResourceType.CLUSTER,
Resource.CLUSTER_NAME, PatternType.LITERAL);
+ private static final String NL = System.lineSeparator();
+
+ public static void main(String[] args) {
+ AclCommandOptions opts = new AclCommandOptions(args);
+ AdminClientService aclCommandService = new AdminClientService(opts);
+ try (Admin admin = Admin.create(adminConfigs(opts))) {
+ if (opts.options.has(opts.addOpt)) {
+ aclCommandService.addAcls(admin);
+ } else if (opts.options.has(opts.removeOpt)) {
+ aclCommandService.removeAcls(admin);
+ } else if (opts.options.has(opts.listOpt)) {
+ aclCommandService.listAcls(admin);
+ }
+ } catch (Throwable e) {
+ System.out.println("Error while executing ACL command: " +
e.getMessage());
+ System.out.println(Utils.stackTrace(e));
+ Exit.exit(1);
+ }
+ }
+
+ private static Properties adminConfigs(AclCommandOptions opts) throws
IOException {
+ Properties props = new Properties();
+ if (opts.options.has(opts.commandConfigOpt)) {
+ props =
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
+ }
+ if (opts.options.has(opts.bootstrapServerOpt)) {
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
opts.options.valueOf(opts.bootstrapServerOpt));
+ } else {
+ props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG,
opts.options.valueOf(opts.bootstrapControllerOpt));
+ }
+ return props;
+ }
+
+ private static class AdminClientService {
+
+ private final AclCommandOptions opts;
+
+ AdminClientService(AclCommandOptions opts) {
+ this.opts = opts;
+ }
+
+ void addAcls(Admin admin) throws ExecutionException,
InterruptedException {
+ Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl =
getResourceToAcls(opts);
+ for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry :
resourceToAcl.entrySet()) {
+ ResourcePattern resource = entry.getKey();
+ Set<AccessControlEntry> acls = entry.getValue();
+ System.out.println("Adding ACLs for resource `" + resource +
"`: " + NL + " " + acls.stream().map(a -> "\t" +
a).collect(Collectors.joining(NL)) + NL);
+ Collection<AclBinding> aclBindings = acls.stream().map(acl ->
new AclBinding(resource, acl)).collect(Collectors.toList());
+ admin.createAcls(aclBindings).all().get();
+ }
+ }
+
+ void removeAcls(Admin admin) throws ExecutionException,
InterruptedException {
+ Map<ResourcePatternFilter, Set<AccessControlEntry>> filterToAcl =
getResourceFilterToAcls(opts);
+ for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>>
entry : filterToAcl.entrySet()) {
+ ResourcePatternFilter filter = entry.getKey();
+ Set<AccessControlEntry> acls = entry.getValue();
+ if (acls.isEmpty()) {
+ if (confirmAction(opts, "Are you sure you want to delete
all ACLs for resource filter `" + filter + "`? (y/n)")) {
+ removeAcls(admin, acls, filter);
+ }
+ } else {
+ String msg = "Are you sure you want to remove ACLs: " + NL
+
+ " " + acls.stream().map(a -> "\t" +
a).collect(Collectors.joining(NL)) + NL +
+ " from resource filter `" + filter + "`? (y/n)";
+ if (confirmAction(opts, msg)) {
+ removeAcls(admin, acls, filter);
+ }
+ }
+ }
+ }
+
+ private void listAcls(Admin admin) throws ExecutionException,
InterruptedException {
+ Set<ResourcePatternFilter> filters = getResourceFilter(opts,
false);
+ Set<KafkaPrincipal> listPrincipals = getPrincipals(opts,
opts.listPrincipalsOpt);
+ Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls =
getAcls(admin, filters);
+
+ if (listPrincipals.isEmpty()) {
+ printResourceAcls(resourceToAcls);
+ } else {
+ listPrincipals.forEach(principal -> {
+ System.out.println("ACLs for principal `" + principal +
"`");
+ Map<ResourcePattern, Set<AccessControlEntry>>
filteredResourceToAcls = resourceToAcls.entrySet().stream()
+ .map(entry -> {
+ ResourcePattern resource = entry.getKey();
+ Set<AccessControlEntry> acls =
entry.getValue().stream()
+ .filter(acl ->
principal.toString().equals(acl.principal()))
+ .collect(Collectors.toSet());
+ return new AbstractMap.SimpleEntry<>(resource,
acls);
+ })
+ .filter(entry -> !entry.getValue().isEmpty())
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ printResourceAcls(filteredResourceToAcls);
+ });
+ }
+ }
+
+ private static void printResourceAcls(Map<ResourcePattern,
Set<AccessControlEntry>> resourceToAcls) {
+ resourceToAcls.forEach((resource, acls) ->
+ System.out.println("Current ACLs for resource `" + resource +
"`:" + NL +
+ acls.stream().map(acl -> "\t" +
acl).collect(Collectors.joining(NL)) + NL)
+ );
+ }
+
+ private static void removeAcls(Admin adminClient,
Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws
ExecutionException, InterruptedException {
+ if (acls.isEmpty()) {
+ adminClient.deleteAcls(Collections.singletonList(new
AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
+ } else {
+ List<AclBindingFilter> aclBindingFilters =
acls.stream().map(acl -> new AclBindingFilter(filter,
acl.toFilter())).collect(Collectors.toList());
+ adminClient.deleteAcls(aclBindingFilters).all().get();
+ }
+ }
+
+ private Map<ResourcePattern, Set<AccessControlEntry>> getAcls(Admin
adminClient, Set<ResourcePatternFilter> filters) throws ExecutionException,
InterruptedException {
+ Collection<AclBinding> aclBindings;
+ if (filters.isEmpty()) {
+ aclBindings =
adminClient.describeAcls(AclBindingFilter.ANY).values().get();
+ } else {
+ aclBindings = new ArrayList<>();
+ for (ResourcePatternFilter filter : filters) {
+ aclBindings.addAll(adminClient.describeAcls(new
AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get());
+ }
+ }
+
+ Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = new
HashMap<>();
+ for (AclBinding aclBinding : aclBindings) {
+ ResourcePattern resource = aclBinding.pattern();
+ Set<AccessControlEntry> acls =
resourceToAcls.getOrDefault(resource, new HashSet<>());
+ acls.add(aclBinding.entry());
+ resourceToAcls.put(resource, acls);
+ }
+ return resourceToAcls;
+ }
+ }
+
+ private static Map<ResourcePattern, Set<AccessControlEntry>>
getResourceToAcls(AclCommandOptions opts) {
+ PatternType patternType =
opts.options.valueOf(opts.resourcePatternType);
+ if (!patternType.isSpecific()) {
+ CommandLineUtils.printUsageAndExit(opts.parser, "A
'--resource-pattern-type' value of '" + patternType + "' is not valid when
adding acls.");
+ }
+ Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl =
getResourceFilterToAcls(opts).entrySet().stream()
+ .collect(Collectors.toMap(entry -> new
ResourcePattern(entry.getKey().resourceType(), entry.getKey().name(),
entry.getKey().patternType()),
+ Map.Entry::getValue));
+
+ if (resourceToAcl.values().stream().anyMatch(Set::isEmpty)) {
+ CommandLineUtils.printUsageAndExit(opts.parser, "You must specify
one of: --allow-principal, --deny-principal when trying to add ACLs.");
+ }
+ return resourceToAcl;
+ }
+
+ private static Map<ResourcePatternFilter, Set<AccessControlEntry>>
getResourceFilterToAcls(AclCommandOptions opts) {
+ Map<ResourcePatternFilter, Set<AccessControlEntry>> resourceToAcls =
new HashMap<>();
+ //if none of the --producer or --consumer options are specified , just
construct ACLs from CLI options.
+ if (!opts.options.has(opts.producerOpt) &&
!opts.options.has(opts.consumerOpt)) {
+ resourceToAcls.putAll(getCliResourceFilterToAcls(opts));
+ }
+ //users are allowed to specify both --producer and --consumer options
in a single command.
+ if (opts.options.has(opts.producerOpt)) {
+ resourceToAcls.putAll(getProducerResourceFilterToAcls(opts));
+ }
+ if (opts.options.has(opts.consumerOpt)) {
+ getConsumerResourceFilterToAcls(opts).forEach((k, v) -> {
+ Set<AccessControlEntry> existingAcls =
resourceToAcls.getOrDefault(k, new HashSet<>());
+ existingAcls.addAll(v);
+ resourceToAcls.put(k, existingAcls);
+ });
+ }
+ validateOperation(opts, resourceToAcls);
+ return resourceToAcls;
+ }
+
+ private static Map<ResourcePatternFilter, Set<AccessControlEntry>>
getProducerResourceFilterToAcls(AclCommandOptions opts) {
+ Set<ResourcePatternFilter> filters = getResourceFilter(opts, true);
+
+ Set<ResourcePatternFilter> topics = filters.stream().filter(f ->
f.resourceType() == ResourceType.TOPIC).collect(Collectors.toSet());
+ Set<ResourcePatternFilter> transactionalIds =
filters.stream().filter(f -> f.resourceType() ==
ResourceType.TRANSACTIONAL_ID).collect(Collectors.toSet());
+ boolean enableIdempotence = opts.options.has(opts.idempotentOpt);
+
+ Set<AccessControlEntry> topicAcls = getAcl(opts, new
HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE)));
+ Set<AccessControlEntry> transactionalIdAcls = getAcl(opts, new
HashSet<>(Arrays.asList(WRITE, DESCRIBE)));
+
+ //Write, Describe, Create permission on topics, Write, Describe on
transactionalIds
+ Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new
HashMap<>();
+ for (ResourcePatternFilter topic : topics) {
+ result.put(topic, topicAcls);
+ }
+ for (ResourcePatternFilter transactionalId : transactionalIds) {
+ result.put(transactionalId, transactionalIdAcls);
+ }
+ if (enableIdempotence) {
+ result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts,
Collections.singleton(IDEMPOTENT_WRITE)));
+ }
+ return result;
+ }
+
+ private static Map<ResourcePatternFilter, Set<AccessControlEntry>>
getConsumerResourceFilterToAcls(AclCommandOptions opts) {
+ Set<ResourcePatternFilter> filters = getResourceFilter(opts, true);
+ Set<ResourcePatternFilter> topics = filters.stream().filter(f ->
f.resourceType() == ResourceType.TOPIC).collect(Collectors.toSet());
+ Set<ResourcePatternFilter> groups = filters.stream().filter(f ->
f.resourceType() == ResourceType.GROUP).collect(Collectors.toSet());
+
+ //Read, Describe on topic, Read on consumerGroup
+ Set<AccessControlEntry> topicAcls = getAcl(opts, new
HashSet<>(Arrays.asList(READ, DESCRIBE)));
+ Set<AccessControlEntry> groupAcls = getAcl(opts,
Collections.singleton(READ));
+
+ Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new
HashMap<>();
+ for (ResourcePatternFilter topic : topics) {
+ result.put(topic, topicAcls);
+ }
+ for (ResourcePatternFilter group : groups) {
+ result.put(group, groupAcls);
+ }
+ return result;
+ }
+
+ private static Map<ResourcePatternFilter, Set<AccessControlEntry>>
getCliResourceFilterToAcls(AclCommandOptions opts) {
+ Set<AccessControlEntry> acls = getAcl(opts);
+ Set<ResourcePatternFilter> filters = getResourceFilter(opts, true);
+ return filters.stream().collect(Collectors.toMap(filter -> filter,
filter -> acls));
+ }
+
+ private static Set<AccessControlEntry> getAcl(AclCommandOptions opts,
Set<AclOperation> operations) {
+ Set<KafkaPrincipal> allowedPrincipals = getPrincipals(opts,
opts.allowPrincipalsOpt);
+ Set<KafkaPrincipal> deniedPrincipals = getPrincipals(opts,
opts.denyPrincipalsOpt);
+ Set<String> allowedHosts = getHosts(opts, opts.allowHostsOpt,
opts.allowPrincipalsOpt);
+ Set<String> deniedHosts = getHosts(opts, opts.denyHostsOpt,
opts.denyPrincipalsOpt);
+
+ Set<AccessControlEntry> acls = new HashSet<>();
+ if (!allowedHosts.isEmpty() && !allowedPrincipals.isEmpty()) {
+ acls.addAll(getAcls(allowedPrincipals, ALLOW, operations,
allowedHosts));
+ }
+ if (!deniedHosts.isEmpty() && !deniedPrincipals.isEmpty()) {
+ acls.addAll(getAcls(deniedPrincipals, DENY, operations,
deniedHosts));
+ }
+ return acls;
+ }
+
+ private static Set<AccessControlEntry> getAcl(AclCommandOptions opts) {
+ Set<AclOperation> operations =
opts.options.valuesOf(opts.operationsOpt)
+ .stream().map(operation ->
SecurityUtils.operation(operation.trim()))
+ .collect(Collectors.toSet());
+ return getAcl(opts, operations);
+ }
+
+ static Set<AccessControlEntry> getAcls(Set<KafkaPrincipal> principals,
+ AclPermissionType
permissionType,
+ Set<AclOperation>
operations,
+ Set<String> hosts) {
+ Set<AccessControlEntry> acls = new HashSet<>();
+ for (KafkaPrincipal principal : principals) {
+ for (AclOperation operation : operations) {
+ for (String host : hosts) {
+ acls.add(new AccessControlEntry(principal.toString(),
host, operation, permissionType));
+ }
+ }
+ }
+ return acls;
+ }
+
+ private static Set<String> getHosts(AclCommandOptions opts,
OptionSpec<String> hostOptionSpec, OptionSpec<String> principalOptionSpec) {
+ if (opts.options.has(hostOptionSpec)) {
+ return
opts.options.valuesOf(hostOptionSpec).stream().map(String::trim).collect(Collectors.toSet());
+ } else if (opts.options.has(principalOptionSpec)) {
+ return Collections.singleton(AclEntry.WILDCARD_HOST);
+ } else {
+ return Collections.emptySet();
+ }
+ }
+
+ private static Set<KafkaPrincipal> getPrincipals(AclCommandOptions opts,
OptionSpec<String> principalOptionSpec) {
+ if (opts.options.has(principalOptionSpec)) {
+ return opts.options.valuesOf(principalOptionSpec).stream()
+ .map(s -> SecurityUtils.parseKafkaPrincipal(s.trim()))
+ .collect(Collectors.toSet());
+ } else {
+ return Collections.emptySet();
+ }
+ }
+
+ private static Set<ResourcePatternFilter>
getResourceFilter(AclCommandOptions opts, boolean dieIfNoResourceFound) {
+ PatternType patternType =
opts.options.valueOf(opts.resourcePatternType);
+ Set<ResourcePatternFilter> resourceFilters = new HashSet<>();
+ if (opts.options.has(opts.topicOpt)) {
+ opts.options.valuesOf(opts.topicOpt).forEach(topic ->
resourceFilters.add(new ResourcePatternFilter(ResourceType.TOPIC, topic.trim(),
patternType)));
+ }
+ if (patternType == PatternType.LITERAL &&
(opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt))) {
+ resourceFilters.add(CLUSTER_RESOURCE_FILTER);
+ }
+ if (opts.options.has(opts.groupOpt)) {
+ opts.options.valuesOf(opts.groupOpt).forEach(group ->
resourceFilters.add(new ResourcePatternFilter(ResourceType.GROUP, group.trim(),
patternType)));
+ }
+ if (opts.options.has(opts.transactionalIdOpt)) {
+
opts.options.valuesOf(opts.transactionalIdOpt).forEach(transactionalId ->
+ resourceFilters.add(new
ResourcePatternFilter(ResourceType.TRANSACTIONAL_ID, transactionalId,
patternType)));
+ }
+ if (opts.options.has(opts.delegationTokenOpt)) {
+ opts.options.valuesOf(opts.delegationTokenOpt).forEach(token ->
resourceFilters.add(new ResourcePatternFilter(ResourceType.DELEGATION_TOKEN,
token.trim(), patternType)));
+ }
+ if (opts.options.has(opts.userPrincipalOpt)) {
+ opts.options.valuesOf(opts.userPrincipalOpt).forEach(user ->
resourceFilters.add(new ResourcePatternFilter(ResourceType.USER, user.trim(),
patternType)));
+ }
+ if (resourceFilters.isEmpty() && dieIfNoResourceFound) {
+ CommandLineUtils.printUsageAndExit(opts.parser, "You must provide
at least one resource: --topic <topic> or --cluster or --group <group> or
--delegation-token <Delegation Token ID>");
+ }
+ return resourceFilters;
+ }
+
+ private static boolean confirmAction(AclCommandOptions opts, String msg) {
+ if (opts.options.has(opts.forceOpt)) {
+ return true;
+ }
+ System.out.println(msg);
+ return System.console().readLine().equalsIgnoreCase("y");
+ }
+
+ private static void validateOperation(AclCommandOptions opts,
Map<ResourcePatternFilter, Set<AccessControlEntry>> resourceToAcls) {
+ for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>> entry :
resourceToAcls.entrySet()) {
+ ResourcePatternFilter resource = entry.getKey();
+ Set<AccessControlEntry> acls = entry.getValue();
+ Collection<AclOperation> validOps = new
HashSet<>(AclEntry.supportedOperations(resource.resourceType()));
+ validOps.add(AclOperation.ALL);
+ Set<AclOperation> unsupportedOps = new HashSet<>();
+ for (AccessControlEntry acl : acls) {
+ if (!validOps.contains(acl.operation())) {
+ unsupportedOps.add(acl.operation());
+ }
+ }
+ if (!unsupportedOps.isEmpty()) {
+ String msg = String.format("ResourceType %s only supports
operations %s", resource.resourceType(), validOps);
+ CommandLineUtils.printUsageAndExit(opts.parser, msg);
+ }
+ }
+ }
+
+ public static class AclCommandOptions extends CommandDefaultOptions {
+
+ private final OptionSpec<String> bootstrapServerOpt;
+ private final OptionSpec<String> bootstrapControllerOpt;
+ private final OptionSpec<String> commandConfigOpt;
+ private final OptionSpec<String> topicOpt;
+ private final OptionSpecBuilder clusterOpt;
+ private final OptionSpec<String> groupOpt;
+ private final OptionSpec<String> transactionalIdOpt;
+ private final OptionSpecBuilder idempotentOpt;
+ private final OptionSpec<String> delegationTokenOpt;
+ private final OptionSpec<PatternType> resourcePatternType;
+ private final OptionSpecBuilder addOpt;
+ private final OptionSpecBuilder removeOpt;
+ private final OptionSpecBuilder listOpt;
+ private final OptionSpec<String> operationsOpt;
+ private final OptionSpec<String> allowPrincipalsOpt;
+ private final OptionSpec<String> denyPrincipalsOpt;
+ private final OptionSpec<String> listPrincipalsOpt;
+ private final OptionSpec<String> allowHostsOpt;
+ private final OptionSpec<String> denyHostsOpt;
+ private final OptionSpecBuilder producerOpt;
+ private final OptionSpecBuilder consumerOpt;
+ private final OptionSpecBuilder forceOpt;
+ private final OptionSpec<String> userPrincipalOpt;
+
+ @SuppressWarnings("this-escape")
+ public AclCommandOptions(String[] args) {
+ super(args);
+ bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of
host/port pairs to use for establishing the connection to the Kafka cluster." +
+ " This list should be in the form
host1:port1,host2:port2,... This config is required for acl management using
admin client API.")
+ .withRequiredArg()
+ .describedAs("server to connect to")
+ .ofType(String.class);
+ bootstrapControllerOpt = parser.accepts("bootstrap-controller", "A
list of host/port pairs to use for establishing the connection to the Kafka
cluster." +
+ " This list should be in the form
host1:port1,host2:port2,... This config is required for acl management using
admin client API.")
+ .withRequiredArg()
+ .describedAs("controller to connect to")
+ .ofType(String.class);
+ commandConfigOpt = parser.accepts("command-config", "A property
file containing configs to be passed to Admin Client.")
+ .withOptionalArg()
+ .describedAs("command-config")
+ .ofType(String.class);
+ topicOpt = parser.accepts("topic", "topic to which ACLs should be
added or removed. " +
+ "A value of '*' indicates ACL should apply to all
topics.")
+ .withRequiredArg()
+ .describedAs("topic")
+ .ofType(String.class);
+ clusterOpt = parser.accepts("cluster", "Add/Remove cluster ACLs.");
+ groupOpt = parser.accepts("group", "Consumer Group to which the
ACLs should be added or removed. " +
+ "A value of '*' indicates the ACLs should apply to
all groups.")
+ .withRequiredArg()
+ .describedAs("group")
+ .ofType(String.class);
+ transactionalIdOpt = parser.accepts("transactional-id", "The
transactionalId to which ACLs should " +
+ "be added or removed. A value of '*' indicates the
ACLs should apply to all transactionalIds.")
+ .withRequiredArg()
+ .describedAs("transactional-id")
+ .ofType(String.class);
+ idempotentOpt = parser.accepts("idempotent", "Enable idempotence
for the producer. This should be " +
+ "used in combination with the --producer option. Note that
idempotence is enabled automatically if " +
+ "the producer is authorized to a particular
transactional-id.");
+ delegationTokenOpt = parser.accepts("delegation-token",
"Delegation token to which ACLs should be added or removed. " +
+ "A value of '*' indicates ACL should apply to all
tokens.")
+ .withRequiredArg()
+ .describedAs("delegation-token")
+ .ofType(String.class);
+ resourcePatternType = parser.accepts("resource-pattern-type", "The
type of the resource pattern or pattern filter. " +
+ "When adding acls, this should be a specific
pattern type, e.g. 'literal' or 'prefixed'. " +
+ "When listing or removing acls, a specific pattern
type can be used to list or remove acls from specific resource patterns, " +
+ "or use the filter values of 'any' or 'match',
where 'any' will match any pattern type, but will match the resource name
exactly, " +
+ "where as 'match' will perform pattern matching to
list or remove all acls that affect the supplied resource(s). " +
+ "WARNING: 'match', when used in combination with
the '--remove' switch, should be used with care.")
+ .withRequiredArg()
+ .ofType(String.class)
+ .withValuesConvertedBy(new PatternTypeConverter())
+ .defaultsTo(PatternType.LITERAL);
+ addOpt = parser.accepts("add", "Indicates you are trying to add
ACLs.");
+ removeOpt = parser.accepts("remove", "Indicates you are trying to
remove ACLs.");
+ listOpt = parser.accepts("list", "List ACLs for the specified
resource, use --topic <topic> or --group <group> or --cluster to specify a
resource.");
+ operationsOpt = parser.accepts("operation", "Operation that is
being allowed or denied. Valid operation names are: " + NL +
+ AclEntry.ACL_OPERATIONS.stream().map(o -> "\t" +
SecurityUtils.operationName(o)).collect(Collectors.joining(NL)) + NL)
+ .withRequiredArg()
+ .ofType(String.class)
+ .defaultsTo(SecurityUtils.operationName(AclOperation.ALL));
+ allowPrincipalsOpt = parser.accepts("allow-principal", "principal
is in principalType:name format." +
+ " Note that principalType must be supported by the
Authorizer being used." +
+ " For example, User:'*' is the wild card
indicating all users.")
+ .withRequiredArg()
+ .describedAs("allow-principal")
+ .ofType(String.class);
+ denyPrincipalsOpt = parser.accepts("deny-principal", "principal is
in principalType:name format. " +
+ "By default anyone not added through
--allow-principal is denied access. " +
+ "You only need to use this option as negation to
already allowed set. " +
+ "Note that principalType must be supported by the
Authorizer being used. " +
+ "For example if you wanted to allow access to all
users in the system but not test-user you can define an ACL that " +
+ "allows access to User:'*' and specify
--deny-principal=User:[email protected]. " +
+ "AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE
OVER ALLOW RULES.")
+ .withRequiredArg()
+ .describedAs("deny-principal")
+ .ofType(String.class);
+ listPrincipalsOpt = parser.accepts("principal", "List ACLs for the
specified principal. principal is in principalType:name format." +
+ " Note that principalType must be supported by the
Authorizer being used. Multiple --principal option can be passed.")
+ .withOptionalArg()
+ .describedAs("principal")
+ .ofType(String.class);
+ allowHostsOpt = parser.accepts("allow-host", "Host from which
principals listed in --allow-principal will have access. " +
+ "If you have specified --allow-principal then the
default for this option will be set to '*' which allows access from all hosts.")
+ .withRequiredArg()
+ .describedAs("allow-host")
+ .ofType(String.class);
+ denyHostsOpt = parser.accepts("deny-host", "Host from which
principals listed in --deny-principal will be denied access. " +
+ "If you have specified --deny-principal then the
default for this option will be set to '*' which denies access from all hosts.")
+ .withRequiredArg()
+ .describedAs("deny-host")
+ .ofType(String.class);
+ producerOpt = parser.accepts("producer", "Convenience option to
add/remove ACLs for producer role. " +
+ "This will generate ACLs that allows WRITE,DESCRIBE and
CREATE on topic.");
+ consumerOpt = parser.accepts("consumer", "Convenience option to
add/remove ACLs for consumer role. " +
+ "This will generate ACLs that allows READ,DESCRIBE on
topic and READ on group.");
+ forceOpt = parser.accepts("force", "Assume Yes to all queries and
do not prompt.");
+ userPrincipalOpt = parser.accepts("user-principal", "Specifies a
user principal as a resource in relation with the operation. For instance " +
+ "one could grant CreateTokens or DescribeTokens
permission on a given user principal.")
+ .withRequiredArg()
+ .describedAs("user-principal")
+ .ofType(String.class);
+
+ try {
+ options = parser.parse(args);
+ } catch (OptionException e) {
+ CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+ }
+ checkArgs();
+ }
+
+ void checkArgs() {
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to
manage acls on kafka.");
+
+ if (options.has(bootstrapServerOpt) &&
options.has(bootstrapControllerOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "Only one of
--bootstrap-server or --bootstrap-controller must be specified");
+ }
+ if (!options.has(bootstrapServerOpt) &&
!options.has(bootstrapControllerOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "One of
--bootstrap-server or --bootstrap-controller must be specified");
+ }
+ List<AbstractOptionSpec<?>> mutuallyExclusiveOptions =
Arrays.asList(addOpt, removeOpt, listOpt);
+ long mutuallyExclusiveOptionsCount =
mutuallyExclusiveOptions.stream()
+ .filter(abstractOptionSpec ->
options.has(abstractOptionSpec))
+ .count();
+ if (mutuallyExclusiveOptionsCount != 1) {
+ CommandLineUtils.printUsageAndExit(parser, "Command must
include exactly one action: --list, --add, --remove. ");
+ }
+ CommandLineUtils.checkInvalidArgs(parser, options, listOpt,
producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt,
denyPrincipalsOpt);
+
+ //when --producer or --consumer is specified , user should not
specify operations as they are inferred and we also disallow --deny-principals
and --deny-hosts.
+ CommandLineUtils.checkInvalidArgs(parser, options, producerOpt,
operationsOpt, denyPrincipalsOpt, denyHostsOpt);
+ CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt,
operationsOpt, denyPrincipalsOpt, denyHostsOpt);
+
+ if (options.has(listPrincipalsOpt) && !options.has(listOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "The --principal
option is only available if --list is set");
+ }
+ if (options.has(producerOpt) && !options.has(topicOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "With --producer
you must specify a --topic");
+ }
+ if (options.has(idempotentOpt) && !options.has(producerOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "The --idempotent
option is only available if --producer is set");
+ }
+ if (options.has(consumerOpt) && (!options.has(topicOpt) ||
!options.has(groupOpt) || (!options.has(producerOpt) &&
(options.has(clusterOpt) || options.has(transactionalIdOpt))))) {
+ CommandLineUtils.printUsageAndExit(parser, "With --consumer
you must specify a --topic and a --group and no --cluster or --transactional-id
option should be specified.");
+ }
+ }
+ }
+
+ static class PatternTypeConverter extends EnumConverter<PatternType> {
+
+ PatternTypeConverter() {
+ super(PatternType.class);
+ }
+
+ @Override
+ public PatternType convert(String value) {
+ PatternType patternType = super.convert(value);
+ if (patternType.isUnknown())
+ throw new ValueConversionException("Unknown
resource-pattern-type: " + value);
+
+ return patternType;
+ }
+
+ @Override
+ public String valuePattern() {
+ List<PatternType> values = Arrays.asList(PatternType.values());
+ List<PatternType> filteredValues = values.stream()
+ .filter(type -> type != PatternType.UNKNOWN)
+ .collect(Collectors.toList());
+ return filteredValues.stream()
+ .map(Object::toString)
+ .collect(Collectors.joining("|"));
+ }
+ }
+}
diff --git a/core/src/test/java/kafka/admin/AclCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java
similarity index 64%
rename from core/src/test/java/kafka/admin/AclCommandTest.java
rename to tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java
index e71c348c272..12f4dab8801 100644
--- a/core/src/test/java/kafka/admin/AclCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java
@@ -14,9 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package kafka.admin;
-
-import kafka.admin.AclCommand.AclCommandOptions;
+package org.apache.kafka.tools;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
@@ -44,15 +42,9 @@ import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.io.PrintStream;
-import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -63,9 +55,6 @@ import java.util.stream.Collectors;
import javax.management.InstanceAlreadyExistsException;
-import scala.Console;
-import scala.jdk.javaapi.CollectionConverters;
-
import static org.apache.kafka.common.acl.AccessControlEntryFilter.ANY;
import static org.apache.kafka.common.acl.AclOperation.ALTER;
import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
@@ -120,102 +109,98 @@ public class AclCommandTest {
private static final String TOPIC = "--topic";
private static final String RESOURCE_PATTERN_TYPE =
"--resource-pattern-type";
private static final KafkaPrincipal PRINCIPAL =
SecurityUtils.parseKafkaPrincipal("User:test2");
- private static final Set<KafkaPrincipal> USERS = new
HashSet<>(Arrays.asList(
-
SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
- PRINCIPAL,
- SecurityUtils.parseKafkaPrincipal("User:CN=\\#User with special
chars in CN : (\\, \\+ \" \\ \\< \\> \\; ')")
- ));
- private static final Set<String> HOSTS = new
HashSet<>(Arrays.asList("host1", "host2"));
- private static final List<String> ALLOW_HOST_COMMAND =
Arrays.asList("--allow-host", "host1", "--allow-host", "host2");
- private static final List<String> DENY_HOST_COMMAND =
Arrays.asList("--deny-host", "host1", "--deny-host", "host2");
+ private static final Set<KafkaPrincipal> USERS = Set.of(
+
SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
+ PRINCIPAL,
+ SecurityUtils.parseKafkaPrincipal("User:CN=\\#User with special chars
in CN : (\\, \\+ \" \\ \\< \\> \\; ')")
+ );
+ private static final Set<String> HOSTS = Set.of("host1", "host2");
+ private static final List<String> ALLOW_HOST_COMMAND =
List.of("--allow-host", "host1", "--allow-host", "host2");
+ private static final List<String> DENY_HOST_COMMAND =
List.of("--deny-host", "host1", "--deny-host", "host2");
private static final ResourcePattern CLUSTER_RESOURCE = new
ResourcePattern(CLUSTER, Resource.CLUSTER_NAME, LITERAL);
- private static final Set<ResourcePattern> TOPIC_RESOURCES = new
HashSet<>(Arrays.asList(
- new ResourcePattern(ResourceType.TOPIC, "test-1", LITERAL),
- new ResourcePattern(ResourceType.TOPIC, "test-2", LITERAL)
- ));
- private static final Set<ResourcePattern> GROUP_RESOURCES = new
HashSet<>(Arrays.asList(
- new ResourcePattern(ResourceType.GROUP, "testGroup-1", LITERAL),
- new ResourcePattern(ResourceType.GROUP, "testGroup-2", LITERAL)
- ));
- private static final Set<ResourcePattern> TRANSACTIONAL_ID_RESOURCES = new
HashSet<>(Arrays.asList(
+ private static final Set<ResourcePattern> TOPIC_RESOURCES = Set.of(
+ new ResourcePattern(ResourceType.TOPIC, "test-1", LITERAL),
+ new ResourcePattern(ResourceType.TOPIC, "test-2", LITERAL)
+ );
+ private static final Set<ResourcePattern> GROUP_RESOURCES = Set.of(
+ new ResourcePattern(ResourceType.GROUP, "testGroup-1", LITERAL),
+ new ResourcePattern(ResourceType.GROUP, "testGroup-2", LITERAL)
+ );
+ private static final Set<ResourcePattern> TRANSACTIONAL_ID_RESOURCES =
Set.of(
new ResourcePattern(TRANSACTIONAL_ID, "t0", LITERAL),
new ResourcePattern(TRANSACTIONAL_ID, "t1", LITERAL)
- ));
- private static final Set<ResourcePattern> TOKEN_RESOURCES = new
HashSet<>(Arrays.asList(
- new ResourcePattern(DELEGATION_TOKEN, "token1", LITERAL),
- new ResourcePattern(DELEGATION_TOKEN, "token2", LITERAL)
- ));
- private static final Set<ResourcePattern> USER_RESOURCES = new
HashSet<>(Arrays.asList(
- new ResourcePattern(USER, "User:test-user1", LITERAL),
- new ResourcePattern(USER, "User:test-user2", LITERAL)
- ));
-
- private static final Map<Set<ResourcePattern>, List<String>>
RESOURCE_TO_COMMAND = new HashMap<Set<ResourcePattern>, List<String>>() {{
- put(TOPIC_RESOURCES, Arrays.asList(TOPIC, "test-1", TOPIC,
"test-2"));
- put(Collections.singleton(CLUSTER_RESOURCE),
Collections.singletonList("--cluster"));
- put(GROUP_RESOURCES, Arrays.asList(GROUP, "testGroup-1", GROUP,
"testGroup-2"));
- put(TRANSACTIONAL_ID_RESOURCES,
Arrays.asList("--transactional-id", "t0", "--transactional-id", "t1"));
- put(TOKEN_RESOURCES, Arrays.asList("--delegation-token", "token1",
"--delegation-token", "token2"));
- put(USER_RESOURCES, Arrays.asList("--user-principal",
"User:test-user1", "--user-principal", "User:test-user2"));
- }};
-
- private static final Map<Set<ResourcePattern>,
Map.Entry<Set<AclOperation>, List<String>>> RESOURCE_TO_OPERATIONS =
- new HashMap<Set<ResourcePattern>, Map.Entry<Set<AclOperation>,
List<String>>>() {{
- put(TOPIC_RESOURCES, new SimpleImmutableEntry<>(
- new HashSet<>(Arrays.asList(READ, WRITE, CREATE,
DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS, ALTER)),
- Arrays.asList(OPERATION, "Read", OPERATION, "Write",
OPERATION, "Create",
- OPERATION, "Describe", OPERATION, "Delete",
OPERATION, "DescribeConfigs",
- OPERATION, "AlterConfigs", OPERATION, "Alter"))
- );
- put(Collections.singleton(CLUSTER_RESOURCE), new
SimpleImmutableEntry<>(
- new HashSet<>(Arrays.asList(CREATE, CLUSTER_ACTION,
DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE)),
- Arrays.asList(OPERATION, "Create", OPERATION,
"ClusterAction", OPERATION, "DescribeConfigs",
- OPERATION, "AlterConfigs", OPERATION,
"IdempotentWrite", OPERATION, "Alter", OPERATION, "Describe"))
- );
- put(GROUP_RESOURCES, new SimpleImmutableEntry<>(
- new HashSet<>(Arrays.asList(READ, DESCRIBE, DELETE)),
- Arrays.asList(OPERATION, "Read", OPERATION,
"Describe", OPERATION, "Delete"))
- );
- put(TRANSACTIONAL_ID_RESOURCES, new SimpleImmutableEntry<>(
- new HashSet<>(Arrays.asList(DESCRIBE, WRITE)),
- Arrays.asList(OPERATION, "Describe", OPERATION,
"Write"))
- );
- put(TOKEN_RESOURCES, new
SimpleImmutableEntry<>(Collections.singleton(DESCRIBE),
Arrays.asList(OPERATION, "Describe")));
- put(USER_RESOURCES, new SimpleImmutableEntry<>(
- new HashSet<>(Arrays.asList(CREATE_TOKENS,
DESCRIBE_TOKENS)),
- Arrays.asList(OPERATION, "CreateTokens", OPERATION,
"DescribeTokens"))
- );
- }};
-
- private static final Map<Set<ResourcePattern>, Set<AccessControlEntry>>
CONSUMER_RESOURCE_TO_ACLS =
- new HashMap<Set<ResourcePattern>, Set<AccessControlEntry>>() {{
- put(TOPIC_RESOURCES,
asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW,
- asScalaSet(new HashSet<>(Arrays.asList(READ,
DESCRIBE))), asScalaSet(HOSTS))));
- put(GROUP_RESOURCES,
asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW,
- asScalaSet(Collections.singleton(READ)),
asScalaSet(HOSTS))));
- }};
-
- private static final Map<List<String>, Map<Set<ResourcePattern>,
Set<AccessControlEntry>>> CMD_TO_RESOURCES_TO_ACL =
- new HashMap<List<String>, Map<Set<ResourcePattern>,
Set<AccessControlEntry>>>() {{
- put(Collections.singletonList(PRODUCER),
producerResourceToAcls(false));
- put(Arrays.asList(PRODUCER, IDEMPOTENT),
producerResourceToAcls(true));
- put(Collections.singletonList(CONSUMER),
CONSUMER_RESOURCE_TO_ACLS);
- put(Arrays.asList(PRODUCER, CONSUMER),
-
CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().map(entry -> {
- Set<AccessControlEntry> value = new
HashSet<>(entry.getValue());
- value.addAll(producerResourceToAcls(false)
- .getOrDefault(entry.getKey(),
Collections.emptySet()));
- return new SimpleEntry<>(entry.getKey(), value);
- }).collect(Collectors.toMap(Entry::getKey,
Entry::getValue)));
- put(Arrays.asList(PRODUCER, IDEMPOTENT, CONSUMER),
-
CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().map(entry -> {
- Set<AccessControlEntry> value = new
HashSet<>(entry.getValue());
- value.addAll(producerResourceToAcls(true)
- .getOrDefault(entry.getKey(),
Collections.emptySet()));
- return new SimpleEntry<>(entry.getKey(), value);
- }).collect(Collectors.toMap(Entry::getKey,
Entry::getValue)));
- }};
+ );
+ private static final Set<ResourcePattern> TOKEN_RESOURCES = Set.of(
+ new ResourcePattern(DELEGATION_TOKEN, "token1", LITERAL),
+ new ResourcePattern(DELEGATION_TOKEN, "token2", LITERAL)
+ );
+ private static final Set<ResourcePattern> USER_RESOURCES = Set.of(
+ new ResourcePattern(USER, "User:test-user1", LITERAL),
+ new ResourcePattern(USER, "User:test-user2", LITERAL)
+ );
+
+ private static final Map<Set<ResourcePattern>, List<String>>
RESOURCE_TO_COMMAND = Map.of(
+ TOPIC_RESOURCES, List.of(TOPIC, "test-1", TOPIC, "test-2"),
+ Set.of(CLUSTER_RESOURCE), List.of("--cluster"),
+ GROUP_RESOURCES, List.of(GROUP, "testGroup-1", GROUP, "testGroup-2"),
+ TRANSACTIONAL_ID_RESOURCES, List.of("--transactional-id", "t0",
"--transactional-id", "t1"),
+ TOKEN_RESOURCES, List.of("--delegation-token", "token1",
"--delegation-token", "token2"),
+ USER_RESOURCES, List.of("--user-principal", "User:test-user1",
"--user-principal", "User:test-user2")
+ );
+
+ private static final Map<Set<ResourcePattern>,
Map.Entry<Set<AclOperation>, List<String>>> RESOURCE_TO_OPERATIONS = Map.of(
+ TOPIC_RESOURCES, Map.entry(
+ Set.of(READ, WRITE, CREATE, DESCRIBE, DELETE, DESCRIBE_CONFIGS,
ALTER_CONFIGS, ALTER),
+ List.of(OPERATION, "Read", OPERATION, "Write", OPERATION, "Create",
+ OPERATION, "Describe", OPERATION, "Delete", OPERATION,
"DescribeConfigs",
+ OPERATION, "AlterConfigs", OPERATION, "Alter")),
+ Set.of(CLUSTER_RESOURCE), Map.entry(
+ Set.of(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS,
IDEMPOTENT_WRITE, ALTER, DESCRIBE),
+ List.of(OPERATION, "Create", OPERATION, "ClusterAction",
OPERATION, "DescribeConfigs",
+ OPERATION, "AlterConfigs", OPERATION, "IdempotentWrite",
OPERATION, "Alter", OPERATION, "Describe")),
+ GROUP_RESOURCES, Map.entry(
+ Set.of(READ, DESCRIBE, DELETE),
+ List.of(OPERATION, "Read", OPERATION, "Describe", OPERATION,
"Delete")),
+ TRANSACTIONAL_ID_RESOURCES, Map.entry(
+ Set.of(DESCRIBE, WRITE),
+ List.of(OPERATION, "Describe", OPERATION, "Write")),
+ TOKEN_RESOURCES, Map.entry(
+ Set.of(DESCRIBE),
+ List.of(OPERATION, "Describe")),
+ USER_RESOURCES, Map.entry(
+ Set.of(CREATE_TOKENS, DESCRIBE_TOKENS),
+ List.of(OPERATION, "CreateTokens", OPERATION, "DescribeTokens"))
+ );
+
+ private static final Map<Set<ResourcePattern>, Set<AccessControlEntry>>
CONSUMER_RESOURCE_TO_ACLS = Map.of(
+ TOPIC_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(READ,
DESCRIBE), HOSTS),
+ GROUP_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(READ), HOSTS)
+ );
+
+ private static final Map<List<String>, Map<Set<ResourcePattern>,
Set<AccessControlEntry>>> CMD_TO_RESOURCES_TO_ACL = Map.of(
+ List.of(PRODUCER), producerResourceToAcls(false),
+ List.of(PRODUCER, IDEMPOTENT), producerResourceToAcls(true),
+ List.of(CONSUMER), CONSUMER_RESOURCE_TO_ACLS,
+ List.of(PRODUCER, CONSUMER),
+
CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> {
+ Set<AccessControlEntry> value = new
HashSet<>(entry.getValue());
+
value.addAll(producerResourceToAcls(false).getOrDefault(entry.getKey(),
Set.of()));
+ return value;
+ }
+ )),
+ List.of(PRODUCER, IDEMPOTENT, CONSUMER),
+
CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> {
+ Set<AccessControlEntry> value = new
HashSet<>(entry.getValue());
+
value.addAll(producerResourceToAcls(true).getOrDefault(entry.getKey(),
Set.of()));
+ return value;
+ }
+ ))
+ );
@ClusterTest
public void testAclCliWithAdminAPI(ClusterInstance cluster) throws
InterruptedException {
@@ -295,7 +280,7 @@ public class AclCommandTest {
@Test
public void testUseBootstrapServerOptWithBootstrapControllerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST,
BOOTSTRAP_CONTROLLER, LOCALHOST),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, BOOTSTRAP_CONTROLLER,
LOCALHOST),
"Only one of --bootstrap-server or --bootstrap-controller must
be specified"
);
}
@@ -303,7 +288,7 @@ public class AclCommandTest {
@Test
public void testUseWithoutBootstrapServerOptAndBootstrapControllerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Collections.emptyList(),
+ List.of(ADD),
"One of --bootstrap-server or --bootstrap-controller must be
specified"
);
}
@@ -311,14 +296,14 @@ public class AclCommandTest {
@Test
public void testExactlyOneAction() {
String errMsg = "Command must include exactly one action: --list,
--add, --remove. ";
-
assertInitializeInvalidOptionsExitCodeAndMsg(Arrays.asList(BOOTSTRAP_SERVER,
LOCALHOST, ADD, LIST), errMsg);
-
assertInitializeInvalidOptionsExitCodeAndMsg(Arrays.asList(BOOTSTRAP_SERVER,
LOCALHOST, ADD, LIST, REMOVE), errMsg);
+ assertInitializeInvalidOptionsExitCodeAndMsg(List.of(BOOTSTRAP_SERVER,
LOCALHOST, ADD, LIST), errMsg);
+ assertInitializeInvalidOptionsExitCodeAndMsg(List.of(BOOTSTRAP_SERVER,
LOCALHOST, ADD, LIST, REMOVE), errMsg);
}
@Test
public void testUseListPrincipalsOptWithoutListOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, "--principal",
"User:CN=client"),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, "--principal",
"User:CN=client"),
"The --principal option is only available if --list is set"
);
}
@@ -326,7 +311,7 @@ public class AclCommandTest {
@Test
public void testUseProducerOptWithoutTopicOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER),
"With --producer you must specify a --topic"
);
}
@@ -334,7 +319,7 @@ public class AclCommandTest {
@Test
public void testUseIdempotentOptWithoutProducerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, IDEMPOTENT),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, IDEMPOTENT),
"The --idempotent option is only available if --producer is
set"
);
}
@@ -342,24 +327,24 @@ public class AclCommandTest {
@Test
public void testUseConsumerOptWithoutRequiredOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER),
"With --consumer you must specify a --topic and a --group and
no --cluster or --transactional-id option should be specified."
);
- checkNotThrow(Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD,
CONSUMER, TOPIC, "test-topic", GROUP, "test-group"));
+ checkNotThrow(List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER,
TOPIC, "test-topic", GROUP, "test-group"));
}
@Test
public void testInvalidArgs() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, LIST, PRODUCER),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, LIST, PRODUCER),
"Option \"[list]\" can't be used with option \"[producer]\""
);
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER,
OPERATION),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER, OPERATION,
"all"),
"Option \"[producer]\" can't be used with option
\"[operation]\""
);
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER,
OPERATION, TOPIC, "test-topic", GROUP, "test-group"),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER, OPERATION,
TOPIC, "test-topic", GROUP, "test-group"),
"Option \"[consumer]\" can't be used with option
\"[operation]\""
);
}
@@ -394,8 +379,7 @@ public class AclCommandTest {
}
private void testAclsOnPrefixedResources(ClusterInstance cluster,
List<String> cmdArgs) throws InterruptedException {
- List<String> cmd = Arrays.asList("--allow-principal",
PRINCIPAL.toString(), PRODUCER, TOPIC, "Test-",
- RESOURCE_PATTERN_TYPE, "Prefixed");
+ List<String> cmd = List.of("--allow-principal", PRINCIPAL.toString(),
PRODUCER, TOPIC, "Test-", RESOURCE_PATTERN_TYPE, "Prefixed");
List<String> args = new ArrayList<>(cmdArgs);
args.addAll(cmd);
@@ -406,7 +390,7 @@ public class AclCommandTest {
AccessControlEntry describeAcl = new
AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, DESCRIBE, ALLOW);
AccessControlEntry createAcl = new
AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, CREATE, ALLOW);
cluster.waitAcls(new AclBindingFilter(new
ResourcePattern(ResourceType.TOPIC, "Test-", PREFIXED).toFilter(), ANY),
- Arrays.asList(writeAcl, describeAcl, createAcl));
+ List.of(writeAcl, describeAcl, createAcl));
args = new ArrayList<>(cmdArgs);
args.addAll(cmd);
@@ -415,45 +399,42 @@ public class AclCommandTest {
callMain(args);
cluster.waitAcls(new AclBindingFilter(new
ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PREFIXED).toFilter(),
ANY),
- Collections.emptySet());
+ Set.of());
cluster.waitAcls(new AclBindingFilter(new
ResourcePattern(ResourceType.TOPIC, "Test-", PREFIXED).toFilter(), ANY),
- Collections.emptySet());
+ Set.of());
}
private static Map<Set<ResourcePattern>, Set<AccessControlEntry>>
producerResourceToAcls(boolean enableIdempotence) {
- Map<Set<ResourcePattern>, Set<AccessControlEntry>> result = new
HashMap<>();
- result.put(TOPIC_RESOURCES,
asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW, asScalaSet(
- new HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE))),
asScalaSet(HOSTS))));
- result.put(TRANSACTIONAL_ID_RESOURCES,
asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW, asScalaSet(
- new HashSet<>(Arrays.asList(WRITE, DESCRIBE))),
asScalaSet(HOSTS))));
- result.put(Collections.singleton(CLUSTER_RESOURCE),
asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW,
+ return Map.of(
+ TOPIC_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(WRITE,
DESCRIBE, CREATE), HOSTS),
+ TRANSACTIONAL_ID_RESOURCES, AclCommand.getAcls(USERS, ALLOW,
Set.of(WRITE, DESCRIBE), HOSTS),
+ Set.of(CLUSTER_RESOURCE), AclCommand.getAcls(USERS, ALLOW,
enableIdempotence
- ? asScalaSet(Collections.singleton(IDEMPOTENT_WRITE))
- : asScalaSet(Collections.emptySet()),
asScalaSet(HOSTS))));
- return result;
+ ? Set.of(IDEMPOTENT_WRITE)
+ : Set.of(), HOSTS));
}
private List<String> adminArgs(String bootstrapServer, Optional<File>
commandConfig) {
- List<String> adminArgs = new
ArrayList<>(Arrays.asList(BOOTSTRAP_SERVER, bootstrapServer));
- commandConfig.ifPresent(file ->
adminArgs.addAll(Arrays.asList(COMMAND_CONFIG, file.getAbsolutePath())));
+ List<String> adminArgs = new ArrayList<>(List.of(BOOTSTRAP_SERVER,
bootstrapServer));
+ commandConfig.ifPresent(file ->
adminArgs.addAll(List.of(COMMAND_CONFIG, file.getAbsolutePath())));
return adminArgs;
}
private List<String> adminArgsWithBootstrapController(String
bootstrapController, Optional<File> commandConfig) {
- List<String> adminArgs = new
ArrayList<>(Arrays.asList(BOOTSTRAP_CONTROLLER, bootstrapController));
- commandConfig.ifPresent(file ->
adminArgs.addAll(Arrays.asList(COMMAND_CONFIG, file.getAbsolutePath())));
+ List<String> adminArgs = new ArrayList<>(List.of(BOOTSTRAP_CONTROLLER,
bootstrapController));
+ commandConfig.ifPresent(file ->
adminArgs.addAll(List.of(COMMAND_CONFIG, file.getAbsolutePath())));
return adminArgs;
}
private Map.Entry<String, String> callMain(List<String> args) {
- return grabConsoleOutputAndError(() ->
AclCommand.main(args.toArray(new String[0])));
+ return ToolsTestUtils.grabConsoleOutputAndError(() ->
AclCommand.main(args.toArray(new String[0])));
}
private void testAclCli(ClusterInstance cluster, List<String> cmdArgs)
throws InterruptedException {
for (Map.Entry<Set<ResourcePattern>, List<String>> entry :
RESOURCE_TO_COMMAND.entrySet()) {
Set<ResourcePattern> resources = entry.getKey();
List<String> resourceCmd = entry.getValue();
- Set<AclPermissionType> permissionTypes = new
HashSet<>(Arrays.asList(ALLOW, DENY));
+ Set<AclPermissionType> permissionTypes = Set.of(ALLOW, DENY);
for (AclPermissionType permissionType : permissionTypes) {
Map.Entry<Set<AclOperation>, List<String>> operationToCmd =
RESOURCE_TO_OPERATIONS.get(resources);
Map.Entry<Set<AccessControlEntry>, List<String>> aclToCommand
= getAclToCommand(permissionType, operationToCmd.getKey());
@@ -494,8 +475,8 @@ public class AclCommandTest {
String resourceType = resource.resourceType().toString();
List<String> cmd = resource == CLUSTER_RESOURCE
- ? Collections.singletonList("kafka-cluster")
- : resourceCmd.stream().filter(s ->
!s.startsWith("--")).collect(Collectors.toList());
+ ? List.of("kafka-cluster")
+ : resourceCmd.stream().filter(s ->
!s.startsWith("--")).toList();
cmd.forEach(name -> {
String expected = String.format(
@@ -509,36 +490,37 @@ public class AclCommandTest {
private void testPatternTypes(List<String> cmdArgs) {
Exit.setExitProcedure((status, message) -> {
- if ((int) status == 1)
+ if (status == 1)
throw new RuntimeException("Exiting command");
else
throw new AssertionError("Unexpected exit with status " +
status);
});
try {
-
Arrays.stream(PatternType.values()).sequential().forEach(patternType -> {
+ for (PatternType patternType : PatternType.values()) {
List<String> addCmd = new ArrayList<>(cmdArgs);
- addCmd.addAll(Arrays.asList("--allow-principal",
PRINCIPAL.toString(), PRODUCER, TOPIC, "Test",
+ addCmd.addAll(List.of("--allow-principal",
PRINCIPAL.toString(), PRODUCER, TOPIC, "Test",
ADD, RESOURCE_PATTERN_TYPE, patternType.toString()));
verifyPatternType(addCmd, patternType.isSpecific());
List<String> listCmd = new ArrayList<>(cmdArgs);
- listCmd.addAll(Arrays.asList(TOPIC, "Test", LIST,
RESOURCE_PATTERN_TYPE, patternType.toString()));
+ listCmd.addAll(List.of(TOPIC, "Test", LIST,
RESOURCE_PATTERN_TYPE, patternType.toString()));
verifyPatternType(listCmd, patternType != PatternType.UNKNOWN);
List<String> removeCmd = new ArrayList<>(cmdArgs);
- removeCmd.addAll(Arrays.asList(TOPIC, "Test", "--force",
REMOVE, RESOURCE_PATTERN_TYPE, patternType.toString()));
+ removeCmd.addAll(List.of(TOPIC, "Test", "--force", REMOVE,
RESOURCE_PATTERN_TYPE, patternType.toString()));
verifyPatternType(removeCmd, patternType !=
PatternType.UNKNOWN);
- });
+ }
} finally {
Exit.resetExitProcedure();
}
}
private void verifyPatternType(List<String> cmd, boolean isValid) {
- if (isValid)
+ if (isValid) {
callMain(cmd);
- else
+ } else {
assertThrows(RuntimeException.class, () -> callMain(cmd));
+ }
}
private void testRemove(
@@ -554,7 +536,7 @@ public class AclCommandTest {
Map.Entry<String, String> out = callMain(args);
assertEquals("", out.getValue());
for (ResourcePattern resource : resources) {
- cluster.waitAcls(new AclBindingFilter(resource.toFilter(), ANY),
Collections.emptySet());
+ cluster.waitAcls(new AclBindingFilter(resource.toFilter(), ANY),
Set.of());
}
}
@@ -562,8 +544,8 @@ public class AclCommandTest {
AclPermissionType permissionType,
Set<AclOperation> operations
) {
- return new SimpleImmutableEntry<>(
- asJavaSet(AclCommand.getAcls(asScalaSet(USERS),
permissionType, asScalaSet(operations), asScalaSet(HOSTS))),
+ return Map.entry(
+ AclCommand.getAcls(USERS, permissionType, operations, HOSTS),
getCmd(permissionType)
);
}
@@ -575,44 +557,12 @@ public class AclCommandTest {
List<String> fullCmd = new ArrayList<>();
for (KafkaPrincipal user : USERS) {
fullCmd.addAll(cmd);
- fullCmd.addAll(Arrays.asList(principalCmd, user.toString()));
+ fullCmd.addAll(List.of(principalCmd, user.toString()));
}
return fullCmd;
}
- /**
- * Capture both the console output and console error during the execution
of the provided function.
- */
- private static Map.Entry<String, String>
grabConsoleOutputAndError(Runnable runnable) {
- ByteArrayOutputStream outBuf = new ByteArrayOutputStream();
- ByteArrayOutputStream errBuf = new ByteArrayOutputStream();
- PrintStream out = new PrintStream(outBuf);
- PrintStream err = new PrintStream(errBuf);
- try {
- Console.withOut(out, () -> {
- Console.withErr(err, () -> {
- runnable.run();
- return null;
- });
- return null;
- });
- } finally {
- out.flush();
- err.flush();
- }
-
- return new SimpleImmutableEntry<>(outBuf.toString(),
errBuf.toString());
- }
-
- private static <T> scala.collection.immutable.Set<T> asScalaSet(Set<T>
javaSet) {
- return CollectionConverters.asScala(javaSet).toSet();
- }
-
- private static <T> Set<T> asJavaSet(scala.collection.immutable.Set<T>
scalaSet) {
- return CollectionConverters.asJava(scalaSet);
- }
-
private void assertInitializeInvalidOptionsExitCodeAndMsg(List<String>
args, String expectedMsg) {
Exit.setExitProcedure((exitCode, message) -> {
assertEquals(1, exitCode);
@@ -620,7 +570,7 @@ public class AclCommandTest {
throw new RuntimeException();
});
try {
- assertThrows(RuntimeException.class, () -> new
AclCommandOptions(args.toArray(new String[0])).checkArgs());
+ assertThrows(RuntimeException.class, () -> new
AclCommand.AclCommandOptions(args.toArray(new String[0])).checkArgs());
} finally {
Exit.resetExitProcedure();
}
@@ -628,15 +578,15 @@ public class AclCommandTest {
private void checkNotThrow(List<String> args) {
AtomicReference<Integer> exitStatus = new AtomicReference<>();
- org.apache.kafka.common.utils.Exit.setExitProcedure((status, __) -> {
+ Exit.setExitProcedure((status, __) -> {
exitStatus.set(status);
throw new RuntimeException();
});
try {
- assertDoesNotThrow(() -> new AclCommandOptions(args.toArray(new
String[0])).checkArgs());
+ assertDoesNotThrow(() -> new
AclCommand.AclCommandOptions(args.toArray(new String[0])).checkArgs());
assertNull(exitStatus.get());
} finally {
- org.apache.kafka.common.utils.Exit.resetExitProcedure();
+ Exit.resetExitProcedure();
}
}
}