This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f348f10  KAFKA-7117: Support AdminClient API in AclCommand (KIP-332) 
(#5463)
f348f10 is described below

commit f348f10ef87925081fdf9455ace6d2a86179b483
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Sat Sep 8 06:10:59 2018 +0530

    KAFKA-7117: Support AdminClient API in AclCommand (KIP-332) (#5463)
    
    Reviewers: Colin Patrick McCabe <[email protected]>, Jun Rao 
<[email protected]>
---
 core/src/main/scala/kafka/admin/AclCommand.scala   | 280 ++++++++++++++++-----
 .../main/scala/kafka/security/SecurityUtils.scala  |   5 +-
 .../scala/unit/kafka/admin/AclCommandTest.scala    |  89 +++++--
 docs/security.html                                 |  24 +-
 4 files changed, 307 insertions(+), 91 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala 
b/core/src/main/scala/kafka/admin/AclCommand.scala
index 31e6c53..c2dda33 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -17,17 +17,22 @@
 
 package kafka.admin
 
+import java.util.Properties
+
 import joptsimple._
 import joptsimple.util.EnumConverter
 import kafka.security.auth._
 import kafka.server.KafkaConfig
 import kafka.utils._
+import org.apache.kafka.clients.admin.{AdminClientConfig, AdminClient => 
JAdminClient}
+import org.apache.kafka.common.acl._
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.resource.{PatternType, ResourcePatternFilter, 
Resource => JResource, ResourceType => JResourceType}
+import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 object AclCommand extends Logging {
 
@@ -52,13 +57,21 @@ object AclCommand extends Logging {
 
     opts.checkArgs()
 
+    val aclCommandService = {
+      if (opts.options.has(opts.bootstrapServerOpt)) {
+        new AdminClientService(opts)
+      } else {
+        new AuthorizerService(opts)
+      }
+    }
+
     try {
       if (opts.options.has(opts.addOpt))
-        addAcl(opts)
+        aclCommandService.addAcls()
       else if (opts.options.has(opts.removeOpt))
-        removeAcl(opts)
+        aclCommandService.removeAcls()
       else if (opts.options.has(opts.listOpt))
-        listAcl(opts)
+        aclCommandService.listAcls()
     } catch {
       case e: Throwable =>
         println(s"Error while executing ACL command: ${e.getMessage}")
@@ -67,91 +80,202 @@ object AclCommand extends Logging {
     }
   }
 
-  def withAuthorizer(opts: AclCommandOptions)(f: Authorizer => Unit) {
-    val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> 
JaasUtils.isZkSecurityEnabled)
-    val authorizerProperties =
-      if (opts.options.has(opts.authorizerPropertiesOpt)) {
-        val authorizerProperties = 
opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
-        defaultProps ++ 
CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = 
false).asScala
-      } else {
-        defaultProps
+  sealed trait AclCommandService {
+    def addAcls(): Unit
+    def removeAcls(): Unit
+    def listAcls(): Unit
+  }
+
+  class AdminClientService(val opts: AclCommandOptions) extends 
AclCommandService with Logging {
+
+    private def withAdminClient(opts: AclCommandOptions)(f: JAdminClient => 
Unit) {
+      val props = if (opts.options.has(opts.commandConfigOpt))
+        Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+      else
+        new Properties()
+      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt))
+      val adminClient = JAdminClient.create(props)
+
+      try {
+        f(adminClient)
+      } finally {
+        adminClient.close()
       }
+    }
 
-    val authorizerClass = opts.options.valueOf(opts.authorizerOpt)
-    val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
-    try {
-      authZ.configure(authorizerProperties.asJava)
-      f(authZ)
+    def addAcls(): Unit = {
+      val resourceToAcl = getResourceToAcls(opts)
+      withAdminClient(opts) { adminClient =>
+        for ((resource, acls) <- resourceToAcl) {
+          val resourcePattern = resource.toPattern
+          println(s"Adding ACLs for resource `$resourcePattern`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
+          val aclBindings = acls.map(acl => new AclBinding(resourcePattern, 
getAccessControlEntry(acl))).asJavaCollection
+          adminClient.createAcls(aclBindings).all().get()
+        }
+
+        listAcls()
+      }
     }
-    finally CoreUtils.swallow(authZ.close(), this)
-  }
 
-  private def addAcl(opts: AclCommandOptions) {
-    val patternType: PatternType = 
opts.options.valueOf(opts.resourcePatternType)
-    if (!patternType.isSpecific)
-      CommandLineUtils.printUsageAndDie(opts.parser, s"A 
'--resource-pattern-type' value of '$patternType' is not valid when adding 
acls.")
+    def removeAcls(): Unit = {
+      withAdminClient(opts) { adminClient =>
+        val filterToAcl = getResourceFilterToAcls(opts)
+
+        for ((filter, acls) <- filterToAcl) {
+          if (acls.isEmpty) {
+            if (confirmAction(opts, s"Are you sure you want to delete all ACLs 
for resource filter `$filter`? (y/n)"))
+              removeAcls(adminClient, acls, filter)
+          } else {
+            if (confirmAction(opts, s"Are you sure you want to remove ACLs: 
$Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter 
`$filter`? (y/n)"))
+              removeAcls(adminClient, acls, filter)
+          }
+        }
+
+        listAcls()
+      }
+    }
 
-    withAuthorizer(opts) { authorizer =>
-      val resourceToAcl = getResourceFilterToAcls(opts).map {
-        case (filter, acls) =>
-          Resource(ResourceType.fromJava(filter.resourceType()), 
filter.name(), filter.patternType()) -> acls
+    def listAcls(): Unit = {
+      withAdminClient(opts) { adminClient =>
+        val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+        val resourceToAcls = getAcls(adminClient, filters)
+
+        for ((resource, acls) <- resourceToAcls)
+          println(s"Current ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
       }
+    }
 
-      if (resourceToAcl.values.exists(_.isEmpty))
-        CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one 
of: --allow-principal, --deny-principal when trying to add ACLs.")
+    private def getAccessControlEntry(acl: Acl): AccessControlEntry = {
+      new AccessControlEntry(acl.principal.toString, acl.host, 
acl.operation.toJava, acl.permissionType.toJava)
+    }
 
-      for ((resource, acls) <- resourceToAcl) {
-        println(s"Adding ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
-        authorizer.addAcls(acls, resource)
+    private def removeAcls(adminClient: JAdminClient, acls: Set[Acl], filter: 
ResourcePatternFilter): Unit = {
+      if (acls.isEmpty)
+        adminClient.deleteAcls(List(new AclBindingFilter(filter, 
AccessControlEntryFilter.ANY)).asJava).all().get()
+      else {
+        val aclBindingFilters = acls.map(acl => new AclBindingFilter(filter, 
getAccessControlEntryFilter(acl))).toList.asJava
+        adminClient.deleteAcls(aclBindingFilters).all().get()
       }
+    }
+
+    private def getAccessControlEntryFilter(acl: Acl): 
AccessControlEntryFilter = {
+      new AccessControlEntryFilter(acl.principal.toString, acl.host, 
acl.operation.toJava, acl.permissionType.toJava)
+    }
 
-      listAcl(opts)
+    private def getAcls(adminClient: JAdminClient, filters: 
Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
+      val aclBindings =
+        if (filters.isEmpty) 
adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList
+        else {
+          val results = for (filter <- filters) yield {
+            adminClient.describeAcls(new AclBindingFilter(filter, 
AccessControlEntryFilter.ANY)).values().get().asScala.toList
+          }
+          results.reduceLeft(_ ++ _)
+        }
+
+      val resourceToAcls = mutable.Map[ResourcePattern, 
Set[AccessControlEntry]]().withDefaultValue(Set())
+
+      aclBindings.foreach(aclBinding => resourceToAcls(aclBinding.pattern()) = 
resourceToAcls(aclBinding.pattern()) + aclBinding.entry())
+      resourceToAcls.toMap
     }
   }
 
-  private def removeAcl(opts: AclCommandOptions) {
-    withAuthorizer(opts) { authorizer =>
-      val filterToAcl = getResourceFilterToAcls(opts)
+  class AuthorizerService(val opts: AclCommandOptions) extends 
AclCommandService with Logging {
 
-      for ((filter, acls) <- filterToAcl) {
-        if (acls.isEmpty) {
-          if (confirmAction(opts, s"Are you sure you want to delete all ACLs 
for resource filter `$filter`? (y/n)"))
-            removeAcls(authorizer, acls, filter)
+    private def withAuthorizer()(f: Authorizer => Unit) {
+      val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> 
JaasUtils.isZkSecurityEnabled)
+      val authorizerProperties =
+        if (opts.options.has(opts.authorizerPropertiesOpt)) {
+          val authorizerProperties = 
opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
+          defaultProps ++ 
CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = 
false).asScala
         } else {
-          if (confirmAction(opts, s"Are you sure you want to remove ACLs: 
$Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter 
`$filter`? (y/n)"))
-            removeAcls(authorizer, acls, filter)
+          defaultProps
         }
+
+      val authorizerClass = if (opts.options.has(opts.authorizerOpt))
+        opts.options.valueOf(opts.authorizerOpt)
+      else
+        classOf[SimpleAclAuthorizer].getName
+
+      val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
+      try {
+        authZ.configure(authorizerProperties.asJava)
+        f(authZ)
       }
+      finally CoreUtils.swallow(authZ.close(), this)
+    }
+
+    def addAcls(): Unit = {
+      val resourceToAcl = getResourceToAcls(opts)
+      withAuthorizer() { authorizer =>
+        for ((resource, acls) <- resourceToAcl) {
+          println(s"Adding ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
+          authorizer.addAcls(acls, resource)
+        }
 
-      listAcl(opts)
+        listAcls()
+      }
     }
-  }
 
-  private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: 
ResourcePatternFilter) {
-    getAcls(authorizer, filter)
-      .keys
-      .foreach(resource =>
-        if (acls.isEmpty) authorizer.removeAcls(resource)
-        else authorizer.removeAcls(acls, resource)
-      )
-  }
+    def removeAcls(): Unit = {
+      withAuthorizer() { authorizer =>
+        val filterToAcl = getResourceFilterToAcls(opts)
+
+        for ((filter, acls) <- filterToAcl) {
+          if (acls.isEmpty) {
+            if (confirmAction(opts, s"Are you sure you want to delete all ACLs 
for resource filter `$filter`? (y/n)"))
+              removeAcls(authorizer, acls, filter)
+          } else {
+            if (confirmAction(opts, s"Are you sure you want to remove ACLs: 
$Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter 
`$filter`? (y/n)"))
+              removeAcls(authorizer, acls, filter)
+          }
+        }
 
-  private def listAcl(opts: AclCommandOptions) {
-    withAuthorizer(opts) { authorizer =>
-      val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+        listAcls()
+      }
+    }
+
+    def listAcls(): Unit = {
+      withAuthorizer() { authorizer =>
+        val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+
+        val resourceToAcls: Iterable[(Resource, Set[Acl])] =
+          if (filters.isEmpty) authorizer.getAcls()
+          else filters.flatMap(filter => getAcls(authorizer, filter))
 
-      val resourceToAcls: Iterable[(Resource, Set[Acl])] =
-        if (filters.isEmpty) authorizer.getAcls()
-        else filters.flatMap(filter => getAcls(authorizer, filter))
+        for ((resource, acls) <- resourceToAcls)
+          println(s"Current ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
+      }
+    }
 
-      for ((resource, acls) <- resourceToAcls)
-        println(s"Current ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
+    private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: 
ResourcePatternFilter) {
+      getAcls(authorizer, filter)
+        .keys
+        .foreach(resource =>
+          if (acls.isEmpty) authorizer.removeAcls(resource)
+          else authorizer.removeAcls(acls, resource)
+        )
     }
+
+    private def getAcls(authorizer: Authorizer, filter: 
ResourcePatternFilter): Map[Resource, Set[Acl]] =
+      authorizer.getAcls()
+        .filter { case (resource, acl) => filter.matches(resource.toPattern) }
   }
 
-  private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): 
Map[Resource, Set[Acl]] =
-    authorizer.getAcls()
-      .filter { case (resource, acl) => filter.matches(resource.toPattern) }
+  private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, 
Set[Acl]] = {
+    val patternType: PatternType = 
opts.options.valueOf(opts.resourcePatternType)
+    if (!patternType.isSpecific)
+      CommandLineUtils.printUsageAndDie(opts.parser, s"A 
'--resource-pattern-type' value of '$patternType' is not valid when adding 
acls.")
+
+    val resourceToAcl = getResourceFilterToAcls(opts).map {
+      case (filter, acls) =>
+        Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), 
filter.patternType()) -> acls
+    }
+
+    if (resourceToAcl.values.exists(_.isEmpty))
+      CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: 
--allow-principal, --deny-principal when trying to add ACLs.")
+
+    resourceToAcl
+  }
 
   private def getResourceFilterToAcls(opts: AclCommandOptions): 
Map[ResourcePatternFilter, Set[Acl]] = {
     var resourceToAcls = Map.empty[ResourcePatternFilter, Set[Acl]]
@@ -257,7 +381,7 @@ object AclCommand extends Logging {
 
   private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: 
ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = {
     if (opts.options.has(principalOptionSpec))
-      opts.options.valuesOf(principalOptionSpec).asScala.map(s => 
KafkaPrincipal.fromString(s.trim)).toSet
+      opts.options.valuesOf(principalOptionSpec).asScala.map(s => 
SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
     else
       Set.empty[KafkaPrincipal]
   }
@@ -305,11 +429,23 @@ object AclCommand extends Logging {
 
   class AclCommandOptions(args: Array[String]) {
     val parser = new OptionParser(false)
+    val CommandConfigDoc = "A property file containing configs to be passed to 
Admin Client."
+
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of 
host/port pairs to use for establishing the connection to the Kafka cluster." +
+      " This list should be in the form host1:port1,host2:port2,... This 
config is required for acl management using admin client API.")
+      .withRequiredArg
+      .describedAs("server to connect to")
+      .ofType(classOf[String])
+
+    val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+      .withOptionalArg()
+      .describedAs("command-config")
+      .ofType(classOf[String])
+
     val authorizerOpt = parser.accepts("authorizer", "Fully qualified class 
name of the authorizer, defaults to kafka.security.auth.SimpleAclAuthorizer.")
       .withRequiredArg
       .describedAs("authorizer")
       .ofType(classOf[String])
-      .defaultsTo(classOf[SimpleAclAuthorizer].getName)
 
     val authorizerPropertiesOpt = parser.accepts("authorizer-properties", 
"REQUIRED: properties required to configure an instance of Authorizer. " +
       "These are key=val pairs. For the default authorizer the example values 
are: zookeeper.connect=localhost:2181")
@@ -410,7 +546,17 @@ object AclCommand extends Logging {
     val options = parser.parse(args: _*)
 
     def checkArgs() {
-      CommandLineUtils.checkRequiredArgs(parser, options, 
authorizerPropertiesOpt)
+      if (options.has(bootstrapServerOpt) && options.has(authorizerOpt))
+        CommandLineUtils.printUsageAndDie(parser, "Only one of 
--bootstrap-server or --authorizer must be specified")
+
+      if (!options.has(bootstrapServerOpt))
+        CommandLineUtils.checkRequiredArgs(parser, options, 
authorizerPropertiesOpt)
+
+      if (options.has(commandConfigOpt) && !options.has(bootstrapServerOpt))
+        CommandLineUtils.printUsageAndDie(parser, "The --command-config option 
can only be used with --bootstrap-server option")
+
+      if (options.has(authorizerPropertiesOpt) && 
options.has(bootstrapServerOpt))
+        CommandLineUtils.printUsageAndDie(parser, "The --authorizer-properties 
option can only be used with --authorizer option")
 
       val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
       if (actions != 1)
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala 
b/core/src/main/scala/kafka/security/SecurityUtils.scala
index 5d42871..311e195 100644
--- a/core/src/main/scala/kafka/security/SecurityUtils.scala
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -22,8 +22,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry, 
AclBinding, AclBindingFi
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ApiError
 import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-
+import org.apache.kafka.common.utils.SecurityUtils._
 import scala.util.{Failure, Success, Try}
 
 
@@ -32,7 +31,7 @@ object SecurityUtils {
   def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, 
(Resource, Acl)] = {
     (for {
       resourceType <- 
Try(ResourceType.fromJava(filter.patternFilter.resourceType))
-      principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
+      principal <- Try(parseKafkaPrincipal(filter.entryFilter.principal))
       operation <- Try(Operation.fromJava(filter.entryFilter.operation))
       permissionType <- 
Try(PermissionType.fromJava(filter.entryFilter.permissionType))
       resource = Resource(resourceType, filter.patternFilter.name, 
filter.patternFilter.patternType)
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 05f6189..d5535a5 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -20,20 +20,24 @@ import java.util.Properties
 
 import kafka.admin.AclCommand.AclCommandOptions
 import kafka.security.auth._
-import kafka.server.KafkaConfig
+import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{Exit, Logging, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.resource.PatternType
+import org.apache.kafka.common.network.ListenerName
+
 import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.junit.{Before, Test}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.SecurityUtils
+import org.junit.{After, Before, Test}
 
 class AclCommandTest extends ZooKeeperTestHarness with Logging {
 
-  private val principal: KafkaPrincipal = 
KafkaPrincipal.fromString("User:test2")
-  private val Users = 
Set(KafkaPrincipal.fromString("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
-    principal,
-    KafkaPrincipal.fromString("""User:CN=\#User with special chars in CN : (\, 
\+ \" \\ \< \> \; ')"""))
+  var servers: Seq[KafkaServer] = Seq()
+
+  private val principal: KafkaPrincipal = 
SecurityUtils.parseKafkaPrincipal("User:test2")
+  private val Users = 
Set(SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
+    principal, SecurityUtils.parseKafkaPrincipal("""User:CN=\#User with 
special chars in CN : (\, \+ \" \\ \< \> \; ')"""))
   private val Hosts = Set("host1", "host2")
   private val AllowHostCommand = Array("--allow-host", "host1", 
"--allow-host", "host2")
   private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host", 
"host2")
@@ -87,6 +91,7 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
 
   private var brokerProps: Properties = _
   private var zkArgs: Array[String] = _
+  private var adminArgs: Array[String] = _
 
   @Before
   override def setUp(): Unit = {
@@ -94,33 +99,66 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
 
     brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
     brokerProps.put(KafkaConfig.AuthorizerClassNameProp, 
"kafka.security.auth.SimpleAclAuthorizer")
+    brokerProps.put(SimpleAclAuthorizer.SuperUsersProp, "User:ANONYMOUS")
 
     zkArgs = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
   }
 
+  @After
+  override def tearDown() {
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+  }
+
   @Test
-  def testAclCli() {
+  def testAclCliWithAuthorizer(): Unit = {
+    testAclCli(zkArgs)
+  }
+
+  @Test
+  def testAclCliWithAdminAPI(): Unit = {
+    createServer()
+    testAclCli(adminArgs)
+  }
+
+  private def createServer(): Unit = {
+    servers = Seq(TestUtils.createServer(KafkaConfig.fromProps(brokerProps)))
+    val listenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    adminArgs = Array("--bootstrap-server", 
TestUtils.bootstrapServers(servers, listenerName))
+  }
+
+  private def testAclCli(cmdArgs: Array[String]) {
     for ((resources, resourceCmd) <- ResourceToCommand) {
       for (permissionType <- PermissionType.values) {
         val operationToCmd = ResourceToOperations(resources)
         val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
-          AclCommand.main(zkArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ 
"--add")
+          AclCommand.main(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 
:+ "--add")
           for (resource <- resources) {
             withAuthorizer() { authorizer =>
               TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
             }
           }
 
-          testRemove(resources, resourceCmd, brokerProps)
+          testRemove(cmdArgs, resources, resourceCmd)
       }
     }
   }
 
   @Test
-  def testProducerConsumerCli() {
+  def testProducerConsumerCliWithAuthorizer(): Unit = {
+    testProducerConsumerCli(zkArgs)
+  }
+
+  @Test
+  def testProducerConsumerCliWithAdminAPI(): Unit = {
+    createServer()
+    testProducerConsumerCli(adminArgs)
+  }
+
+  private def testProducerConsumerCli(cmdArgs: Array[String]) {
     for ((cmd, resourcesToAcls) <- CmdToResourcesToAcl) {
       val resourceCommand: Array[String] = 
resourcesToAcls.keys.map(ResourceToCommand).foldLeft(Array[String]())(_ ++ _)
-      AclCommand.main(zkArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ 
"--add")
+      AclCommand.main(cmdArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ 
"--add")
       for ((resources, acls) <- resourcesToAcls) {
         for (resource <- resources) {
           withAuthorizer() { authorizer =>
@@ -128,15 +166,25 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
           }
         }
       }
-      testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd, 
brokerProps)
+      testRemove(cmdArgs, resourcesToAcls.keys.flatten.toSet, resourceCommand 
++ cmd)
     }
   }
 
   @Test
-  def testAclsOnPrefixedResources(): Unit = {
+  def testAclsOnPrefixedResourcesWithAuthorizer(): Unit = {
+    testAclsOnPrefixedResources(zkArgs)
+  }
+
+  @Test
+  def testAclsOnPrefixedResourcesWithAdminAPI(): Unit = {
+    createServer()
+    testAclsOnPrefixedResources(adminArgs)
+  }
+
+  private def testAclsOnPrefixedResources(cmdArgs: Array[String]): Unit = {
     val cmd = Array("--allow-principal", principal.toString, "--producer", 
"--topic", "Test-", "--resource-pattern-type", "Prefixed")
 
-    AclCommand.main(zkArgs ++ cmd :+ "--add")
+    AclCommand.main(cmdArgs ++ cmd :+ "--add")
 
     withAuthorizer() { authorizer =>
       val writeAcl = Acl(principal, Allow, Acl.WildCardHost, Write)
@@ -145,7 +193,7 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
       TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl), 
authorizer, Resource(Topic, "Test-", PREFIXED))
     }
 
-    AclCommand.main(zkArgs ++ cmd :+ "--remove" :+ "--force")
+    AclCommand.main(cmdArgs ++ cmd :+ "--remove" :+ "--force")
 
     withAuthorizer() { authorizer =>
       TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, 
Resource(Cluster, "kafka-cluster", LITERAL))
@@ -156,7 +204,8 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
   @Test(expected = classOf[IllegalArgumentException])
   def testInvalidAuthorizerProperty() {
     val args = Array("--authorizer-properties", "zookeeper.connect " + 
zkConnect)
-    AclCommand.withAuthorizer(new AclCommandOptions(args))(null)
+    val aclCommandService = new AclCommand.AuthorizerService(new 
AclCommandOptions(args))
+    aclCommandService.listAcls()
   }
 
   @Test
@@ -188,9 +237,9 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
     }
   }
 
-  private def testRemove(resources: Set[Resource], resourceCmd: Array[String], 
brokerProps: Properties) {
+  private def testRemove(cmdArgs: Array[String], resources: Set[Resource], 
resourceCmd: Array[String]) {
     for (resource <- resources) {
-      AclCommand.main(zkArgs ++ resourceCmd :+ "--remove" :+ "--force")
+      AclCommand.main(cmdArgs ++ resourceCmd :+ "--remove" :+ "--force")
       withAuthorizer() { authorizer =>
         TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource)
       }
@@ -208,7 +257,7 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
     Users.foldLeft(cmd) ((cmd, user) => cmd ++ Array(principalCmd, 
user.toString))
   }
 
-  def withAuthorizer()(f: Authorizer => Unit) {
+  private def withAuthorizer()(f: Authorizer => Unit) {
     val kafkaConfig = KafkaConfig.fromProps(brokerProps, doLog = false)
     val authZ = new SimpleAclAuthorizer
     try {
diff --git a/docs/security.html b/docs/security.html
index d7859e0..e856a7e 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1076,6 +1076,18 @@
             <td>Configuration</td>
         </tr>
         <tr>
+            <td>--bootstrap-server</td>
+            <td>A list of host/port pairs to use for establishing the 
connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer 
option must be specified.</td>
+            <td></td>
+            <td>Configuration</td>
+        </tr>
+        <tr>
+            <td>--command-config</td>
+            <td>A property file containing configs to be passed to Admin 
Client. This option can only be used with --bootstrap-server option.</td>
+            <td></td>
+            <td>Configuration</td>
+        </tr>
+        <tr>
             <td>--cluster</td>
             <td>Indicates to the script that the user is trying to interact 
with acls on the singular cluster resource.</td>
             <td></td>
@@ -1199,7 +1211,17 @@
             <pre class="brush: bash;"> bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </pre>
                 Note that for consumer option we must also specify the 
consumer group.
                 In order to remove a principal from producer or consumer role 
we just need to pass --remove option. </li>
-        </ul>
+
+        <li><b>AdminClient API based acl management</b><br>
+            Users having Alter permission on ClusterResource can use 
AdminClient API for ACL management. kafka-acls.sh script supports AdminClient 
API to manage ACLs without interacting with zookeeper/authorizer directly.
+            All the above examples can be executed by using 
<b>--bootstrap-server</b> option. For example:
+
+            <pre class="brush: bash;">
+            bin/kafka-acls.sh --bootstrap-server localhost:9092 
--command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob 
--producer --topic Test-topic
+            bin/kafka-acls.sh --bootstrap-server localhost:9092 
--command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob 
--consumer --topic Test-topic --group Group-1
+            bin/kafka-acls.sh --bootstrap-server localhost:9092 
--command-config /tmp/adminclient-configs.conf --list --topic 
Test-topic</pre></li>
+
+    </ul>
 
     <h3><a id="security_rolling_upgrade" href="#security_rolling_upgrade">7.5 
Incorporating Security Features in a Running Cluster</a></h3>
         You can secure a running cluster via one or more of the supported 
protocols discussed previously. This is done in phases:

Reply via email to