This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 57eb5fd7dc3 KAFKA-14587: Move AclCommand to tools (#17880)
57eb5fd7dc3 is described below

commit 57eb5fd7dc3d4a16b0c50255e0e5e7f2dd265cc6
Author: Mickael Maison <[email protected]>
AuthorDate: Sat Dec 14 20:05:46 2024 +0100

    KAFKA-14587: Move AclCommand to tools (#17880)
    
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 bin/kafka-acls.sh                                  |   2 +-
 bin/windows/kafka-acls.bat                         |   2 +-
 checkstyle/import-control.xml                      |   2 +
 checkstyle/suppressions.xml                        |   4 +-
 .../apache/kafka/server/authorizer/Authorizer.java |   6 +-
 core/src/main/scala/kafka/admin/AclCommand.scala   | 510 -----------------
 .../kafka/api/EndToEndAuthorizationTest.scala      |   2 +-
 .../java/org/apache/kafka/tools/AclCommand.java    | 605 +++++++++++++++++++++
 .../org/apache/kafka/tools}/AclCommandTest.java    | 322 +++++------
 9 files changed, 750 insertions(+), 705 deletions(-)

diff --git a/bin/kafka-acls.sh b/bin/kafka-acls.sh
index 8fa65542e10..ffbb1e19810 100755
--- a/bin/kafka-acls.sh
+++ b/bin/kafka-acls.sh
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-exec $(dirname $0)/kafka-run-class.sh kafka.admin.AclCommand "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.AclCommand "$@"
diff --git a/bin/windows/kafka-acls.bat b/bin/windows/kafka-acls.bat
index 8f0be85c045..12c4a9a69a7 100644
--- a/bin/windows/kafka-acls.bat
+++ b/bin/windows/kafka-acls.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied.
 rem See the License for the specific language governing permissions and
 rem limitations under the License.
 
-"%~dp0kafka-run-class.bat" kafka.admin.AclCommand %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.AclCommand %*
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 0b9e7dd717b..e3382fb3db3 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -290,6 +290,8 @@
     <allow pkg="org.apache.kafka.server.util" />
     <allow pkg="kafka.admin" />
     <allow pkg="kafka.server" />
+    <allow pkg="org.apache.kafka.metadata.authorizer" />
+    <allow pkg="org.apache.kafka.security.authorizer" />
     <allow pkg="org.apache.kafka.storage.internals" />
     <allow pkg="org.apache.kafka.server.config" />
     <allow pkg="org.apache.kafka.server.common" />
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2f8ff2d84c2..2d05fac1e70 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -269,11 +269,11 @@
     <suppress checks="ClassDataAbstractionCoupling"
               files="VerifiableConsumer.java"/>
     <suppress checks="CyclomaticComplexity"
-              
files="(ConsoleConsumer|DefaultMessageFormatter|StreamsResetter|ProducerPerformance|Agent).java"/>
+              
files="(AclCommand|ConsoleConsumer|DefaultMessageFormatter|StreamsResetter|ProducerPerformance|Agent).java"/>
     <suppress checks="BooleanExpressionComplexity"
               files="(StreamsResetter|DefaultMessageFormatter).java"/>
     <suppress checks="NPathComplexity"
-              
files="(DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader).java"/>
+              
files="(AclCommand|DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader).java"/>
     <suppress checks="ImportControl"
               files="SignalLogger.java"/>
     <suppress checks="IllegalImport"
diff --git 
a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java 
b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
index 58241c51ea0..6dcc80f1e1d 100644
--- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
+++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
@@ -112,8 +112,7 @@ public interface Authorizer extends Configurable, Closeable 
{
      * to process the update synchronously on the request thread.
      *
      * @param requestContext Request context if the ACL is being created by a 
broker to handle
-     *        a client request to create ACLs. This may be null if ACLs are 
created directly in ZooKeeper
-     *        using AclCommand.
+     *        a client request to create ACLs.
      * @param aclBindings ACL bindings to create
      *
      * @return Create result for each ACL binding in the same order as in the 
input list. Each result
@@ -131,8 +130,7 @@ public interface Authorizer extends Configurable, Closeable 
{
      * Refer to the authorizer implementation docs for details on concurrent 
update guarantees.
      *
      * @param requestContext Request context if the ACL is being deleted by a 
broker to handle
-     *        a client request to delete ACLs. This may be null if ACLs are 
deleted directly in ZooKeeper
-     *        using AclCommand.
+     *        a client request to delete ACLs.
      * @param aclBindingFilters Filters to match ACL bindings that are to be 
deleted
      *
      * @return Delete result for each filter in the same order as in the input 
list.
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala 
b/core/src/main/scala/kafka/admin/AclCommand.scala
deleted file mode 100644
index e83ffe30ec6..00000000000
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ /dev/null
@@ -1,510 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import java.util.Properties
-import joptsimple._
-import joptsimple.util.EnumConverter
-import kafka.utils._
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.{Exit, Utils, SecurityUtils => 
JSecurityUtils}
-import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable
-import scala.io.StdIn
-
-object AclCommand extends Logging {
-
-  private val ClusterResourceFilter = new 
ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, 
PatternType.LITERAL)
-
-  private val Newline = scala.util.Properties.lineSeparator
-
-  def main(args: Array[String]): Unit = {
-
-    val opts = new AclCommandOptions(args)
-
-    CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to manage 
acls on kafka.")
-
-    opts.checkArgs()
-
-    val aclCommandService = new AdminClientService(opts)
-
-    try {
-      if (opts.options.has(opts.addOpt))
-        aclCommandService.addAcls()
-      else if (opts.options.has(opts.removeOpt))
-        aclCommandService.removeAcls()
-      else if (opts.options.has(opts.listOpt))
-        aclCommandService.listAcls()
-    } catch {
-      case e: Throwable =>
-        println(s"Error while executing ACL command: ${e.getMessage}")
-        println(Utils.stackTrace(e))
-        Exit.exit(1)
-    }
-  }
-
-  private class AdminClientService(val opts: AclCommandOptions) extends 
Logging {
-
-    private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit): 
Unit = {
-      val props = if (opts.options.has(opts.commandConfigOpt))
-        Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
-      else
-        new Properties()
-
-      if (opts.options.has(opts.bootstrapServerOpt)) {
-        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt))
-      } else {
-        props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, 
opts.options.valueOf(opts.bootstrapControllerOpt))
-      }
-      val adminClient = Admin.create(props)
-
-      try {
-        f(adminClient)
-      } finally {
-        adminClient.close()
-      }
-    }
-
-    def addAcls(): Unit = {
-      val resourceToAcl = getResourceToAcls(opts)
-      withAdminClient(opts) { adminClient =>
-        for ((resource, acls) <- resourceToAcl) {
-          println(s"Adding ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
-          val aclBindings = acls.map(acl => new AclBinding(resource, 
acl)).asJavaCollection
-          adminClient.createAcls(aclBindings).all().get()
-        }
-      }
-    }
-
-    def removeAcls(): Unit = {
-      withAdminClient(opts) { adminClient =>
-        val filterToAcl = getResourceFilterToAcls(opts)
-
-        for ((filter, acls) <- filterToAcl) {
-          if (acls.isEmpty) {
-            if (confirmAction(opts, s"Are you sure you want to delete all ACLs 
for resource filter `$filter`? (y/n)"))
-              removeAcls(adminClient, acls, filter)
-          } else {
-            if (confirmAction(opts, s"Are you sure you want to remove ACLs: 
$Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter 
`$filter`? (y/n)"))
-              removeAcls(adminClient, acls, filter)
-          }
-        }
-      }
-    }
-
-    def listAcls(): Unit = {
-      withAdminClient(opts) { adminClient =>
-        listAcls(adminClient)
-      }
-    }
-
-    private def listAcls(adminClient: Admin): Unit = {
-      val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
-      val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
-      val resourceToAcls = getAcls(adminClient, filters)
-
-      if (listPrincipals.isEmpty) {
-        printResourceAcls(resourceToAcls)
-      } else {
-        listPrincipals.foreach{principal =>
-          println(s"ACLs for principal `$principal`")
-          val filteredResourceToAcls = resourceToAcls.map { case (resource, 
acls) =>
-            resource -> acls.filter(acl => 
principal.toString.equals(acl.principal))
-          }.filter { case (_, acls) => acls.nonEmpty }
-          printResourceAcls(filteredResourceToAcls)
-        }
-      }
-    }
-
-    private def printResourceAcls(resourceToAcls: Map[ResourcePattern, 
Set[AccessControlEntry]]): Unit = {
-      for ((resource, acls) <- resourceToAcls)
-        println(s"Current ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
-    }
-
-    private def removeAcls(adminClient: Admin, acls: Set[AccessControlEntry], 
filter: ResourcePatternFilter): Unit = {
-      if (acls.isEmpty)
-        adminClient.deleteAcls(List(new AclBindingFilter(filter, 
AccessControlEntryFilter.ANY)).asJava).all().get()
-      else {
-        val aclBindingFilters = acls.map(acl => new AclBindingFilter(filter, 
acl.toFilter)).toList.asJava
-        adminClient.deleteAcls(aclBindingFilters).all().get()
-      }
-    }
-
-    private def getAcls(adminClient: Admin, filters: 
Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
-      val aclBindings =
-        if (filters.isEmpty) 
adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList
-        else {
-          val results = for (filter <- filters) yield {
-            adminClient.describeAcls(new AclBindingFilter(filter, 
AccessControlEntryFilter.ANY)).values().get().asScala.toList
-          }
-          results.reduceLeft(_ ++ _)
-        }
-
-      val resourceToAcls = mutable.Map[ResourcePattern, 
Set[AccessControlEntry]]().withDefaultValue(Set())
-
-      aclBindings.foreach(aclBinding => resourceToAcls(aclBinding.pattern()) = 
resourceToAcls(aclBinding.pattern()) + aclBinding.entry())
-      resourceToAcls.toMap
-    }
-  }
-
-  private def getResourceToAcls(opts: AclCommandOptions): Map[ResourcePattern, 
Set[AccessControlEntry]] = {
-    val patternType = opts.options.valueOf(opts.resourcePatternType)
-    if (!patternType.isSpecific)
-      CommandLineUtils.printUsageAndExit(opts.parser, s"A 
'--resource-pattern-type' value of '$patternType' is not valid when adding 
acls.")
-
-    val resourceToAcl = getResourceFilterToAcls(opts).map {
-      case (filter, acls) =>
-        new ResourcePattern(filter.resourceType(), filter.name(), 
filter.patternType()) -> acls
-    }
-
-    if (resourceToAcl.values.exists(_.isEmpty))
-      CommandLineUtils.printUsageAndExit(opts.parser, "You must specify one 
of: --allow-principal, --deny-principal when trying to add ACLs.")
-
-    resourceToAcl
-  }
-
-  private def getResourceFilterToAcls(opts: AclCommandOptions): 
Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
-    var resourceToAcls = Map.empty[ResourcePatternFilter, 
Set[AccessControlEntry]]
-
-    //if none of the --producer or --consumer options are specified , just 
construct ACLs from CLI options.
-    if (!opts.options.has(opts.producerOpt) && 
!opts.options.has(opts.consumerOpt)) {
-      resourceToAcls ++= getCliResourceFilterToAcls(opts)
-    }
-
-    //users are allowed to specify both --producer and --consumer options in a 
single command.
-    if (opts.options.has(opts.producerOpt))
-      resourceToAcls ++= getProducerResourceFilterToAcls(opts)
-
-    if (opts.options.has(opts.consumerOpt))
-      resourceToAcls ++= getConsumerResourceFilterToAcls(opts).map { case (k, 
v) => k -> (v ++ resourceToAcls.getOrElse(k, Set.empty[AccessControlEntry])) }
-
-    validateOperation(opts, resourceToAcls)
-
-    resourceToAcls
-  }
-
-  private def getProducerResourceFilterToAcls(opts: AclCommandOptions): 
Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
-    val filters = getResourceFilter(opts)
-
-    val topics = filters.filter(_.resourceType == JResourceType.TOPIC)
-    val transactionalIds = filters.filter(_.resourceType == 
JResourceType.TRANSACTIONAL_ID)
-    val enableIdempotence = opts.options.has(opts.idempotentOpt)
-
-    val topicAcls = getAcl(opts, Set(WRITE, DESCRIBE, CREATE))
-    val transactionalIdAcls = getAcl(opts, Set(WRITE, DESCRIBE))
-
-    //Write, Describe, Create permission on topics, Write, Describe on 
transactionalIds
-    topics.map(_ -> topicAcls).toMap ++
-      transactionalIds.map(_ -> transactionalIdAcls).toMap ++
-        (if (enableIdempotence)
-          Map(ClusterResourceFilter -> getAcl(opts, Set(IDEMPOTENT_WRITE)))
-        else
-          Map.empty)
-  }
-
-  private def getConsumerResourceFilterToAcls(opts: AclCommandOptions): 
Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
-    val filters = getResourceFilter(opts)
-
-    val topics = filters.filter(_.resourceType == JResourceType.TOPIC)
-    val groups = filters.filter(_.resourceType == JResourceType.GROUP)
-
-    //Read, Describe on topic, Read on consumerGroup
-
-    val acls = getAcl(opts, Set(READ, DESCRIBE))
-
-    topics.map(_ -> acls).toMap[ResourcePatternFilter, 
Set[AccessControlEntry]] ++
-      groups.map(_ -> getAcl(opts, Set(READ))).toMap[ResourcePatternFilter, 
Set[AccessControlEntry]]
-  }
-
-  private def getCliResourceFilterToAcls(opts: AclCommandOptions): 
Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
-    val acls = getAcl(opts)
-    val filters = getResourceFilter(opts)
-    filters.map(_ -> acls).toMap
-  }
-
-  private def getAcl(opts: AclCommandOptions, operations: Set[AclOperation]): 
Set[AccessControlEntry] = {
-    val allowedPrincipals = getPrincipals(opts, opts.allowPrincipalsOpt)
-
-    val deniedPrincipals = getPrincipals(opts, opts.denyPrincipalsOpt)
-
-    val allowedHosts = getHosts(opts, opts.allowHostsOpt, 
opts.allowPrincipalsOpt)
-
-    val deniedHosts = getHosts(opts, opts.denyHostsOpt, opts.denyPrincipalsOpt)
-
-    val acls = new collection.mutable.HashSet[AccessControlEntry]
-    if (allowedHosts.nonEmpty && allowedPrincipals.nonEmpty)
-      acls ++= getAcls(allowedPrincipals, ALLOW, operations, allowedHosts)
-
-    if (deniedHosts.nonEmpty && deniedPrincipals.nonEmpty)
-      acls ++= getAcls(deniedPrincipals, DENY, operations, deniedHosts)
-
-    acls.toSet
-  }
-
-  private def getAcl(opts: AclCommandOptions): Set[AccessControlEntry] = {
-    val operations = opts.options.valuesOf(opts.operationsOpt).asScala
-      .map(operation => JSecurityUtils.operation(operation.trim)).toSet
-    getAcl(opts, operations)
-  }
-
-  def getAcls(principals: Set[KafkaPrincipal], permissionType: 
AclPermissionType, operations: Set[AclOperation],
-              hosts: Set[String]): Set[AccessControlEntry] = {
-    for {
-      principal <- principals
-      operation <- operations
-      host <- hosts
-    } yield new AccessControlEntry(principal.toString, host, operation, 
permissionType)
-  }
-
-  private def getHosts(opts: AclCommandOptions, hostOptionSpec: 
OptionSpec[String],
-                       principalOptionSpec: OptionSpec[String]): Set[String] = 
{
-    if (opts.options.has(hostOptionSpec))
-      opts.options.valuesOf(hostOptionSpec).asScala.map(_.trim).toSet
-    else if (opts.options.has(principalOptionSpec))
-      Set[String](AclEntry.WILDCARD_HOST)
-    else
-      Set.empty[String]
-  }
-
-  private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: 
OptionSpec[String]): Set[KafkaPrincipal] = {
-    if (opts.options.has(principalOptionSpec))
-      opts.options.valuesOf(principalOptionSpec).asScala.map(s => 
JSecurityUtils.parseKafkaPrincipal(s.trim)).toSet
-    else
-      Set.empty[KafkaPrincipal]
-  }
-
-  private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: 
Boolean = true): Set[ResourcePatternFilter] = {
-    val patternType = opts.options.valueOf(opts.resourcePatternType)
-
-    var resourceFilters = Set.empty[ResourcePatternFilter]
-    if (opts.options.has(opts.topicOpt))
-      opts.options.valuesOf(opts.topicOpt).forEach(topic => resourceFilters += 
new ResourcePatternFilter(JResourceType.TOPIC, topic.trim, patternType))
-
-    if (patternType == PatternType.LITERAL && 
(opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
-      resourceFilters += ClusterResourceFilter
-
-    if (opts.options.has(opts.groupOpt))
-      opts.options.valuesOf(opts.groupOpt).forEach(group => resourceFilters += 
new ResourcePatternFilter(JResourceType.GROUP, group.trim, patternType))
-
-    if (opts.options.has(opts.transactionalIdOpt))
-      opts.options.valuesOf(opts.transactionalIdOpt).forEach(transactionalId =>
-        resourceFilters += new 
ResourcePatternFilter(JResourceType.TRANSACTIONAL_ID, transactionalId, 
patternType))
-
-    if (opts.options.has(opts.delegationTokenOpt))
-      opts.options.valuesOf(opts.delegationTokenOpt).forEach(token => 
resourceFilters += new ResourcePatternFilter(JResourceType.DELEGATION_TOKEN, 
token.trim, patternType))
-
-    if (opts.options.has(opts.userPrincipalOpt))
-      opts.options.valuesOf(opts.userPrincipalOpt).forEach(user => 
resourceFilters += new ResourcePatternFilter(JResourceType.USER, user.trim, 
patternType))
-
-    if (resourceFilters.isEmpty && dieIfNoResourceFound)
-      CommandLineUtils.printUsageAndExit(opts.parser, "You must provide at 
least one resource: --topic <topic> or --cluster or --group <group> or 
--delegation-token <Delegation Token ID>")
-
-    resourceFilters
-  }
-
-  private def confirmAction(opts: AclCommandOptions, msg: String): Boolean = {
-    if (opts.options.has(opts.forceOpt))
-        return true
-    println(msg)
-    StdIn.readLine().equalsIgnoreCase("y")
-  }
-
-  private def validateOperation(opts: AclCommandOptions, resourceToAcls: 
Map[ResourcePatternFilter, Set[AccessControlEntry]]): Unit = {
-    for ((resource, acls) <- resourceToAcls) {
-      val validOps = 
AclEntry.supportedOperations(resource.resourceType).asScala.toSet + 
AclOperation.ALL
-      if ((acls.map(_.operation) -- validOps).nonEmpty)
-        CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType 
${resource.resourceType} only supports operations 
${validOps.map(JSecurityUtils.operationName).mkString(", ")}")
-    }
-  }
-
-  class AclCommandOptions(args: Array[String]) extends 
CommandDefaultOptions(args) {
-    val CommandConfigDoc = "A property file containing configs to be passed to 
Admin Client."
-
-    val bootstrapServerOpt: OptionSpec[String] = 
parser.accepts("bootstrap-server", "A list of host/port pairs to use for 
establishing the connection to the Kafka cluster." +
-      " This list should be in the form host1:port1,host2:port2,... This 
config is required for acl management using admin client API.")
-      .withRequiredArg
-      .describedAs("server to connect to")
-      .ofType(classOf[String])
-
-    val bootstrapControllerOpt: OptionSpec[String] = 
parser.accepts("bootstrap-controller", "A list of host/port pairs to use for 
establishing the connection to the Kafka cluster." +
-        " This list should be in the form host1:port1,host2:port2,... This 
config is required for acl management using admin client API.")
-      .withRequiredArg
-      .describedAs("controller to connect to")
-      .ofType(classOf[String])
-
-    val commandConfigOpt: OptionSpec[String] = 
parser.accepts("command-config", CommandConfigDoc)
-      .withOptionalArg()
-      .describedAs("command-config")
-      .ofType(classOf[String])
-
-    val topicOpt: OptionSpec[String] = parser.accepts("topic", "topic to which 
ACLs should be added or removed. " +
-      "A value of '*' indicates ACL should apply to all topics.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-
-    val clusterOpt: OptionSpecBuilder = parser.accepts("cluster", "Add/Remove 
cluster ACLs.")
-    val groupOpt: OptionSpec[String] = parser.accepts("group", "Consumer Group 
to which the ACLs should be added or removed. " +
-      "A value of '*' indicates the ACLs should apply to all groups.")
-      .withRequiredArg
-      .describedAs("group")
-      .ofType(classOf[String])
-
-    val transactionalIdOpt: OptionSpec[String] = 
parser.accepts("transactional-id", "The transactionalId to which ACLs should " +
-      "be added or removed. A value of '*' indicates the ACLs should apply to 
all transactionalIds.")
-      .withRequiredArg
-      .describedAs("transactional-id")
-      .ofType(classOf[String])
-
-    val idempotentOpt: OptionSpecBuilder = parser.accepts("idempotent", 
"Enable idempotence for the producer. This should be " +
-      "used in combination with the --producer option. Note that idempotence 
is enabled automatically if " +
-      "the producer is authorized to a particular transactional-id.")
-
-    val delegationTokenOpt: OptionSpec[String] = 
parser.accepts("delegation-token", "Delegation token to which ACLs should be 
added or removed. " +
-      "A value of '*' indicates ACL should apply to all tokens.")
-      .withRequiredArg
-      .describedAs("delegation-token")
-      .ofType(classOf[String])
-
-    val resourcePatternType: OptionSpec[PatternType] = 
parser.accepts("resource-pattern-type", "The type of the resource pattern or 
pattern filter. " +
-      "When adding acls, this should be a specific pattern type, e.g. 
'literal' or 'prefixed'. " +
-      "When listing or removing acls, a specific pattern type can be used to 
list or remove acls from specific resource patterns, " +
-      "or use the filter values of 'any' or 'match', where 'any' will match 
any pattern type, but will match the resource name exactly, " +
-      "where as 'match' will perform pattern matching to list or remove all 
acls that affect the supplied resource(s). " +
-      "WARNING: 'match', when used in combination with the '--remove' switch, 
should be used with care.")
-      .withRequiredArg()
-      .ofType(classOf[String])
-      .withValuesConvertedBy(new PatternTypeConverter())
-      .defaultsTo(PatternType.LITERAL)
-
-    val addOpt: OptionSpecBuilder = parser.accepts("add", "Indicates you are 
trying to add ACLs.")
-    val removeOpt: OptionSpecBuilder = parser.accepts("remove", "Indicates you 
are trying to remove ACLs.")
-    val listOpt: OptionSpecBuilder = parser.accepts("list", "List ACLs for the 
specified resource, use --topic <topic> or --group <group> or --cluster to 
specify a resource.")
-
-    val operationsOpt: OptionSpec[String] = parser.accepts("operation", 
"Operation that is being allowed or denied. Valid operation names are: " + 
Newline +
-      AclEntry.ACL_OPERATIONS.asScala.map("\t" + 
JSecurityUtils.operationName(_)).mkString(Newline) + Newline)
-      .withRequiredArg
-      .ofType(classOf[String])
-      .defaultsTo(JSecurityUtils.operationName(AclOperation.ALL))
-
-    val allowPrincipalsOpt: OptionSpec[String] = 
parser.accepts("allow-principal", "principal is in principalType:name format." +
-      " Note that principalType must be supported by the Authorizer being 
used." +
-      " For example, User:'*' is the wild card indicating all users.")
-      .withRequiredArg
-      .describedAs("allow-principal")
-      .ofType(classOf[String])
-
-    val denyPrincipalsOpt: OptionSpec[String] = 
parser.accepts("deny-principal", "principal is in principalType:name format. " +
-      "By default anyone not added through --allow-principal is denied access. 
" +
-      "You only need to use this option as negation to already allowed set. " +
-      "Note that principalType must be supported by the Authorizer being used. 
" +
-      "For example if you wanted to allow access to all users in the system 
but not test-user you can define an ACL that " +
-      "allows access to User:'*' and specify 
--deny-principal=User:[email protected]. " +
-      "AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES.")
-      .withRequiredArg
-      .describedAs("deny-principal")
-      .ofType(classOf[String])
-
-    val listPrincipalsOpt: OptionSpec[String] = parser.accepts("principal", 
"List ACLs for the specified principal. principal is in principalType:name 
format." +
-      " Note that principalType must be supported by the Authorizer being 
used. Multiple --principal option can be passed.")
-      .withOptionalArg()
-      .describedAs("principal")
-      .ofType(classOf[String])
-
-    val allowHostsOpt: OptionSpec[String] = parser.accepts("allow-host", "Host 
from which principals listed in --allow-principal will have access. " +
-      "If you have specified --allow-principal then the default for this 
option will be set to '*' which allows access from all hosts.")
-      .withRequiredArg
-      .describedAs("allow-host")
-      .ofType(classOf[String])
-
-    val denyHostsOpt: OptionSpec[String] = parser.accepts("deny-host", "Host 
from which principals listed in --deny-principal will be denied access. " +
-      "If you have specified --deny-principal then the default for this option 
will be set to '*' which denies access from all hosts.")
-      .withRequiredArg
-      .describedAs("deny-host")
-      .ofType(classOf[String])
-
-    val producerOpt: OptionSpecBuilder = parser.accepts("producer", 
"Convenience option to add/remove ACLs for producer role. " +
-      "This will generate ACLs that allows WRITE,DESCRIBE and CREATE on 
topic.")
-
-    val consumerOpt: OptionSpecBuilder = parser.accepts("consumer", 
"Convenience option to add/remove ACLs for consumer role. " +
-      "This will generate ACLs that allows READ,DESCRIBE on topic and READ on 
group.")
-
-    val forceOpt: OptionSpecBuilder = parser.accepts("force", "Assume Yes to 
all queries and do not prompt.")
-
-    val userPrincipalOpt: OptionSpec[String] = 
parser.accepts("user-principal", "Specifies a user principal as a resource in 
relation with the operation. For instance " +
-      "one could grant CreateTokens or DescribeTokens permission on a given 
user principal.")
-      .withRequiredArg()
-      .describedAs("user-principal")
-      .ofType(classOf[String])
-
-    options = parser.parse(args: _*)
-
-    def checkArgs(): Unit = {
-      if (options.has(bootstrapServerOpt) && 
options.has(bootstrapControllerOpt))
-        CommandLineUtils.printUsageAndExit(parser, "Only one of 
--bootstrap-server or --bootstrap-controller must be specified")
-
-      if (!options.has(bootstrapServerOpt) && 
!options.has(bootstrapControllerOpt))
-        CommandLineUtils.printUsageAndExit(parser, "One of --bootstrap-server 
or --bootstrap-controller must be specified")
-
-      val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
-      if (actions != 1)
-        CommandLineUtils.printUsageAndExit(parser, "Command must include 
exactly one action: --list, --add, --remove. ")
-
-      CommandLineUtils.checkInvalidArgs(parser, options, listOpt, producerOpt, 
consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt)
-
-      //when --producer or --consumer is specified , user should not specify 
operations as they are inferred and we also disallow --deny-principals and 
--deny-hosts.
-      CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, 
operationsOpt, denyPrincipalsOpt, denyHostsOpt)
-      CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, 
operationsOpt, denyPrincipalsOpt, denyHostsOpt)
-
-      if (options.has(listPrincipalsOpt) && !options.has(listOpt))
-        CommandLineUtils.printUsageAndExit(parser, "The --principal option is 
only available if --list is set")
-
-      if (options.has(producerOpt) && !options.has(topicOpt))
-        CommandLineUtils.printUsageAndExit(parser, "With --producer you must 
specify a --topic")
-
-      if (options.has(idempotentOpt) && !options.has(producerOpt))
-        CommandLineUtils.printUsageAndExit(parser, "The --idempotent option is 
only available if --producer is set")
-
-      if (options.has(consumerOpt) && (!options.has(topicOpt) || 
!options.has(groupOpt) || (!options.has(producerOpt) && 
(options.has(clusterOpt) || options.has(transactionalIdOpt)))))
-        CommandLineUtils.printUsageAndExit(parser, "With --consumer you must 
specify a --topic and a --group and no --cluster or --transactional-id option 
should be specified.")
-    }
-  }
-}
-
-class PatternTypeConverter extends 
EnumConverter[PatternType](classOf[PatternType]) {
-
-  override def convert(value: String): PatternType = {
-    val patternType = super.convert(value)
-    if (patternType.isUnknown)
-      throw new ValueConversionException("Unknown resource-pattern-type: " + 
value)
-
-    patternType
-  }
-
-  override def valuePattern: String = PatternType.values
-    .filter(_ != PatternType.UNKNOWN)
-    .mkString("|")
-}
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 9560f060d0f..fe9336a04f4 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -58,7 +58,7 @@ import scala.jdk.CollectionConverters._
   * brokers.
   *
   * To start brokers we need to set a cluster ACL, which happens optionally in 
KafkaServerTestHarness.
-  * The remaining ACLs to enable access to producers and consumers are set 
here. To set ACLs, we use AclCommand directly.
+  * The remaining ACLs to enable access to producers and consumers are set 
here.
   *
   * Finally, we rely on SaslSetup to bootstrap and setup Kerberos. We don't use
   * SaslTestHarness here directly because it extends QuorumTestHarness, and we
diff --git a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
new file mode 100644
index 00000000000..d54970ad042
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.security.authorizer.AclEntry;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.ValueConversionException;
+import joptsimple.util.EnumConverter;
+
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.acl.AclPermissionType.DENY;
+
+public class AclCommand {
+
+    private static final ResourcePatternFilter CLUSTER_RESOURCE_FILTER =
+            new ResourcePatternFilter(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL);
+    private static final String NL = System.lineSeparator();
+
+    public static void main(String[] args) {
+        AclCommandOptions opts = new AclCommandOptions(args);
+        AdminClientService aclCommandService = new AdminClientService(opts);
+        try (Admin admin = Admin.create(adminConfigs(opts))) {
+            if (opts.options.has(opts.addOpt)) {
+                aclCommandService.addAcls(admin);
+            } else if (opts.options.has(opts.removeOpt)) {
+                aclCommandService.removeAcls(admin);
+            } else if (opts.options.has(opts.listOpt)) {
+                aclCommandService.listAcls(admin);
+            }
+        } catch (Throwable e) {
+            System.out.println("Error while executing ACL command: " + 
e.getMessage());
+            System.out.println(Utils.stackTrace(e));
+            Exit.exit(1);
+        }
+    }
+
+    private static Properties adminConfigs(AclCommandOptions opts) throws 
IOException {
+        Properties props = new Properties();
+        if (opts.options.has(opts.commandConfigOpt)) {
+            props = 
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
+        }
+        if (opts.options.has(opts.bootstrapServerOpt)) {
+            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt));
+        } else {
+            props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, 
opts.options.valueOf(opts.bootstrapControllerOpt));
+        }
+        return props;
+    }
+
+    private static class AdminClientService {
+
+        private final AclCommandOptions opts;
+
+        AdminClientService(AclCommandOptions opts) {
+            this.opts = opts;
+        }
+
+        void addAcls(Admin admin) throws ExecutionException, 
InterruptedException {
+            Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl = 
getResourceToAcls(opts);
+            for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry : 
resourceToAcl.entrySet()) {
+                ResourcePattern resource = entry.getKey();
+                Set<AccessControlEntry> acls = entry.getValue();
+                System.out.println("Adding ACLs for resource `" + resource + 
"`: " + NL + " " + acls.stream().map(a -> "\t" + 
a).collect(Collectors.joining(NL)) + NL);
+                Collection<AclBinding> aclBindings = acls.stream().map(acl -> 
new AclBinding(resource, acl)).collect(Collectors.toList());
+                admin.createAcls(aclBindings).all().get();
+            }
+        }
+
+        void removeAcls(Admin admin) throws ExecutionException, 
InterruptedException {
+            Map<ResourcePatternFilter, Set<AccessControlEntry>> filterToAcl = 
getResourceFilterToAcls(opts);
+            for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>> 
entry : filterToAcl.entrySet()) {
+                ResourcePatternFilter filter = entry.getKey();
+                Set<AccessControlEntry> acls = entry.getValue();
+                if (acls.isEmpty()) {
+                    if (confirmAction(opts, "Are you sure you want to delete 
all ACLs for resource filter `" + filter + "`? (y/n)")) {
+                        removeAcls(admin, acls, filter);
+                    }
+                } else {
+                    String msg = "Are you sure you want to remove ACLs: " + NL 
+
+                            " " + acls.stream().map(a -> "\t" + 
a).collect(Collectors.joining(NL)) + NL +
+                            " from resource filter `" + filter + "`? (y/n)";
+                    if (confirmAction(opts, msg)) {
+                        removeAcls(admin, acls, filter);
+                    }
+                }
+            }
+        }
+
+        private void listAcls(Admin admin) throws ExecutionException, 
InterruptedException {
+            Set<ResourcePatternFilter> filters = getResourceFilter(opts, 
false);
+            Set<KafkaPrincipal> listPrincipals = getPrincipals(opts, 
opts.listPrincipalsOpt);
+            Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = 
getAcls(admin, filters);
+
+            if (listPrincipals.isEmpty()) {
+                printResourceAcls(resourceToAcls);
+            } else {
+                listPrincipals.forEach(principal -> {
+                    System.out.println("ACLs for principal `" + principal + 
"`");
+                    Map<ResourcePattern, Set<AccessControlEntry>> 
filteredResourceToAcls = resourceToAcls.entrySet().stream()
+                            .map(entry -> {
+                                ResourcePattern resource = entry.getKey();
+                                Set<AccessControlEntry> acls = 
entry.getValue().stream()
+                                        .filter(acl -> 
principal.toString().equals(acl.principal()))
+                                        .collect(Collectors.toSet());
+                                return new AbstractMap.SimpleEntry<>(resource, 
acls);
+                            })
+                            .filter(entry -> !entry.getValue().isEmpty())
+                            .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+                    printResourceAcls(filteredResourceToAcls);
+                });
+            }
+        }
+
+        private static void printResourceAcls(Map<ResourcePattern, 
Set<AccessControlEntry>> resourceToAcls) {
+            resourceToAcls.forEach((resource, acls) ->
+                System.out.println("Current ACLs for resource `" + resource + 
"`:" + NL +
+                        acls.stream().map(acl -> "\t" + 
acl).collect(Collectors.joining(NL)) + NL)
+            );
+        }
+
+        private static void removeAcls(Admin adminClient, 
Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws 
ExecutionException, InterruptedException {
+            if (acls.isEmpty()) {
+                adminClient.deleteAcls(Collections.singletonList(new 
AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
+            } else {
+                List<AclBindingFilter> aclBindingFilters = 
acls.stream().map(acl -> new AclBindingFilter(filter, 
acl.toFilter())).collect(Collectors.toList());
+                adminClient.deleteAcls(aclBindingFilters).all().get();
+            }
+        }
+
+        private Map<ResourcePattern, Set<AccessControlEntry>> getAcls(Admin 
adminClient, Set<ResourcePatternFilter> filters) throws ExecutionException, 
InterruptedException {
+            Collection<AclBinding> aclBindings;
+            if (filters.isEmpty()) {
+                aclBindings = 
adminClient.describeAcls(AclBindingFilter.ANY).values().get();
+            } else {
+                aclBindings = new ArrayList<>();
+                for (ResourcePatternFilter filter : filters) {
+                    aclBindings.addAll(adminClient.describeAcls(new 
AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get());
+                }
+            }
+
+            Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = new 
HashMap<>();
+            for (AclBinding aclBinding : aclBindings) {
+                ResourcePattern resource = aclBinding.pattern();
+                Set<AccessControlEntry> acls = 
resourceToAcls.getOrDefault(resource, new HashSet<>());
+                acls.add(aclBinding.entry());
+                resourceToAcls.put(resource, acls);
+            }
+            return resourceToAcls;
+        }
+    }
+
+    private static Map<ResourcePattern, Set<AccessControlEntry>> 
getResourceToAcls(AclCommandOptions opts) {
+        PatternType patternType = 
opts.options.valueOf(opts.resourcePatternType);
+        if (!patternType.isSpecific()) {
+            CommandLineUtils.printUsageAndExit(opts.parser, "A 
'--resource-pattern-type' value of '" + patternType + "' is not valid when 
adding acls.");
+        }
+        Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl = 
getResourceFilterToAcls(opts).entrySet().stream()
+                .collect(Collectors.toMap(entry -> new 
ResourcePattern(entry.getKey().resourceType(), entry.getKey().name(), 
entry.getKey().patternType()),
+                                          Map.Entry::getValue));
+
+        if (resourceToAcl.values().stream().anyMatch(Set::isEmpty)) {
+            CommandLineUtils.printUsageAndExit(opts.parser, "You must specify 
one of: --allow-principal, --deny-principal when trying to add ACLs.");
+        }
+        return resourceToAcl;
+    }
+
+    private static Map<ResourcePatternFilter, Set<AccessControlEntry>> 
getResourceFilterToAcls(AclCommandOptions opts) {
+        Map<ResourcePatternFilter, Set<AccessControlEntry>> resourceToAcls = 
new HashMap<>();
+        //if none of the --producer or --consumer options are specified , just 
construct ACLs from CLI options.
+        if (!opts.options.has(opts.producerOpt) && 
!opts.options.has(opts.consumerOpt)) {
+            resourceToAcls.putAll(getCliResourceFilterToAcls(opts));
+        }
+        //users are allowed to specify both --producer and --consumer options 
in a single command.
+        if (opts.options.has(opts.producerOpt)) {
+            resourceToAcls.putAll(getProducerResourceFilterToAcls(opts));
+        }
+        if (opts.options.has(opts.consumerOpt)) {
+            getConsumerResourceFilterToAcls(opts).forEach((k, v) -> {
+                Set<AccessControlEntry> existingAcls = 
resourceToAcls.getOrDefault(k, new HashSet<>());
+                existingAcls.addAll(v);
+                resourceToAcls.put(k, existingAcls);
+            });
+        }
+        validateOperation(opts, resourceToAcls);
+        return resourceToAcls;
+    }
+
+    private static Map<ResourcePatternFilter, Set<AccessControlEntry>> 
getProducerResourceFilterToAcls(AclCommandOptions opts) {
+        Set<ResourcePatternFilter> filters = getResourceFilter(opts, true);
+
+        Set<ResourcePatternFilter> topics = filters.stream().filter(f -> 
f.resourceType() == ResourceType.TOPIC).collect(Collectors.toSet());
+        Set<ResourcePatternFilter> transactionalIds = 
filters.stream().filter(f -> f.resourceType() == 
ResourceType.TRANSACTIONAL_ID).collect(Collectors.toSet());
+        boolean enableIdempotence = opts.options.has(opts.idempotentOpt);
+
+        Set<AccessControlEntry> topicAcls = getAcl(opts, new 
HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE)));
+        Set<AccessControlEntry> transactionalIdAcls = getAcl(opts, new 
HashSet<>(Arrays.asList(WRITE, DESCRIBE)));
+
+        //Write, Describe, Create permission on topics, Write, Describe on 
transactionalIds
+        Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new 
HashMap<>();
+        for (ResourcePatternFilter topic : topics) {
+            result.put(topic, topicAcls);
+        }
+        for (ResourcePatternFilter transactionalId : transactionalIds) {
+            result.put(transactionalId, transactionalIdAcls);
+        }
+        if (enableIdempotence) {
+            result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts, 
Collections.singleton(IDEMPOTENT_WRITE)));
+        }
+        return result;
+    }
+
+    private static Map<ResourcePatternFilter, Set<AccessControlEntry>> 
getConsumerResourceFilterToAcls(AclCommandOptions opts) {
+        Set<ResourcePatternFilter> filters = getResourceFilter(opts, true);
+        Set<ResourcePatternFilter> topics = filters.stream().filter(f -> 
f.resourceType() == ResourceType.TOPIC).collect(Collectors.toSet());
+        Set<ResourcePatternFilter> groups = filters.stream().filter(f -> 
f.resourceType() == ResourceType.GROUP).collect(Collectors.toSet());
+
+        //Read, Describe on topic, Read on consumerGroup
+        Set<AccessControlEntry> topicAcls = getAcl(opts, new 
HashSet<>(Arrays.asList(READ, DESCRIBE)));
+        Set<AccessControlEntry> groupAcls = getAcl(opts, 
Collections.singleton(READ));
+
+        Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new 
HashMap<>();
+        for (ResourcePatternFilter topic : topics) {
+            result.put(topic, topicAcls);
+        }
+        for (ResourcePatternFilter group : groups) {
+            result.put(group, groupAcls);
+        }
+        return result;
+    }
+
+    private static Map<ResourcePatternFilter, Set<AccessControlEntry>> 
getCliResourceFilterToAcls(AclCommandOptions opts) {
+        Set<AccessControlEntry> acls = getAcl(opts);
+        Set<ResourcePatternFilter> filters = getResourceFilter(opts, true);
+        return filters.stream().collect(Collectors.toMap(filter -> filter, 
filter -> acls));
+    }
+
+    private static Set<AccessControlEntry> getAcl(AclCommandOptions opts, 
Set<AclOperation> operations) {
+        Set<KafkaPrincipal> allowedPrincipals = getPrincipals(opts, 
opts.allowPrincipalsOpt);
+        Set<KafkaPrincipal> deniedPrincipals = getPrincipals(opts, 
opts.denyPrincipalsOpt);
+        Set<String> allowedHosts = getHosts(opts, opts.allowHostsOpt, 
opts.allowPrincipalsOpt);
+        Set<String> deniedHosts = getHosts(opts, opts.denyHostsOpt, 
opts.denyPrincipalsOpt);
+
+        Set<AccessControlEntry> acls = new HashSet<>();
+        if (!allowedHosts.isEmpty() && !allowedPrincipals.isEmpty()) {
+            acls.addAll(getAcls(allowedPrincipals, ALLOW, operations, 
allowedHosts));
+        }
+        if (!deniedHosts.isEmpty() && !deniedPrincipals.isEmpty()) {
+            acls.addAll(getAcls(deniedPrincipals, DENY, operations, 
deniedHosts));
+        }
+        return acls;
+    }
+
+    private static Set<AccessControlEntry> getAcl(AclCommandOptions opts) {
+        Set<AclOperation> operations = 
opts.options.valuesOf(opts.operationsOpt)
+                .stream().map(operation -> 
SecurityUtils.operation(operation.trim()))
+                .collect(Collectors.toSet());
+        return getAcl(opts, operations);
+    }
+
+    static Set<AccessControlEntry> getAcls(Set<KafkaPrincipal> principals,
+                                                   AclPermissionType 
permissionType,
+                                                   Set<AclOperation> 
operations,
+                                                   Set<String> hosts) {
+        Set<AccessControlEntry> acls = new HashSet<>();
+        for (KafkaPrincipal principal : principals) {
+            for (AclOperation operation : operations) {
+                for (String host : hosts) {
+                    acls.add(new AccessControlEntry(principal.toString(), 
host, operation, permissionType));
+                }
+            }
+        }
+        return acls;
+    }
+
+    private static Set<String> getHosts(AclCommandOptions opts, 
OptionSpec<String> hostOptionSpec, OptionSpec<String> principalOptionSpec) {
+        if (opts.options.has(hostOptionSpec)) {
+            return 
opts.options.valuesOf(hostOptionSpec).stream().map(String::trim).collect(Collectors.toSet());
+        } else if (opts.options.has(principalOptionSpec)) {
+            return Collections.singleton(AclEntry.WILDCARD_HOST);
+        } else {
+            return Collections.emptySet();
+        }
+    }
+
+    private static Set<KafkaPrincipal> getPrincipals(AclCommandOptions opts, 
OptionSpec<String> principalOptionSpec) {
+        if (opts.options.has(principalOptionSpec)) {
+            return opts.options.valuesOf(principalOptionSpec).stream()
+                    .map(s -> SecurityUtils.parseKafkaPrincipal(s.trim()))
+                    .collect(Collectors.toSet());
+        } else {
+            return Collections.emptySet();
+        }
+    }
+
+    private static Set<ResourcePatternFilter> 
getResourceFilter(AclCommandOptions opts, boolean dieIfNoResourceFound) {
+        PatternType patternType = 
opts.options.valueOf(opts.resourcePatternType);
+        Set<ResourcePatternFilter> resourceFilters = new HashSet<>();
+        if (opts.options.has(opts.topicOpt)) {
+            opts.options.valuesOf(opts.topicOpt).forEach(topic -> 
resourceFilters.add(new ResourcePatternFilter(ResourceType.TOPIC, topic.trim(), 
patternType)));
+        }
+        if (patternType == PatternType.LITERAL && 
(opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt))) {
+            resourceFilters.add(CLUSTER_RESOURCE_FILTER);
+        }
+        if (opts.options.has(opts.groupOpt)) {
+            opts.options.valuesOf(opts.groupOpt).forEach(group -> 
resourceFilters.add(new ResourcePatternFilter(ResourceType.GROUP, group.trim(), 
patternType)));
+        }
+        if (opts.options.has(opts.transactionalIdOpt)) {
+            
opts.options.valuesOf(opts.transactionalIdOpt).forEach(transactionalId ->
+                    resourceFilters.add(new 
ResourcePatternFilter(ResourceType.TRANSACTIONAL_ID, transactionalId, 
patternType)));
+        }
+        if (opts.options.has(opts.delegationTokenOpt)) {
+            opts.options.valuesOf(opts.delegationTokenOpt).forEach(token -> 
resourceFilters.add(new ResourcePatternFilter(ResourceType.DELEGATION_TOKEN, 
token.trim(), patternType)));
+        }
+        if (opts.options.has(opts.userPrincipalOpt)) {
+            opts.options.valuesOf(opts.userPrincipalOpt).forEach(user -> 
resourceFilters.add(new ResourcePatternFilter(ResourceType.USER, user.trim(), 
patternType)));
+        }
+        if (resourceFilters.isEmpty() && dieIfNoResourceFound) {
+            CommandLineUtils.printUsageAndExit(opts.parser, "You must provide 
at least one resource: --topic <topic> or --cluster or --group <group> or 
--delegation-token <Delegation Token ID>");
+        }
+        return resourceFilters;
+    }
+
+    private static boolean confirmAction(AclCommandOptions opts, String msg) {
+        if (opts.options.has(opts.forceOpt)) {
+            return true;
+        }
+        System.out.println(msg);
+        return System.console().readLine().equalsIgnoreCase("y");
+    }
+
+    private static void validateOperation(AclCommandOptions opts, 
Map<ResourcePatternFilter, Set<AccessControlEntry>> resourceToAcls) {
+        for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>> entry : 
resourceToAcls.entrySet()) {
+            ResourcePatternFilter resource = entry.getKey();
+            Set<AccessControlEntry> acls = entry.getValue();
+            Collection<AclOperation> validOps = new 
HashSet<>(AclEntry.supportedOperations(resource.resourceType()));
+            validOps.add(AclOperation.ALL);
+            Set<AclOperation> unsupportedOps = new HashSet<>();
+            for (AccessControlEntry acl : acls) {
+                if (!validOps.contains(acl.operation())) {
+                    unsupportedOps.add(acl.operation());
+                }
+            }
+            if (!unsupportedOps.isEmpty()) {
+                String msg = String.format("ResourceType %s only supports 
operations %s", resource.resourceType(), validOps);
+                CommandLineUtils.printUsageAndExit(opts.parser, msg);
+            }
+        }
+    }
+
+    public static class AclCommandOptions extends CommandDefaultOptions {
+
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> bootstrapControllerOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpecBuilder clusterOpt;
+        private final OptionSpec<String> groupOpt;
+        private final OptionSpec<String> transactionalIdOpt;
+        private final OptionSpecBuilder idempotentOpt;
+        private final OptionSpec<String> delegationTokenOpt;
+        private final OptionSpec<PatternType> resourcePatternType;
+        private final OptionSpecBuilder addOpt;
+        private final OptionSpecBuilder removeOpt;
+        private final OptionSpecBuilder listOpt;
+        private final OptionSpec<String> operationsOpt;
+        private final OptionSpec<String> allowPrincipalsOpt;
+        private final OptionSpec<String> denyPrincipalsOpt;
+        private final OptionSpec<String> listPrincipalsOpt;
+        private final OptionSpec<String> allowHostsOpt;
+        private final OptionSpec<String> denyHostsOpt;
+        private final OptionSpecBuilder producerOpt;
+        private final OptionSpecBuilder consumerOpt;
+        private final OptionSpecBuilder forceOpt;
+        private final OptionSpec<String> userPrincipalOpt;
+
+        @SuppressWarnings("this-escape")
+        public AclCommandOptions(String[] args) {
+            super(args);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of 
host/port pairs to use for establishing the connection to the Kafka cluster." +
+                            " This list should be in the form 
host1:port1,host2:port2,... This config is required for acl management using 
admin client API.")
+                    .withRequiredArg()
+                    .describedAs("server to connect to")
+                    .ofType(String.class);
+            bootstrapControllerOpt = parser.accepts("bootstrap-controller", "A 
list of host/port pairs to use for establishing the connection to the Kafka 
cluster." +
+                            " This list should be in the form 
host1:port1,host2:port2,... This config is required for acl management using 
admin client API.")
+                    .withRequiredArg()
+                    .describedAs("controller to connect to")
+                    .ofType(String.class);
+            commandConfigOpt = parser.accepts("command-config", "A property 
file containing configs to be passed to Admin Client.")
+                    .withOptionalArg()
+                    .describedAs("command-config")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "topic to which ACLs should be 
added or removed. " +
+                            "A value of '*' indicates ACL should apply to all 
topics.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            clusterOpt = parser.accepts("cluster", "Add/Remove cluster ACLs.");
+            groupOpt = parser.accepts("group", "Consumer Group to which the 
ACLs should be added or removed. " +
+                            "A value of '*' indicates the ACLs should apply to 
all groups.")
+                    .withRequiredArg()
+                    .describedAs("group")
+                    .ofType(String.class);
+            transactionalIdOpt = parser.accepts("transactional-id", "The 
transactionalId to which ACLs should " +
+                            "be added or removed. A value of '*' indicates the 
ACLs should apply to all transactionalIds.")
+                    .withRequiredArg()
+                    .describedAs("transactional-id")
+                    .ofType(String.class);
+            idempotentOpt = parser.accepts("idempotent", "Enable idempotence 
for the producer. This should be " +
+                    "used in combination with the --producer option. Note that 
idempotence is enabled automatically if " +
+                    "the producer is authorized to a particular 
transactional-id.");
+            delegationTokenOpt = parser.accepts("delegation-token", 
"Delegation token to which ACLs should be added or removed. " +
+                            "A value of '*' indicates ACL should apply to all 
tokens.")
+                    .withRequiredArg()
+                    .describedAs("delegation-token")
+                    .ofType(String.class);
+            resourcePatternType = parser.accepts("resource-pattern-type", "The 
type of the resource pattern or pattern filter. " +
+                            "When adding acls, this should be a specific 
pattern type, e.g. 'literal' or 'prefixed'. " +
+                            "When listing or removing acls, a specific pattern 
type can be used to list or remove acls from specific resource patterns, " +
+                            "or use the filter values of 'any' or 'match', 
where 'any' will match any pattern type, but will match the resource name 
exactly, " +
+                            "where as 'match' will perform pattern matching to 
list or remove all acls that affect the supplied resource(s). " +
+                            "WARNING: 'match', when used in combination with 
the '--remove' switch, should be used with care.")
+                    .withRequiredArg()
+                    .ofType(String.class)
+                    .withValuesConvertedBy(new PatternTypeConverter())
+                    .defaultsTo(PatternType.LITERAL);
+            addOpt = parser.accepts("add", "Indicates you are trying to add 
ACLs.");
+            removeOpt = parser.accepts("remove", "Indicates you are trying to 
remove ACLs.");
+            listOpt = parser.accepts("list", "List ACLs for the specified 
resource, use --topic <topic> or --group <group> or --cluster to specify a 
resource.");
+            operationsOpt = parser.accepts("operation", "Operation that is 
being allowed or denied. Valid operation names are: " + NL +
+                            AclEntry.ACL_OPERATIONS.stream().map(o -> "\t" + 
SecurityUtils.operationName(o)).collect(Collectors.joining(NL)) + NL)
+                    .withRequiredArg()
+                    .ofType(String.class)
+                    .defaultsTo(SecurityUtils.operationName(AclOperation.ALL));
+            allowPrincipalsOpt = parser.accepts("allow-principal", "principal 
is in principalType:name format." +
+                            " Note that principalType must be supported by the 
Authorizer being used." +
+                            " For example, User:'*' is the wild card 
indicating all users.")
+                    .withRequiredArg()
+                    .describedAs("allow-principal")
+                    .ofType(String.class);
+            denyPrincipalsOpt = parser.accepts("deny-principal", "principal is 
in principalType:name format. " +
+                            "By default anyone not added through 
--allow-principal is denied access. " +
+                            "You only need to use this option as negation to 
already allowed set. " +
+                            "Note that principalType must be supported by the 
Authorizer being used. " +
+                            "For example if you wanted to allow access to all 
users in the system but not test-user you can define an ACL that " +
+                            "allows access to User:'*' and specify 
--deny-principal=User:[email protected]. " +
+                            "AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE 
OVER ALLOW RULES.")
+                    .withRequiredArg()
+                    .describedAs("deny-principal")
+                    .ofType(String.class);
+            listPrincipalsOpt = parser.accepts("principal", "List ACLs for the 
specified principal. principal is in principalType:name format." +
+                            " Note that principalType must be supported by the 
Authorizer being used. Multiple --principal option can be passed.")
+                    .withOptionalArg()
+                    .describedAs("principal")
+                    .ofType(String.class);
+            allowHostsOpt = parser.accepts("allow-host", "Host from which 
principals listed in --allow-principal will have access. " +
+                            "If you have specified --allow-principal then the 
default for this option will be set to '*' which allows access from all hosts.")
+                    .withRequiredArg()
+                    .describedAs("allow-host")
+                    .ofType(String.class);
+            denyHostsOpt = parser.accepts("deny-host", "Host from which 
principals listed in --deny-principal will be denied access. " +
+                            "If you have specified --deny-principal then the 
default for this option will be set to '*' which denies access from all hosts.")
+                    .withRequiredArg()
+                    .describedAs("deny-host")
+                    .ofType(String.class);
+            producerOpt = parser.accepts("producer", "Convenience option to 
add/remove ACLs for producer role. " +
+                    "This will generate ACLs that allows WRITE,DESCRIBE and 
CREATE on topic.");
+            consumerOpt = parser.accepts("consumer", "Convenience option to 
add/remove ACLs for consumer role. " +
+                    "This will generate ACLs that allows READ,DESCRIBE on 
topic and READ on group.");
+            forceOpt = parser.accepts("force", "Assume Yes to all queries and 
do not prompt.");
+            userPrincipalOpt = parser.accepts("user-principal", "Specifies a 
user principal as a resource in relation with the operation. For instance " +
+                            "one could grant CreateTokens or DescribeTokens 
permission on a given user principal.")
+                    .withRequiredArg()
+                    .describedAs("user-principal")
+                    .ofType(String.class);
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+            checkArgs();
+        }
+
+        void checkArgs() {
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
manage acls on kafka.");
+
+            if (options.has(bootstrapServerOpt) && 
options.has(bootstrapControllerOpt)) {
+                CommandLineUtils.printUsageAndExit(parser, "Only one of 
--bootstrap-server or --bootstrap-controller must be specified");
+            }
+            if (!options.has(bootstrapServerOpt) && 
!options.has(bootstrapControllerOpt)) {
+                CommandLineUtils.printUsageAndExit(parser, "One of 
--bootstrap-server or --bootstrap-controller must be specified");
+            }
+            List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = 
Arrays.asList(addOpt, removeOpt, listOpt);
+            long mutuallyExclusiveOptionsCount = 
mutuallyExclusiveOptions.stream()
+                    .filter(abstractOptionSpec -> 
options.has(abstractOptionSpec))
+                    .count();
+            if (mutuallyExclusiveOptionsCount != 1) {
+                CommandLineUtils.printUsageAndExit(parser, "Command must 
include exactly one action: --list, --add, --remove. ");
+            }
+            CommandLineUtils.checkInvalidArgs(parser, options, listOpt, 
producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, 
denyPrincipalsOpt);
+
+            //when --producer or --consumer is specified , user should not 
specify operations as they are inferred and we also disallow --deny-principals 
and --deny-hosts.
+            CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, 
operationsOpt, denyPrincipalsOpt, denyHostsOpt);
+            CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, 
operationsOpt, denyPrincipalsOpt, denyHostsOpt);
+
+            if (options.has(listPrincipalsOpt) && !options.has(listOpt)) {
+                CommandLineUtils.printUsageAndExit(parser, "The --principal 
option is only available if --list is set");
+            }
+            if (options.has(producerOpt) && !options.has(topicOpt)) {
+                CommandLineUtils.printUsageAndExit(parser, "With --producer 
you must specify a --topic");
+            }
+            if (options.has(idempotentOpt) && !options.has(producerOpt)) {
+                CommandLineUtils.printUsageAndExit(parser, "The --idempotent 
option is only available if --producer is set");
+            }
+            if (options.has(consumerOpt) && (!options.has(topicOpt) || 
!options.has(groupOpt) || (!options.has(producerOpt) && 
(options.has(clusterOpt) || options.has(transactionalIdOpt))))) {
+                CommandLineUtils.printUsageAndExit(parser, "With --consumer 
you must specify a --topic and a --group and no --cluster or --transactional-id 
option should be specified.");
+            }
+        }
+    }
+
+    static class PatternTypeConverter extends EnumConverter<PatternType> {
+
+        PatternTypeConverter() {
+            super(PatternType.class);
+        }
+
+        @Override
+        public PatternType convert(String value) {
+            PatternType patternType = super.convert(value);
+            if (patternType.isUnknown())
+                throw new ValueConversionException("Unknown 
resource-pattern-type: " + value);
+
+            return patternType;
+        }
+
+        @Override
+        public String valuePattern() {
+            List<PatternType> values = Arrays.asList(PatternType.values());
+            List<PatternType> filteredValues = values.stream()
+                    .filter(type -> type != PatternType.UNKNOWN)
+                    .collect(Collectors.toList());
+            return filteredValues.stream()
+                    .map(Object::toString)
+                    .collect(Collectors.joining("|"));
+        }
+    }
+}
diff --git a/core/src/test/java/kafka/admin/AclCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java
similarity index 64%
rename from core/src/test/java/kafka/admin/AclCommandTest.java
rename to tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java
index e71c348c272..12f4dab8801 100644
--- a/core/src/test/java/kafka/admin/AclCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java
@@ -14,9 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.admin;
-
-import kafka.admin.AclCommand.AclCommandOptions;
+package org.apache.kafka.tools;
 
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBindingFilter;
@@ -44,15 +42,9 @@ import org.apache.logging.log4j.Level;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.PrintStream;
-import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -63,9 +55,6 @@ import java.util.stream.Collectors;
 
 import javax.management.InstanceAlreadyExistsException;
 
-import scala.Console;
-import scala.jdk.javaapi.CollectionConverters;
-
 import static org.apache.kafka.common.acl.AccessControlEntryFilter.ANY;
 import static org.apache.kafka.common.acl.AclOperation.ALTER;
 import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
@@ -120,102 +109,98 @@ public class AclCommandTest {
     private static final String TOPIC = "--topic";
     private static final String RESOURCE_PATTERN_TYPE = 
"--resource-pattern-type";
     private static final KafkaPrincipal PRINCIPAL = 
SecurityUtils.parseKafkaPrincipal("User:test2");
-    private static final Set<KafkaPrincipal> USERS = new 
HashSet<>(Arrays.asList(
-            
SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
-            PRINCIPAL,
-            SecurityUtils.parseKafkaPrincipal("User:CN=\\#User with special 
chars in CN : (\\, \\+ \" \\ \\< \\> \\; ')")
-    ));
-    private static final Set<String> HOSTS = new 
HashSet<>(Arrays.asList("host1", "host2"));
-    private static final List<String> ALLOW_HOST_COMMAND = 
Arrays.asList("--allow-host", "host1", "--allow-host", "host2");
-    private static final List<String> DENY_HOST_COMMAND = 
Arrays.asList("--deny-host", "host1", "--deny-host", "host2");
+    private static final Set<KafkaPrincipal> USERS = Set.of(
+        
SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
+        PRINCIPAL,
+        SecurityUtils.parseKafkaPrincipal("User:CN=\\#User with special chars 
in CN : (\\, \\+ \" \\ \\< \\> \\; ')")
+    );
+    private static final Set<String> HOSTS = Set.of("host1", "host2");
+    private static final List<String> ALLOW_HOST_COMMAND = 
List.of("--allow-host", "host1", "--allow-host", "host2");
+    private static final List<String> DENY_HOST_COMMAND = 
List.of("--deny-host", "host1", "--deny-host", "host2");
 
     private static final ResourcePattern CLUSTER_RESOURCE = new 
ResourcePattern(CLUSTER, Resource.CLUSTER_NAME, LITERAL);
-    private static final Set<ResourcePattern> TOPIC_RESOURCES = new 
HashSet<>(Arrays.asList(
-            new ResourcePattern(ResourceType.TOPIC, "test-1", LITERAL),
-            new ResourcePattern(ResourceType.TOPIC, "test-2", LITERAL)
-    ));
-    private static final Set<ResourcePattern> GROUP_RESOURCES = new 
HashSet<>(Arrays.asList(
-            new ResourcePattern(ResourceType.GROUP, "testGroup-1", LITERAL),
-            new ResourcePattern(ResourceType.GROUP, "testGroup-2", LITERAL)
-    ));
-    private static final Set<ResourcePattern> TRANSACTIONAL_ID_RESOURCES = new 
HashSet<>(Arrays.asList(
+    private static final Set<ResourcePattern> TOPIC_RESOURCES = Set.of(
+        new ResourcePattern(ResourceType.TOPIC, "test-1", LITERAL),
+        new ResourcePattern(ResourceType.TOPIC, "test-2", LITERAL)
+    );
+    private static final Set<ResourcePattern> GROUP_RESOURCES = Set.of(
+        new ResourcePattern(ResourceType.GROUP, "testGroup-1", LITERAL),
+        new ResourcePattern(ResourceType.GROUP, "testGroup-2", LITERAL)
+    );
+    private static final Set<ResourcePattern> TRANSACTIONAL_ID_RESOURCES = 
Set.of(
             new ResourcePattern(TRANSACTIONAL_ID, "t0", LITERAL),
             new ResourcePattern(TRANSACTIONAL_ID, "t1", LITERAL)
-    ));
-    private static final Set<ResourcePattern> TOKEN_RESOURCES = new 
HashSet<>(Arrays.asList(
-            new ResourcePattern(DELEGATION_TOKEN, "token1", LITERAL),
-            new ResourcePattern(DELEGATION_TOKEN, "token2", LITERAL)
-    ));
-    private static final Set<ResourcePattern> USER_RESOURCES = new 
HashSet<>(Arrays.asList(
-            new ResourcePattern(USER, "User:test-user1", LITERAL),
-            new ResourcePattern(USER, "User:test-user2", LITERAL)
-    ));
-
-    private static final Map<Set<ResourcePattern>, List<String>> 
RESOURCE_TO_COMMAND = new HashMap<Set<ResourcePattern>, List<String>>() {{
-            put(TOPIC_RESOURCES, Arrays.asList(TOPIC, "test-1", TOPIC, 
"test-2"));
-            put(Collections.singleton(CLUSTER_RESOURCE), 
Collections.singletonList("--cluster"));
-            put(GROUP_RESOURCES, Arrays.asList(GROUP, "testGroup-1", GROUP, 
"testGroup-2"));
-            put(TRANSACTIONAL_ID_RESOURCES, 
Arrays.asList("--transactional-id", "t0", "--transactional-id", "t1"));
-            put(TOKEN_RESOURCES, Arrays.asList("--delegation-token", "token1", 
"--delegation-token", "token2"));
-            put(USER_RESOURCES, Arrays.asList("--user-principal", 
"User:test-user1", "--user-principal", "User:test-user2"));
-        }};
-
-    private static final Map<Set<ResourcePattern>, 
Map.Entry<Set<AclOperation>, List<String>>> RESOURCE_TO_OPERATIONS =
-            new HashMap<Set<ResourcePattern>, Map.Entry<Set<AclOperation>, 
List<String>>>() {{
-                put(TOPIC_RESOURCES, new SimpleImmutableEntry<>(
-                        new HashSet<>(Arrays.asList(READ, WRITE, CREATE, 
DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS, ALTER)),
-                        Arrays.asList(OPERATION, "Read", OPERATION, "Write", 
OPERATION, "Create",
-                                OPERATION, "Describe", OPERATION, "Delete", 
OPERATION, "DescribeConfigs",
-                                OPERATION, "AlterConfigs", OPERATION, "Alter"))
-                );
-                put(Collections.singleton(CLUSTER_RESOURCE), new 
SimpleImmutableEntry<>(
-                        new HashSet<>(Arrays.asList(CREATE, CLUSTER_ACTION, 
DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE)),
-                        Arrays.asList(OPERATION, "Create", OPERATION, 
"ClusterAction", OPERATION, "DescribeConfigs",
-                                OPERATION, "AlterConfigs", OPERATION, 
"IdempotentWrite", OPERATION, "Alter", OPERATION, "Describe"))
-                );
-                put(GROUP_RESOURCES, new SimpleImmutableEntry<>(
-                        new HashSet<>(Arrays.asList(READ, DESCRIBE, DELETE)),
-                        Arrays.asList(OPERATION, "Read", OPERATION, 
"Describe", OPERATION, "Delete"))
-                );
-                put(TRANSACTIONAL_ID_RESOURCES, new SimpleImmutableEntry<>(
-                        new HashSet<>(Arrays.asList(DESCRIBE, WRITE)),
-                        Arrays.asList(OPERATION, "Describe", OPERATION, 
"Write"))
-                );
-                put(TOKEN_RESOURCES, new 
SimpleImmutableEntry<>(Collections.singleton(DESCRIBE), 
Arrays.asList(OPERATION, "Describe")));
-                put(USER_RESOURCES, new SimpleImmutableEntry<>(
-                        new HashSet<>(Arrays.asList(CREATE_TOKENS, 
DESCRIBE_TOKENS)),
-                        Arrays.asList(OPERATION, "CreateTokens", OPERATION, 
"DescribeTokens"))
-                );
-            }};
-
-    private static final Map<Set<ResourcePattern>, Set<AccessControlEntry>> 
CONSUMER_RESOURCE_TO_ACLS =
-            new HashMap<Set<ResourcePattern>, Set<AccessControlEntry>>() {{
-                put(TOPIC_RESOURCES, 
asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW,
-                        asScalaSet(new HashSet<>(Arrays.asList(READ, 
DESCRIBE))), asScalaSet(HOSTS))));
-                put(GROUP_RESOURCES, 
asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW,
-                        asScalaSet(Collections.singleton(READ)), 
asScalaSet(HOSTS))));
-            }};
-
-    private static final Map<List<String>, Map<Set<ResourcePattern>, 
Set<AccessControlEntry>>> CMD_TO_RESOURCES_TO_ACL =
-            new HashMap<List<String>, Map<Set<ResourcePattern>, 
Set<AccessControlEntry>>>() {{
-                put(Collections.singletonList(PRODUCER), 
producerResourceToAcls(false));
-                put(Arrays.asList(PRODUCER, IDEMPOTENT), 
producerResourceToAcls(true));
-                put(Collections.singletonList(CONSUMER), 
CONSUMER_RESOURCE_TO_ACLS);
-                put(Arrays.asList(PRODUCER, CONSUMER),
-                        
CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().map(entry -> {
-                            Set<AccessControlEntry> value = new 
HashSet<>(entry.getValue());
-                            value.addAll(producerResourceToAcls(false)
-                                    .getOrDefault(entry.getKey(), 
Collections.emptySet()));
-                            return new SimpleEntry<>(entry.getKey(), value);
-                        }).collect(Collectors.toMap(Entry::getKey, 
Entry::getValue)));
-                put(Arrays.asList(PRODUCER, IDEMPOTENT, CONSUMER),
-                        
CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().map(entry -> {
-                            Set<AccessControlEntry> value = new 
HashSet<>(entry.getValue());
-                            value.addAll(producerResourceToAcls(true)
-                                    .getOrDefault(entry.getKey(), 
Collections.emptySet()));
-                            return new SimpleEntry<>(entry.getKey(), value);
-                        }).collect(Collectors.toMap(Entry::getKey, 
Entry::getValue)));
-            }};
+    );
+    private static final Set<ResourcePattern> TOKEN_RESOURCES = Set.of(
+        new ResourcePattern(DELEGATION_TOKEN, "token1", LITERAL),
+        new ResourcePattern(DELEGATION_TOKEN, "token2", LITERAL)
+    );
+    private static final Set<ResourcePattern> USER_RESOURCES = Set.of(
+        new ResourcePattern(USER, "User:test-user1", LITERAL),
+        new ResourcePattern(USER, "User:test-user2", LITERAL)
+    );
+
+    private static final Map<Set<ResourcePattern>, List<String>> 
RESOURCE_TO_COMMAND = Map.of(
+        TOPIC_RESOURCES, List.of(TOPIC, "test-1", TOPIC, "test-2"),
+        Set.of(CLUSTER_RESOURCE), List.of("--cluster"),
+        GROUP_RESOURCES, List.of(GROUP, "testGroup-1", GROUP, "testGroup-2"),
+        TRANSACTIONAL_ID_RESOURCES, List.of("--transactional-id", "t0", 
"--transactional-id", "t1"),
+        TOKEN_RESOURCES, List.of("--delegation-token", "token1", 
"--delegation-token", "token2"),
+        USER_RESOURCES, List.of("--user-principal", "User:test-user1", 
"--user-principal", "User:test-user2")
+    );
+
+    private static final Map<Set<ResourcePattern>, 
Map.Entry<Set<AclOperation>, List<String>>> RESOURCE_TO_OPERATIONS = Map.of(
+        TOPIC_RESOURCES, Map.entry(
+            Set.of(READ, WRITE, CREATE, DESCRIBE, DELETE, DESCRIBE_CONFIGS, 
ALTER_CONFIGS, ALTER),
+            List.of(OPERATION, "Read", OPERATION, "Write", OPERATION, "Create",
+                    OPERATION, "Describe", OPERATION, "Delete", OPERATION, 
"DescribeConfigs",
+                    OPERATION, "AlterConfigs", OPERATION, "Alter")),
+        Set.of(CLUSTER_RESOURCE), Map.entry(
+            Set.of(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, 
IDEMPOTENT_WRITE, ALTER, DESCRIBE),
+            List.of(OPERATION, "Create", OPERATION, "ClusterAction", 
OPERATION, "DescribeConfigs",
+                    OPERATION, "AlterConfigs", OPERATION, "IdempotentWrite", 
OPERATION, "Alter", OPERATION, "Describe")),
+        GROUP_RESOURCES, Map.entry(
+            Set.of(READ, DESCRIBE, DELETE),
+            List.of(OPERATION, "Read", OPERATION, "Describe", OPERATION, 
"Delete")),
+        TRANSACTIONAL_ID_RESOURCES, Map.entry(
+            Set.of(DESCRIBE, WRITE),
+            List.of(OPERATION, "Describe", OPERATION, "Write")),
+        TOKEN_RESOURCES, Map.entry(
+            Set.of(DESCRIBE),
+            List.of(OPERATION, "Describe")),
+        USER_RESOURCES, Map.entry(
+            Set.of(CREATE_TOKENS, DESCRIBE_TOKENS),
+            List.of(OPERATION, "CreateTokens", OPERATION, "DescribeTokens"))
+    );
+
+    private static final Map<Set<ResourcePattern>, Set<AccessControlEntry>> 
CONSUMER_RESOURCE_TO_ACLS = Map.of(
+        TOPIC_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(READ, 
DESCRIBE), HOSTS),
+        GROUP_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(READ), HOSTS)
+    );
+
+    private static final Map<List<String>, Map<Set<ResourcePattern>, 
Set<AccessControlEntry>>> CMD_TO_RESOURCES_TO_ACL = Map.of(
+        List.of(PRODUCER), producerResourceToAcls(false),
+        List.of(PRODUCER, IDEMPOTENT), producerResourceToAcls(true),
+        List.of(CONSUMER), CONSUMER_RESOURCE_TO_ACLS,
+        List.of(PRODUCER, CONSUMER),
+            
CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().collect(Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> {
+                    Set<AccessControlEntry> value = new 
HashSet<>(entry.getValue());
+                    
value.addAll(producerResourceToAcls(false).getOrDefault(entry.getKey(), 
Set.of()));
+                    return value;
+                }
+            )),
+        List.of(PRODUCER, IDEMPOTENT, CONSUMER),
+            
CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().collect(Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> {
+                    Set<AccessControlEntry> value = new 
HashSet<>(entry.getValue());
+                    
value.addAll(producerResourceToAcls(true).getOrDefault(entry.getKey(), 
Set.of()));
+                    return value;
+                }
+            ))
+    );
 
     @ClusterTest
     public void testAclCliWithAdminAPI(ClusterInstance cluster) throws 
InterruptedException {
@@ -295,7 +280,7 @@ public class AclCommandTest {
     @Test
     public void testUseBootstrapServerOptWithBootstrapControllerOpt() {
         assertInitializeInvalidOptionsExitCodeAndMsg(
-                Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, 
BOOTSTRAP_CONTROLLER, LOCALHOST),
+                List.of(BOOTSTRAP_SERVER, LOCALHOST, BOOTSTRAP_CONTROLLER, 
LOCALHOST),
                 "Only one of --bootstrap-server or --bootstrap-controller must 
be specified"
         );
     }
@@ -303,7 +288,7 @@ public class AclCommandTest {
     @Test
     public void testUseWithoutBootstrapServerOptAndBootstrapControllerOpt() {
         assertInitializeInvalidOptionsExitCodeAndMsg(
-                Collections.emptyList(),
+                List.of(ADD),
                 "One of --bootstrap-server or --bootstrap-controller must be 
specified"
         );
     }
@@ -311,14 +296,14 @@ public class AclCommandTest {
     @Test
     public void testExactlyOneAction() {
         String errMsg = "Command must include exactly one action: --list, 
--add, --remove. ";
-        
assertInitializeInvalidOptionsExitCodeAndMsg(Arrays.asList(BOOTSTRAP_SERVER, 
LOCALHOST, ADD, LIST), errMsg);
-        
assertInitializeInvalidOptionsExitCodeAndMsg(Arrays.asList(BOOTSTRAP_SERVER, 
LOCALHOST, ADD, LIST, REMOVE), errMsg);
+        assertInitializeInvalidOptionsExitCodeAndMsg(List.of(BOOTSTRAP_SERVER, 
LOCALHOST, ADD, LIST), errMsg);
+        assertInitializeInvalidOptionsExitCodeAndMsg(List.of(BOOTSTRAP_SERVER, 
LOCALHOST, ADD, LIST, REMOVE), errMsg);
     }
 
     @Test
     public void testUseListPrincipalsOptWithoutListOpt() {
         assertInitializeInvalidOptionsExitCodeAndMsg(
-                Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, "--principal", 
"User:CN=client"),
+                List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, "--principal", 
"User:CN=client"),
                 "The --principal option is only available if --list is set"
         );
     }
@@ -326,7 +311,7 @@ public class AclCommandTest {
     @Test
     public void testUseProducerOptWithoutTopicOpt() {
         assertInitializeInvalidOptionsExitCodeAndMsg(
-                Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER),
+                List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER),
                 "With --producer you must specify a --topic"
         );
     }
@@ -334,7 +319,7 @@ public class AclCommandTest {
     @Test
     public void testUseIdempotentOptWithoutProducerOpt() {
         assertInitializeInvalidOptionsExitCodeAndMsg(
-                Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, IDEMPOTENT),
+                List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, IDEMPOTENT),
                 "The --idempotent option is only available if --producer is 
set"
         );
     }
@@ -342,24 +327,24 @@ public class AclCommandTest {
     @Test
     public void testUseConsumerOptWithoutRequiredOpt() {
         assertInitializeInvalidOptionsExitCodeAndMsg(
-                Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER),
+                List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER),
                 "With --consumer you must specify a --topic and a --group and 
no --cluster or --transactional-id option should be specified."
         );
-        checkNotThrow(Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, 
CONSUMER, TOPIC, "test-topic", GROUP, "test-group"));
+        checkNotThrow(List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER, 
TOPIC, "test-topic", GROUP, "test-group"));
     }
 
     @Test
     public void testInvalidArgs() {
         assertInitializeInvalidOptionsExitCodeAndMsg(
-                Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, LIST, PRODUCER),
+                List.of(BOOTSTRAP_SERVER, LOCALHOST, LIST, PRODUCER),
                 "Option \"[list]\" can't be used with option \"[producer]\""
         );
         assertInitializeInvalidOptionsExitCodeAndMsg(
-                Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER, 
OPERATION),
+                List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER, OPERATION, 
"all"),
                 "Option \"[producer]\" can't be used with option 
\"[operation]\""
         );
         assertInitializeInvalidOptionsExitCodeAndMsg(
-                Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER, 
OPERATION, TOPIC, "test-topic", GROUP, "test-group"),
+                List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER, OPERATION, 
TOPIC, "test-topic", GROUP, "test-group"),
                 "Option \"[consumer]\" can't be used with option 
\"[operation]\""
         );
     }
@@ -394,8 +379,7 @@ public class AclCommandTest {
     }
 
     private void testAclsOnPrefixedResources(ClusterInstance cluster, 
List<String> cmdArgs) throws InterruptedException {
-        List<String> cmd = Arrays.asList("--allow-principal", 
PRINCIPAL.toString(), PRODUCER, TOPIC, "Test-",
-                RESOURCE_PATTERN_TYPE, "Prefixed");
+        List<String> cmd = List.of("--allow-principal", PRINCIPAL.toString(), 
PRODUCER, TOPIC, "Test-", RESOURCE_PATTERN_TYPE, "Prefixed");
 
         List<String> args = new ArrayList<>(cmdArgs);
         args.addAll(cmd);
@@ -406,7 +390,7 @@ public class AclCommandTest {
         AccessControlEntry describeAcl = new 
AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, DESCRIBE, ALLOW);
         AccessControlEntry createAcl = new 
AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, CREATE, ALLOW);
         cluster.waitAcls(new AclBindingFilter(new 
ResourcePattern(ResourceType.TOPIC, "Test-", PREFIXED).toFilter(), ANY),
-                Arrays.asList(writeAcl, describeAcl, createAcl));
+                List.of(writeAcl, describeAcl, createAcl));
 
         args = new ArrayList<>(cmdArgs);
         args.addAll(cmd);
@@ -415,45 +399,42 @@ public class AclCommandTest {
         callMain(args);
 
         cluster.waitAcls(new AclBindingFilter(new 
ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PREFIXED).toFilter(), 
ANY),
-                Collections.emptySet());
+                Set.of());
         cluster.waitAcls(new AclBindingFilter(new 
ResourcePattern(ResourceType.TOPIC, "Test-", PREFIXED).toFilter(), ANY),
-                Collections.emptySet());
+                Set.of());
     }
 
     private static Map<Set<ResourcePattern>, Set<AccessControlEntry>> 
producerResourceToAcls(boolean enableIdempotence) {
-        Map<Set<ResourcePattern>, Set<AccessControlEntry>> result = new 
HashMap<>();
-        result.put(TOPIC_RESOURCES, 
asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW, asScalaSet(
-                new HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE))), 
asScalaSet(HOSTS))));
-        result.put(TRANSACTIONAL_ID_RESOURCES, 
asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW, asScalaSet(
-                new HashSet<>(Arrays.asList(WRITE, DESCRIBE))), 
asScalaSet(HOSTS))));
-        result.put(Collections.singleton(CLUSTER_RESOURCE), 
asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW,
+        return Map.of(
+            TOPIC_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(WRITE, 
DESCRIBE, CREATE), HOSTS),
+            TRANSACTIONAL_ID_RESOURCES, AclCommand.getAcls(USERS, ALLOW, 
Set.of(WRITE, DESCRIBE), HOSTS),
+            Set.of(CLUSTER_RESOURCE), AclCommand.getAcls(USERS, ALLOW,
                 enableIdempotence
-                        ? asScalaSet(Collections.singleton(IDEMPOTENT_WRITE))
-                        : asScalaSet(Collections.emptySet()), 
asScalaSet(HOSTS))));
-        return result;
+                        ? Set.of(IDEMPOTENT_WRITE)
+                        : Set.of(), HOSTS));
     }
 
     private List<String> adminArgs(String bootstrapServer, Optional<File> 
commandConfig) {
-        List<String> adminArgs = new 
ArrayList<>(Arrays.asList(BOOTSTRAP_SERVER, bootstrapServer));
-        commandConfig.ifPresent(file -> 
adminArgs.addAll(Arrays.asList(COMMAND_CONFIG, file.getAbsolutePath())));
+        List<String> adminArgs = new ArrayList<>(List.of(BOOTSTRAP_SERVER, 
bootstrapServer));
+        commandConfig.ifPresent(file -> 
adminArgs.addAll(List.of(COMMAND_CONFIG, file.getAbsolutePath())));
         return adminArgs;
     }
 
     private List<String> adminArgsWithBootstrapController(String 
bootstrapController, Optional<File> commandConfig) {
-        List<String> adminArgs = new 
ArrayList<>(Arrays.asList(BOOTSTRAP_CONTROLLER, bootstrapController));
-        commandConfig.ifPresent(file -> 
adminArgs.addAll(Arrays.asList(COMMAND_CONFIG, file.getAbsolutePath())));
+        List<String> adminArgs = new ArrayList<>(List.of(BOOTSTRAP_CONTROLLER, 
bootstrapController));
+        commandConfig.ifPresent(file -> 
adminArgs.addAll(List.of(COMMAND_CONFIG, file.getAbsolutePath())));
         return adminArgs;
     }
 
     private Map.Entry<String, String> callMain(List<String> args) {
-        return grabConsoleOutputAndError(() -> 
AclCommand.main(args.toArray(new String[0])));
+        return ToolsTestUtils.grabConsoleOutputAndError(() -> 
AclCommand.main(args.toArray(new String[0])));
     }
 
     private void testAclCli(ClusterInstance cluster, List<String> cmdArgs) 
throws InterruptedException {
         for (Map.Entry<Set<ResourcePattern>, List<String>> entry : 
RESOURCE_TO_COMMAND.entrySet()) {
             Set<ResourcePattern> resources = entry.getKey();
             List<String> resourceCmd = entry.getValue();
-            Set<AclPermissionType> permissionTypes = new 
HashSet<>(Arrays.asList(ALLOW, DENY));
+            Set<AclPermissionType> permissionTypes = Set.of(ALLOW, DENY);
             for (AclPermissionType permissionType : permissionTypes) {
                 Map.Entry<Set<AclOperation>, List<String>> operationToCmd = 
RESOURCE_TO_OPERATIONS.get(resources);
                 Map.Entry<Set<AccessControlEntry>, List<String>> aclToCommand 
= getAclToCommand(permissionType, operationToCmd.getKey());
@@ -494,8 +475,8 @@ public class AclCommandTest {
             String resourceType = resource.resourceType().toString();
 
             List<String> cmd = resource == CLUSTER_RESOURCE
-                    ? Collections.singletonList("kafka-cluster")
-                    : resourceCmd.stream().filter(s -> 
!s.startsWith("--")).collect(Collectors.toList());
+                    ? List.of("kafka-cluster")
+                    : resourceCmd.stream().filter(s -> 
!s.startsWith("--")).toList();
 
             cmd.forEach(name -> {
                 String expected = String.format(
@@ -509,36 +490,37 @@ public class AclCommandTest {
 
     private void testPatternTypes(List<String> cmdArgs) {
         Exit.setExitProcedure((status, message) -> {
-            if ((int) status == 1)
+            if (status == 1)
                 throw new RuntimeException("Exiting command");
             else
                 throw new AssertionError("Unexpected exit with status " + 
status);
         });
         try {
-            
Arrays.stream(PatternType.values()).sequential().forEach(patternType -> {
+            for (PatternType patternType : PatternType.values()) {
                 List<String> addCmd = new ArrayList<>(cmdArgs);
-                addCmd.addAll(Arrays.asList("--allow-principal", 
PRINCIPAL.toString(), PRODUCER, TOPIC, "Test",
+                addCmd.addAll(List.of("--allow-principal", 
PRINCIPAL.toString(), PRODUCER, TOPIC, "Test",
                         ADD, RESOURCE_PATTERN_TYPE, patternType.toString()));
                 verifyPatternType(addCmd, patternType.isSpecific());
 
                 List<String> listCmd = new ArrayList<>(cmdArgs);
-                listCmd.addAll(Arrays.asList(TOPIC, "Test", LIST, 
RESOURCE_PATTERN_TYPE, patternType.toString()));
+                listCmd.addAll(List.of(TOPIC, "Test", LIST, 
RESOURCE_PATTERN_TYPE, patternType.toString()));
                 verifyPatternType(listCmd, patternType != PatternType.UNKNOWN);
 
                 List<String> removeCmd = new ArrayList<>(cmdArgs);
-                removeCmd.addAll(Arrays.asList(TOPIC, "Test", "--force", 
REMOVE, RESOURCE_PATTERN_TYPE, patternType.toString()));
+                removeCmd.addAll(List.of(TOPIC, "Test", "--force", REMOVE, 
RESOURCE_PATTERN_TYPE, patternType.toString()));
                 verifyPatternType(removeCmd, patternType != 
PatternType.UNKNOWN);
-            });
+            }
         } finally {
             Exit.resetExitProcedure();
         }
     }
 
     private void verifyPatternType(List<String> cmd, boolean isValid) {
-        if (isValid)
+        if (isValid) {
             callMain(cmd);
-        else
+        } else {
             assertThrows(RuntimeException.class, () -> callMain(cmd));
+        }
     }
 
     private void testRemove(
@@ -554,7 +536,7 @@ public class AclCommandTest {
         Map.Entry<String, String> out = callMain(args);
         assertEquals("", out.getValue());
         for (ResourcePattern resource : resources) {
-            cluster.waitAcls(new AclBindingFilter(resource.toFilter(), ANY), 
Collections.emptySet());
+            cluster.waitAcls(new AclBindingFilter(resource.toFilter(), ANY), 
Set.of());
         }
     }
 
@@ -562,8 +544,8 @@ public class AclCommandTest {
             AclPermissionType permissionType,
             Set<AclOperation> operations
     ) {
-        return new SimpleImmutableEntry<>(
-                asJavaSet(AclCommand.getAcls(asScalaSet(USERS), 
permissionType, asScalaSet(operations), asScalaSet(HOSTS))),
+        return Map.entry(
+                AclCommand.getAcls(USERS, permissionType, operations, HOSTS),
                 getCmd(permissionType)
         );
     }
@@ -575,44 +557,12 @@ public class AclCommandTest {
         List<String> fullCmd = new ArrayList<>();
         for (KafkaPrincipal user : USERS) {
             fullCmd.addAll(cmd);
-            fullCmd.addAll(Arrays.asList(principalCmd, user.toString()));
+            fullCmd.addAll(List.of(principalCmd, user.toString()));
         }
 
         return fullCmd;
     }
 
-    /**
-     * Capture both the console output and console error during the execution 
of the provided function.
-     */
-    private static Map.Entry<String, String> 
grabConsoleOutputAndError(Runnable runnable) {
-        ByteArrayOutputStream outBuf = new ByteArrayOutputStream();
-        ByteArrayOutputStream errBuf = new ByteArrayOutputStream();
-        PrintStream out = new PrintStream(outBuf);
-        PrintStream err = new PrintStream(errBuf);
-        try {
-            Console.withOut(out, () -> {
-                Console.withErr(err, () -> {
-                    runnable.run();
-                    return null;
-                });
-                return null;
-            });
-        } finally {
-            out.flush();
-            err.flush();
-        }
-
-        return new SimpleImmutableEntry<>(outBuf.toString(), 
errBuf.toString());
-    }
-
-    private static <T> scala.collection.immutable.Set<T> asScalaSet(Set<T> 
javaSet) {
-        return CollectionConverters.asScala(javaSet).toSet();
-    }
-
-    private static <T> Set<T> asJavaSet(scala.collection.immutable.Set<T> 
scalaSet) {
-        return CollectionConverters.asJava(scalaSet);
-    }
-
     private void assertInitializeInvalidOptionsExitCodeAndMsg(List<String> 
args, String expectedMsg) {
         Exit.setExitProcedure((exitCode, message) -> {
             assertEquals(1, exitCode);
@@ -620,7 +570,7 @@ public class AclCommandTest {
             throw new RuntimeException();
         });
         try {
-            assertThrows(RuntimeException.class, () -> new 
AclCommandOptions(args.toArray(new String[0])).checkArgs());
+            assertThrows(RuntimeException.class, () -> new 
AclCommand.AclCommandOptions(args.toArray(new String[0])).checkArgs());
         } finally {
             Exit.resetExitProcedure();
         }
@@ -628,15 +578,15 @@ public class AclCommandTest {
 
     private void checkNotThrow(List<String> args) {
         AtomicReference<Integer> exitStatus = new AtomicReference<>();
-        org.apache.kafka.common.utils.Exit.setExitProcedure((status, __) -> {
+        Exit.setExitProcedure((status, __) -> {
             exitStatus.set(status);
             throw new RuntimeException();
         });
         try {
-            assertDoesNotThrow(() -> new AclCommandOptions(args.toArray(new 
String[0])).checkArgs());
+            assertDoesNotThrow(() -> new 
AclCommand.AclCommandOptions(args.toArray(new String[0])).checkArgs());
             assertNull(exitStatus.get());
         } finally {
-            org.apache.kafka.common.utils.Exit.resetExitProcedure();
+            Exit.resetExitProcedure();
         }
     }
 }

Reply via email to