This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f348f10 KAFKA-7117: Support AdminClient API in AclCommand (KIP-332) (#5463)
f348f10 is described below
commit f348f10ef87925081fdf9455ace6d2a86179b483
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Sat Sep 8 06:10:59 2018 +0530
KAFKA-7117: Support AdminClient API in AclCommand (KIP-332) (#5463)
Reviewers: Colin Patrick McCabe <[email protected]>, Jun Rao <[email protected]>
---
core/src/main/scala/kafka/admin/AclCommand.scala | 280 ++++++++++++++++-----
.../main/scala/kafka/security/SecurityUtils.scala | 5 +-
.../scala/unit/kafka/admin/AclCommandTest.scala | 89 +++++--
docs/security.html | 24 +-
4 files changed, 307 insertions(+), 91 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 31e6c53..c2dda33 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -17,17 +17,22 @@
package kafka.admin
+import java.util.Properties
+
import joptsimple._
import joptsimple.util.EnumConverter
import kafka.security.auth._
import kafka.server.KafkaConfig
import kafka.utils._
+import org.apache.kafka.clients.admin.{AdminClientConfig, AdminClient =>
JAdminClient}
+import org.apache.kafka.common.acl._
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.resource.{PatternType, ResourcePatternFilter,
Resource => JResource, ResourceType => JResourceType}
+import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
import scala.collection.JavaConverters._
+import scala.collection.mutable
object AclCommand extends Logging {
@@ -52,13 +57,21 @@ object AclCommand extends Logging {
opts.checkArgs()
+ val aclCommandService = {
+ if (opts.options.has(opts.bootstrapServerOpt)) {
+ new AdminClientService(opts)
+ } else {
+ new AuthorizerService(opts)
+ }
+ }
+
try {
if (opts.options.has(opts.addOpt))
- addAcl(opts)
+ aclCommandService.addAcls()
else if (opts.options.has(opts.removeOpt))
- removeAcl(opts)
+ aclCommandService.removeAcls()
else if (opts.options.has(opts.listOpt))
- listAcl(opts)
+ aclCommandService.listAcls()
} catch {
case e: Throwable =>
println(s"Error while executing ACL command: ${e.getMessage}")
@@ -67,91 +80,202 @@ object AclCommand extends Logging {
}
}
- def withAuthorizer(opts: AclCommandOptions)(f: Authorizer => Unit) {
- val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp ->
JaasUtils.isZkSecurityEnabled)
- val authorizerProperties =
- if (opts.options.has(opts.authorizerPropertiesOpt)) {
- val authorizerProperties =
opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
- defaultProps ++
CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue =
false).asScala
- } else {
- defaultProps
+ sealed trait AclCommandService {
+ def addAcls(): Unit
+ def removeAcls(): Unit
+ def listAcls(): Unit
+ }
+
+ class AdminClientService(val opts: AclCommandOptions) extends
AclCommandService with Logging {
+
+ private def withAdminClient(opts: AclCommandOptions)(f: JAdminClient =>
Unit) {
+ val props = if (opts.options.has(opts.commandConfigOpt))
+ Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+ else
+ new Properties()
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
opts.options.valueOf(opts.bootstrapServerOpt))
+ val adminClient = JAdminClient.create(props)
+
+ try {
+ f(adminClient)
+ } finally {
+ adminClient.close()
}
+ }
- val authorizerClass = opts.options.valueOf(opts.authorizerOpt)
- val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
- try {
- authZ.configure(authorizerProperties.asJava)
- f(authZ)
+ def addAcls(): Unit = {
+ val resourceToAcl = getResourceToAcls(opts)
+ withAdminClient(opts) { adminClient =>
+ for ((resource, acls) <- resourceToAcl) {
+ val resourcePattern = resource.toPattern
+ println(s"Adding ACLs for resource `$resourcePattern`: $Newline
${acls.map("\t" + _).mkString(Newline)} $Newline")
+ val aclBindings = acls.map(acl => new AclBinding(resourcePattern,
getAccessControlEntry(acl))).asJavaCollection
+ adminClient.createAcls(aclBindings).all().get()
+ }
+
+ listAcls()
+ }
}
- finally CoreUtils.swallow(authZ.close(), this)
- }
- private def addAcl(opts: AclCommandOptions) {
- val patternType: PatternType =
opts.options.valueOf(opts.resourcePatternType)
- if (!patternType.isSpecific)
- CommandLineUtils.printUsageAndDie(opts.parser, s"A
'--resource-pattern-type' value of '$patternType' is not valid when adding
acls.")
+ def removeAcls(): Unit = {
+ withAdminClient(opts) { adminClient =>
+ val filterToAcl = getResourceFilterToAcls(opts)
+
+ for ((filter, acls) <- filterToAcl) {
+ if (acls.isEmpty) {
+ if (confirmAction(opts, s"Are you sure you want to delete all ACLs
for resource filter `$filter`? (y/n)"))
+ removeAcls(adminClient, acls, filter)
+ } else {
+ if (confirmAction(opts, s"Are you sure you want to remove ACLs:
$Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter
`$filter`? (y/n)"))
+ removeAcls(adminClient, acls, filter)
+ }
+ }
+
+ listAcls()
+ }
+ }
- withAuthorizer(opts) { authorizer =>
- val resourceToAcl = getResourceFilterToAcls(opts).map {
- case (filter, acls) =>
- Resource(ResourceType.fromJava(filter.resourceType()),
filter.name(), filter.patternType()) -> acls
+ def listAcls(): Unit = {
+ withAdminClient(opts) { adminClient =>
+ val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+ val resourceToAcls = getAcls(adminClient, filters)
+
+ for ((resource, acls) <- resourceToAcls)
+ println(s"Current ACLs for resource `$resource`: $Newline
${acls.map("\t" + _).mkString(Newline)} $Newline")
}
+ }
- if (resourceToAcl.values.exists(_.isEmpty))
- CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one
of: --allow-principal, --deny-principal when trying to add ACLs.")
+ private def getAccessControlEntry(acl: Acl): AccessControlEntry = {
+ new AccessControlEntry(acl.principal.toString, acl.host,
acl.operation.toJava, acl.permissionType.toJava)
+ }
- for ((resource, acls) <- resourceToAcl) {
- println(s"Adding ACLs for resource `$resource`: $Newline
${acls.map("\t" + _).mkString(Newline)} $Newline")
- authorizer.addAcls(acls, resource)
+ private def removeAcls(adminClient: JAdminClient, acls: Set[Acl], filter:
ResourcePatternFilter): Unit = {
+ if (acls.isEmpty)
+ adminClient.deleteAcls(List(new AclBindingFilter(filter,
AccessControlEntryFilter.ANY)).asJava).all().get()
+ else {
+ val aclBindingFilters = acls.map(acl => new AclBindingFilter(filter,
getAccessControlEntryFilter(acl))).toList.asJava
+ adminClient.deleteAcls(aclBindingFilters).all().get()
}
+ }
+
+ private def getAccessControlEntryFilter(acl: Acl):
AccessControlEntryFilter = {
+ new AccessControlEntryFilter(acl.principal.toString, acl.host,
acl.operation.toJava, acl.permissionType.toJava)
+ }
- listAcl(opts)
+ private def getAcls(adminClient: JAdminClient, filters:
Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
+ val aclBindings =
+ if (filters.isEmpty)
adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList
+ else {
+ val results = for (filter <- filters) yield {
+ adminClient.describeAcls(new AclBindingFilter(filter,
AccessControlEntryFilter.ANY)).values().get().asScala.toList
+ }
+ results.reduceLeft(_ ++ _)
+ }
+
+ val resourceToAcls = mutable.Map[ResourcePattern,
Set[AccessControlEntry]]().withDefaultValue(Set())
+
+ aclBindings.foreach(aclBinding => resourceToAcls(aclBinding.pattern()) =
resourceToAcls(aclBinding.pattern()) + aclBinding.entry())
+ resourceToAcls.toMap
}
}
- private def removeAcl(opts: AclCommandOptions) {
- withAuthorizer(opts) { authorizer =>
- val filterToAcl = getResourceFilterToAcls(opts)
+ class AuthorizerService(val opts: AclCommandOptions) extends
AclCommandService with Logging {
- for ((filter, acls) <- filterToAcl) {
- if (acls.isEmpty) {
- if (confirmAction(opts, s"Are you sure you want to delete all ACLs
for resource filter `$filter`? (y/n)"))
- removeAcls(authorizer, acls, filter)
+ private def withAuthorizer()(f: Authorizer => Unit) {
+ val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp ->
JaasUtils.isZkSecurityEnabled)
+ val authorizerProperties =
+ if (opts.options.has(opts.authorizerPropertiesOpt)) {
+ val authorizerProperties =
opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
+ defaultProps ++
CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue =
false).asScala
} else {
- if (confirmAction(opts, s"Are you sure you want to remove ACLs:
$Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter
`$filter`? (y/n)"))
- removeAcls(authorizer, acls, filter)
+ defaultProps
}
+
+ val authorizerClass = if (opts.options.has(opts.authorizerOpt))
+ opts.options.valueOf(opts.authorizerOpt)
+ else
+ classOf[SimpleAclAuthorizer].getName
+
+ val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
+ try {
+ authZ.configure(authorizerProperties.asJava)
+ f(authZ)
}
+ finally CoreUtils.swallow(authZ.close(), this)
+ }
+
+ def addAcls(): Unit = {
+ val resourceToAcl = getResourceToAcls(opts)
+ withAuthorizer() { authorizer =>
+ for ((resource, acls) <- resourceToAcl) {
+ println(s"Adding ACLs for resource `$resource`: $Newline
${acls.map("\t" + _).mkString(Newline)} $Newline")
+ authorizer.addAcls(acls, resource)
+ }
- listAcl(opts)
+ listAcls()
+ }
}
- }
- private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter:
ResourcePatternFilter) {
- getAcls(authorizer, filter)
- .keys
- .foreach(resource =>
- if (acls.isEmpty) authorizer.removeAcls(resource)
- else authorizer.removeAcls(acls, resource)
- )
- }
+ def removeAcls(): Unit = {
+ withAuthorizer() { authorizer =>
+ val filterToAcl = getResourceFilterToAcls(opts)
+
+ for ((filter, acls) <- filterToAcl) {
+ if (acls.isEmpty) {
+ if (confirmAction(opts, s"Are you sure you want to delete all ACLs
for resource filter `$filter`? (y/n)"))
+ removeAcls(authorizer, acls, filter)
+ } else {
+ if (confirmAction(opts, s"Are you sure you want to remove ACLs:
$Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter
`$filter`? (y/n)"))
+ removeAcls(authorizer, acls, filter)
+ }
+ }
- private def listAcl(opts: AclCommandOptions) {
- withAuthorizer(opts) { authorizer =>
- val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+ listAcls()
+ }
+ }
+
+ def listAcls(): Unit = {
+ withAuthorizer() { authorizer =>
+ val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+
+ val resourceToAcls: Iterable[(Resource, Set[Acl])] =
+ if (filters.isEmpty) authorizer.getAcls()
+ else filters.flatMap(filter => getAcls(authorizer, filter))
- val resourceToAcls: Iterable[(Resource, Set[Acl])] =
- if (filters.isEmpty) authorizer.getAcls()
- else filters.flatMap(filter => getAcls(authorizer, filter))
+ for ((resource, acls) <- resourceToAcls)
+ println(s"Current ACLs for resource `$resource`: $Newline
${acls.map("\t" + _).mkString(Newline)} $Newline")
+ }
+ }
- for ((resource, acls) <- resourceToAcls)
- println(s"Current ACLs for resource `$resource`: $Newline
${acls.map("\t" + _).mkString(Newline)} $Newline")
+ private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter:
ResourcePatternFilter) {
+ getAcls(authorizer, filter)
+ .keys
+ .foreach(resource =>
+ if (acls.isEmpty) authorizer.removeAcls(resource)
+ else authorizer.removeAcls(acls, resource)
+ )
}
+
+ private def getAcls(authorizer: Authorizer, filter:
ResourcePatternFilter): Map[Resource, Set[Acl]] =
+ authorizer.getAcls()
+ .filter { case (resource, acl) => filter.matches(resource.toPattern) }
}
- private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter):
Map[Resource, Set[Acl]] =
- authorizer.getAcls()
- .filter { case (resource, acl) => filter.matches(resource.toPattern) }
+ private def getResourceToAcls(opts: AclCommandOptions): Map[Resource,
Set[Acl]] = {
+ val patternType: PatternType =
opts.options.valueOf(opts.resourcePatternType)
+ if (!patternType.isSpecific)
+ CommandLineUtils.printUsageAndDie(opts.parser, s"A
'--resource-pattern-type' value of '$patternType' is not valid when adding
acls.")
+
+ val resourceToAcl = getResourceFilterToAcls(opts).map {
+ case (filter, acls) =>
+ Resource(ResourceType.fromJava(filter.resourceType()), filter.name(),
filter.patternType()) -> acls
+ }
+
+ if (resourceToAcl.values.exists(_.isEmpty))
+ CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of:
--allow-principal, --deny-principal when trying to add ACLs.")
+
+ resourceToAcl
+ }
private def getResourceFilterToAcls(opts: AclCommandOptions):
Map[ResourcePatternFilter, Set[Acl]] = {
var resourceToAcls = Map.empty[ResourcePatternFilter, Set[Acl]]
@@ -257,7 +381,7 @@ object AclCommand extends Logging {
private def getPrincipals(opts: AclCommandOptions, principalOptionSpec:
ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = {
if (opts.options.has(principalOptionSpec))
- opts.options.valuesOf(principalOptionSpec).asScala.map(s =>
KafkaPrincipal.fromString(s.trim)).toSet
+ opts.options.valuesOf(principalOptionSpec).asScala.map(s =>
SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
else
Set.empty[KafkaPrincipal]
}
@@ -305,11 +429,23 @@ object AclCommand extends Logging {
class AclCommandOptions(args: Array[String]) {
val parser = new OptionParser(false)
+ val CommandConfigDoc = "A property file containing configs to be passed to
Admin Client."
+
+ val bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of
host/port pairs to use for establishing the connection to the Kafka cluster." +
+ " This list should be in the form host1:port1,host2:port2,... This
config is required for acl management using admin client API.")
+ .withRequiredArg
+ .describedAs("server to connect to")
+ .ofType(classOf[String])
+
+ val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+ .withOptionalArg()
+ .describedAs("command-config")
+ .ofType(classOf[String])
+
val authorizerOpt = parser.accepts("authorizer", "Fully qualified class
name of the authorizer, defaults to kafka.security.auth.SimpleAclAuthorizer.")
.withRequiredArg
.describedAs("authorizer")
.ofType(classOf[String])
- .defaultsTo(classOf[SimpleAclAuthorizer].getName)
val authorizerPropertiesOpt = parser.accepts("authorizer-properties",
"REQUIRED: properties required to configure an instance of Authorizer. " +
"These are key=val pairs. For the default authorizer the example values
are: zookeeper.connect=localhost:2181")
@@ -410,7 +546,17 @@ object AclCommand extends Logging {
val options = parser.parse(args: _*)
def checkArgs() {
- CommandLineUtils.checkRequiredArgs(parser, options,
authorizerPropertiesOpt)
+ if (options.has(bootstrapServerOpt) && options.has(authorizerOpt))
+ CommandLineUtils.printUsageAndDie(parser, "Only one of
--bootstrap-server or --authorizer must be specified")
+
+ if (!options.has(bootstrapServerOpt))
+ CommandLineUtils.checkRequiredArgs(parser, options,
authorizerPropertiesOpt)
+
+ if (options.has(commandConfigOpt) && !options.has(bootstrapServerOpt))
+ CommandLineUtils.printUsageAndDie(parser, "The --command-config option
can only be used with --bootstrap-server option")
+
+ if (options.has(authorizerPropertiesOpt) &&
options.has(bootstrapServerOpt))
+ CommandLineUtils.printUsageAndDie(parser, "The --authorizer-properties
option can only be used with --authorizer option")
val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
if (actions != 1)
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala
index 5d42871..311e195 100644
--- a/core/src/main/scala/kafka/security/SecurityUtils.scala
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -22,8 +22,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry,
AclBinding, AclBindingFi
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-
+import org.apache.kafka.common.utils.SecurityUtils._
import scala.util.{Failure, Success, Try}
@@ -32,7 +31,7 @@ object SecurityUtils {
def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError,
(Resource, Acl)] = {
(for {
resourceType <-
Try(ResourceType.fromJava(filter.patternFilter.resourceType))
- principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
+ principal <- Try(parseKafkaPrincipal(filter.entryFilter.principal))
operation <- Try(Operation.fromJava(filter.entryFilter.operation))
permissionType <-
Try(PermissionType.fromJava(filter.entryFilter.permissionType))
resource = Resource(resourceType, filter.patternFilter.name,
filter.patternFilter.patternType)
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 05f6189..d5535a5 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -20,20 +20,24 @@ import java.util.Properties
import kafka.admin.AclCommand.AclCommandOptions
import kafka.security.auth._
-import kafka.server.KafkaConfig
+import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{Exit, Logging, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.resource.PatternType
+import org.apache.kafka.common.network.ListenerName
+
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.junit.{Before, Test}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.SecurityUtils
+import org.junit.{After, Before, Test}
class AclCommandTest extends ZooKeeperTestHarness with Logging {
- private val principal: KafkaPrincipal =
KafkaPrincipal.fromString("User:test2")
- private val Users =
Set(KafkaPrincipal.fromString("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
- principal,
- KafkaPrincipal.fromString("""User:CN=\#User with special chars in CN : (\,
\+ \" \\ \< \> \; ')"""))
+ var servers: Seq[KafkaServer] = Seq()
+
+ private val principal: KafkaPrincipal =
SecurityUtils.parseKafkaPrincipal("User:test2")
+ private val Users =
Set(SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
+ principal, SecurityUtils.parseKafkaPrincipal("""User:CN=\#User with
special chars in CN : (\, \+ \" \\ \< \> \; ')"""))
private val Hosts = Set("host1", "host2")
private val AllowHostCommand = Array("--allow-host", "host1",
"--allow-host", "host2")
private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host",
"host2")
@@ -87,6 +91,7 @@ class AclCommandTest extends ZooKeeperTestHarness with
Logging {
private var brokerProps: Properties = _
private var zkArgs: Array[String] = _
+ private var adminArgs: Array[String] = _
@Before
override def setUp(): Unit = {
@@ -94,33 +99,66 @@ class AclCommandTest extends ZooKeeperTestHarness with
Logging {
brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
brokerProps.put(KafkaConfig.AuthorizerClassNameProp,
"kafka.security.auth.SimpleAclAuthorizer")
+ brokerProps.put(SimpleAclAuthorizer.SuperUsersProp, "User:ANONYMOUS")
zkArgs = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
}
+ @After
+ override def tearDown() {
+ TestUtils.shutdownServers(servers)
+ super.tearDown()
+ }
+
@Test
- def testAclCli() {
+ def testAclCliWithAuthorizer(): Unit = {
+ testAclCli(zkArgs)
+ }
+
+ @Test
+ def testAclCliWithAdminAPI(): Unit = {
+ createServer()
+ testAclCli(adminArgs)
+ }
+
+ private def createServer(): Unit = {
+ servers = Seq(TestUtils.createServer(KafkaConfig.fromProps(brokerProps)))
+ val listenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+ adminArgs = Array("--bootstrap-server",
TestUtils.bootstrapServers(servers, listenerName))
+ }
+
+ private def testAclCli(cmdArgs: Array[String]) {
for ((resources, resourceCmd) <- ResourceToCommand) {
for (permissionType <- PermissionType.values) {
val operationToCmd = ResourceToOperations(resources)
val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
- AclCommand.main(zkArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+
"--add")
+ AclCommand.main(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2
:+ "--add")
for (resource <- resources) {
withAuthorizer() { authorizer =>
TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
}
}
- testRemove(resources, resourceCmd, brokerProps)
+ testRemove(cmdArgs, resources, resourceCmd)
}
}
}
@Test
- def testProducerConsumerCli() {
+ def testProducerConsumerCliWithAuthorizer(): Unit = {
+ testProducerConsumerCli(zkArgs)
+ }
+
+ @Test
+ def testProducerConsumerCliWithAdminAPI(): Unit = {
+ createServer()
+ testProducerConsumerCli(adminArgs)
+ }
+
+ private def testProducerConsumerCli(cmdArgs: Array[String]) {
for ((cmd, resourcesToAcls) <- CmdToResourcesToAcl) {
val resourceCommand: Array[String] =
resourcesToAcls.keys.map(ResourceToCommand).foldLeft(Array[String]())(_ ++ _)
- AclCommand.main(zkArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+
"--add")
+ AclCommand.main(cmdArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+
"--add")
for ((resources, acls) <- resourcesToAcls) {
for (resource <- resources) {
withAuthorizer() { authorizer =>
@@ -128,15 +166,25 @@ class AclCommandTest extends ZooKeeperTestHarness with
Logging {
}
}
}
- testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd,
brokerProps)
+ testRemove(cmdArgs, resourcesToAcls.keys.flatten.toSet, resourceCommand
++ cmd)
}
}
@Test
- def testAclsOnPrefixedResources(): Unit = {
+ def testAclsOnPrefixedResourcesWithAuthorizer(): Unit = {
+ testAclsOnPrefixedResources(zkArgs)
+ }
+
+ @Test
+ def testAclsOnPrefixedResourcesWithAdminAPI(): Unit = {
+ createServer()
+ testAclsOnPrefixedResources(adminArgs)
+ }
+
+ private def testAclsOnPrefixedResources(cmdArgs: Array[String]): Unit = {
val cmd = Array("--allow-principal", principal.toString, "--producer",
"--topic", "Test-", "--resource-pattern-type", "Prefixed")
- AclCommand.main(zkArgs ++ cmd :+ "--add")
+ AclCommand.main(cmdArgs ++ cmd :+ "--add")
withAuthorizer() { authorizer =>
val writeAcl = Acl(principal, Allow, Acl.WildCardHost, Write)
@@ -145,7 +193,7 @@ class AclCommandTest extends ZooKeeperTestHarness with
Logging {
TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl),
authorizer, Resource(Topic, "Test-", PREFIXED))
}
- AclCommand.main(zkArgs ++ cmd :+ "--remove" :+ "--force")
+ AclCommand.main(cmdArgs ++ cmd :+ "--remove" :+ "--force")
withAuthorizer() { authorizer =>
TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer,
Resource(Cluster, "kafka-cluster", LITERAL))
@@ -156,7 +204,8 @@ class AclCommandTest extends ZooKeeperTestHarness with
Logging {
@Test(expected = classOf[IllegalArgumentException])
def testInvalidAuthorizerProperty() {
val args = Array("--authorizer-properties", "zookeeper.connect " +
zkConnect)
- AclCommand.withAuthorizer(new AclCommandOptions(args))(null)
+ val aclCommandService = new AclCommand.AuthorizerService(new
AclCommandOptions(args))
+ aclCommandService.listAcls()
}
@Test
@@ -188,9 +237,9 @@ class AclCommandTest extends ZooKeeperTestHarness with
Logging {
}
}
- private def testRemove(resources: Set[Resource], resourceCmd: Array[String],
brokerProps: Properties) {
+ private def testRemove(cmdArgs: Array[String], resources: Set[Resource],
resourceCmd: Array[String]) {
for (resource <- resources) {
- AclCommand.main(zkArgs ++ resourceCmd :+ "--remove" :+ "--force")
+ AclCommand.main(cmdArgs ++ resourceCmd :+ "--remove" :+ "--force")
withAuthorizer() { authorizer =>
TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource)
}
@@ -208,7 +257,7 @@ class AclCommandTest extends ZooKeeperTestHarness with
Logging {
Users.foldLeft(cmd) ((cmd, user) => cmd ++ Array(principalCmd,
user.toString))
}
- def withAuthorizer()(f: Authorizer => Unit) {
+ private def withAuthorizer()(f: Authorizer => Unit) {
val kafkaConfig = KafkaConfig.fromProps(brokerProps, doLog = false)
val authZ = new SimpleAclAuthorizer
try {
diff --git a/docs/security.html b/docs/security.html
index d7859e0..e856a7e 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1076,6 +1076,18 @@
<td>Configuration</td>
</tr>
<tr>
+ <td>--bootstrap-server</td>
+ <td>A list of host/port pairs to use for establishing the
connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer
option must be specified.</td>
+ <td></td>
+ <td>Configuration</td>
+ </tr>
+ <tr>
+ <td>--command-config</td>
+ <td>A property file containing configs to be passed to Admin
Client. This option can only be used with --bootstrap-server option.</td>
+ <td></td>
+ <td>Configuration</td>
+ </tr>
+ <tr>
<td>--cluster</td>
<td>Indicates to the script that the user is trying to interact
with acls on the singular cluster resource.</td>
<td></td>
@@ -1199,7 +1211,17 @@
<pre class="brush: bash;"> bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=localhost:2181 --add
--allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </pre>
Note that for consumer option we must also specify the
consumer group.
In order to remove a principal from producer or consumer role
we just need to pass --remove option. </li>
- </ul>
+
+ <li><b>AdminClient API based acl management</b><br>
+ Users having Alter permission on ClusterResource can use
AdminClient API for ACL management. kafka-acls.sh script supports AdminClient
API to manage ACLs without interacting with zookeeper/authorizer directly.
+ All the above examples can be executed by using
<b>--bootstrap-server</b> option. For example:
+
+ <pre class="brush: bash;">
+ bin/kafka-acls.sh --bootstrap-server localhost:9092
--command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob
--producer --topic Test-topic
+ bin/kafka-acls.sh --bootstrap-server localhost:9092
--command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob
--consumer --topic Test-topic --group Group-1
+ bin/kafka-acls.sh --bootstrap-server localhost:9092
--command-config /tmp/adminclient-configs.conf --list --topic
Test-topic</pre></li>
+
+ </ul>
<h3><a id="security_rolling_upgrade" href="#security_rolling_upgrade">7.5
Incorporating Security Features in a Running Cluster</a></h3>
You can secure a running cluster via one or more of the supported
protocols discussed previously. This is done in phases: