Repository: kafka
Updated Branches:
  refs/heads/trunk 6ec88f7f8 -> 0990b6ba6


KAFKA-2211: Adding simpleAclAuthorizer implementation and test cases.

Author: Parth Brahmbhatt <brahmbhatt.pa...@gmail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com>

Closes #195 from Parth-Brahmbhatt/KAFKA-2211


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0990b6ba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0990b6ba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0990b6ba

Branch: refs/heads/trunk
Commit: 0990b6ba6a28276f79a1a8bfaf48455c6eddfa1f
Parents: 6ec88f7
Author: Parth Brahmbhatt <brahmbhatt.pa...@gmail.com>
Authored: Mon Sep 21 17:47:18 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Sep 21 17:47:18 2015 -0700

----------------------------------------------------------------------
 .../common/requests/LeaderAndIsrRequest.java    |   2 +-
 config/log4j.properties                         |  11 +
 .../ZkNodeChangeNotificationListener.scala      | 129 +++++++++
 .../scala/kafka/network/RequestChannel.scala    |   2 +-
 .../main/scala/kafka/network/SocketServer.scala |   3 +-
 .../scala/kafka/security/auth/Authorizer.scala  |   4 +-
 .../security/auth/SimpleAclAuthorizer.scala     | 284 +++++++++++++++++++
 .../ZkNodeChangeNotificationListenerTest.scala  |  61 ++++
 .../security/auth/SimpleAclAuthorizerTest.scala | 233 +++++++++++++++
 9 files changed, 724 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 6b16496..002beef 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
-import java.nio.ByteBuffer;;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 public class LeaderAndIsrRequest extends AbstractRequest {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index c51ab8b..bf816e7 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -49,6 +49,12 @@ 
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
 log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
+log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
+log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
 # Turn on all our debugging info
 #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
 #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
@@ -74,3 +80,8 @@ log4j.additivity.kafka.log.LogCleaner=false
 
 log4j.logger.state.change.logger=TRACE, stateChangeAppender
 log4j.additivity.state.change.logger=false
+
+#Change this to debug to get the actual audit log for authorizer.
+log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
+log4j.additivity.kafka.authorizer.logger=false
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala 
b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
new file mode 100644
index 0000000..eb44c31
--- /dev/null
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.common
+
+import kafka.utils.{Time, SystemTime, ZkUtils, Logging}
+import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+import scala.collection.JavaConverters._
+
+/**
+ * Handle the notificationMessage.
+ */
+trait NotificationHandler {
+  def processNotification(notificationMessage: String)
+}
+
+/**
+ * A listener that subscribes to seqNodeRoot for any child changes where all 
children are assumed to be sequence nodes
+ * with seqNodePrefix. When a child is added under seqNodeRoot this class gets 
notified, it looks at lastExecutedChange
+ * number to avoid duplicate processing and if it finds an unprocessed child, 
it reads its data and calls supplied
+ * notificationHandler's processNotification() method with the child's data as 
argument. As part of processing these changes it also
+ * purges any children with currentTime - createTime > changeExpirationMs.
+ *
+ * The caller/user of this class should ensure that they use 
zkClient.subscribeStateChanges and call processAllNotifications
+ * method of this class from ZkStateChangeListener's handleNewSession() 
method. This is necessary to ensure that if zk session
+ * is terminated and reestablished any missed notification will be processed 
immediately.
+ * @param zkClient
+ * @param seqNodeRoot
+ * @param seqNodePrefix
+ * @param notificationHandler
+ * @param changeExpirationMs
+ * @param time
+ */
+class ZkNodeChangeNotificationListener(private val zkClient: ZkClient,
+                                       private val seqNodeRoot: String,
+                                       private val seqNodePrefix: String,
+                                       private val notificationHandler: 
NotificationHandler,
+                                       private val changeExpirationMs: Long = 
15 * 60 * 1000,
+                                       private val time: Time = SystemTime) 
extends Logging {
+  private var lastExecutedChange = -1L
+
+  /**
+   * create seqNodeRoot and begin watching for any new children nodes.
+   */
+  def init() {
+    ZkUtils.makeSurePersistentPathExists(zkClient, seqNodeRoot)
+    zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
+    processAllNotifications()
+  }
+
+  /**
+   * Process all changes
+   */
+  def processAllNotifications() {
+    val changes = zkClient.getChildren(seqNodeRoot)
+    processNotifications(changes.asScala.sorted)
+  }
+
+  /**
+   * Process the given list of notifications
+   */
+  private def processNotifications(notifications: Seq[String]) {
+    if (notifications.nonEmpty) {
+      info(s"Processing notification(s) to $seqNodeRoot")
+      val now = time.milliseconds
+      for (notification <- notifications) {
+        val changeId = changeNumber(notification)
+        if (changeId > lastExecutedChange) {
+          val changeZnode = seqNodeRoot + "/" + notification
+          val (data, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
+          data map (notificationHandler.processNotification(_)) 
getOrElse(logger.warn(s"read null data from $changeZnode when processing 
notification $notification"))
+        }
+        lastExecutedChange = changeId
+      }
+      purgeObsoleteNotifications(now, notifications)
+    }
+  }
+
+  /**
+   * Purges expired notifications.
+   * @param now
+   * @param notifications
+   */
+  private def purgeObsoleteNotifications(now: Long, notifications: 
Seq[String]) {
+    for (notification <- notifications.sorted) {
+      val notificationNode = seqNodeRoot + "/" + notification
+      val (data, stat) = ZkUtils.readDataMaybeNull(zkClient, notificationNode)
+      if (data.isDefined) {
+        if (now - stat.getCtime > changeExpirationMs) {
+          debug(s"Purging change notification $notificationNode")
+          ZkUtils.deletePath(zkClient, notificationNode)
+        }
+      }
+    }
+  }
+
+  /* get the change number from a change notification znode */
+  private def changeNumber(name: String): Long = 
name.substring(seqNodePrefix.length).toLong
+
+  /**
+   * A listener that gets invoked when a node is created to notify changes.
+   */
+  object NodeChangeListener extends IZkChildListener {
+    override def handleChildChange(path: String, notifications: 
java.util.List[String]) {
+      try {
+        import scala.collection.JavaConverters._
+        if (notifications != null)
+          processNotifications(notifications.asScala.sorted)
+      } catch {
+        case e: Exception => error(s"Error processing notification change for 
path = $path and notification= $notifications :", e)
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 5d09a16..798e01d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -46,7 +46,7 @@ object RequestChannel extends Logging {
     byteBuffer
   }
 
-  case class Session(principal: Principal, host: String)
+  case class Session(principal: KafkaPrincipal, host: String)
 
   case class Request(processor: Int, connectionId: String, session: Session, 
private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: 
SecurityProtocol) {
     @volatile var requestDequeueTimeMs = -1L

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index d46603b..57ee318 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -33,6 +33,7 @@ import kafka.utils._
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.network.{ChannelBuilders, 
InvalidReceiveException, ChannelBuilder, PlaintextChannelBuilder, 
SSLChannelBuilder}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
@@ -417,7 +418,7 @@ private[kafka] class Processor(val id: Int,
           try {
 
             val channel = selector.channelForId(receive.source);
-            val session = RequestChannel.Session(channel.principal, 
channel.socketDescription)
+            val session = RequestChannel.Session(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal().getName), 
channel.socketDescription)
             val req = RequestChannel.Request(processor = id, connectionId = 
receive.source, session = session, buffer = receive.payload, startTimeMs = 
time.milliseconds, securityProtocol = protocol)
             requestChannel.sendRequest(req)
           } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/main/scala/kafka/security/auth/Authorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala 
b/core/src/main/scala/kafka/security/auth/Authorizer.scala
index 1f0547d..1569471 100644
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -74,8 +74,8 @@ trait Authorizer extends Configurable {
   /**
    * get the acls for this principal.
    * @param principal
-   * @return empty set if no acls exist for this principal, otherwise the acls 
for the principal.
+   * @return empty Map if no acls exist for this principal, otherwise a map of 
resource -> acls for the principal.
    */
-  def getAcls(principal: KafkaPrincipal): Set[Acl]
+  def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]]
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala 
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
new file mode 100644
index 0000000..8457cb8
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.auth
+
+import java.util
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+
+import kafka.network.RequestChannel.Session
+import kafka.server.KafkaConfig
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import kafka.utils._
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import scala.collection.JavaConverters._
+import org.apache.log4j.Logger
+
+object SimpleAclAuthorizer {
+  //optional override zookeeper cluster configuration where acls will be 
stored, if not specified acls will be stored in
+  //same zookeeper where all other kafka broker info is stored.
+  val ZkUrlProp = "authorizer.zookeeper.url"
+  val ZkConnectionTimeOutProp = "authorizer.zookeeper.connection.timeout.ms"
+  val ZkSessionTimeOutProp = "authorizer.zookeeper.session.timeout.ms"
+
+  //List of users that will be treated as super users and will have access to 
all the resources for all actions from all hosts, defaults to no super users.
+  val SuperUsersProp = "super.users"
+  //If set to true when no acls are found for a resource, the authorizer allows 
access to everyone. Defaults to false.
+  val AllowEveryoneIfNoAclIsFoundProp = "allow.everyone.if.no.acl.found"
+
+  /**
+   * The root acl storage node. Under this node there will be one child node 
per resource type (Topic, Cluster, ConsumerGroup).
+   * Under each resourceType there will be a unique child for each resource 
instance and the data for that child will contain
+   * the list of its acls as a json object. The following gives an example:
+   *
+   * <pre>
+   * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", 
"permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
+   * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { 
"host":"host1", "permissionType": "Allow","operation": "Read","principal": 
"User:alice"}]}
+   * /kafka-acl/ConsumerGroup/group-1 => {"version": 1, "acls": [ { 
"host":"host1", "permissionType": "Allow","operation": "Read","principal": 
"User:alice"}]}
+   * </pre>
+   */
+  val AclZkPath = "/kafka-acl"
+
+  //notification node which gets updated with the resource name when acl on a 
resource is changed.
+  val AclChangedZkPath = "/kafka-acl-changes"
+
+  //prefix of all the change notification sequence nodes.
+  val AclChangedPrefix = "acl_changes_"
+}
+
+class SimpleAclAuthorizer extends Authorizer with Logging {
+  private val authorizerLogger = Logger.getLogger("kafka.authorizer.logger")
+  private var superUsers = Set.empty[KafkaPrincipal]
+  private var shouldAllowEveryoneIfNoAclIsFound = false
+  private var zkClient: ZkClient = null
+  private var aclChangeListener: ZkNodeChangeNotificationListener = null
+
+  private val aclCache = new scala.collection.mutable.HashMap[Resource, 
Set[Acl]]
+  private val lock = new ReentrantReadWriteLock()
+
+  /**
+   * Guaranteed to be called before any authorize call is made.
+   */
+  override def configure(javaConfigs: util.Map[String, _]) {
+    val configs = javaConfigs.asScala
+    val props = new java.util.Properties()
+    configs foreach { case (key, value) => props.put(key, value.toString) }
+    val kafkaConfig = KafkaConfig.fromProps(props)
+
+    superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect {
+      case str: String if str.nonEmpty => str.split(",").map(s => 
KafkaPrincipal.fromString(s.trim)).toSet
+    }.getOrElse(Set.empty[KafkaPrincipal])
+
+    shouldAllowEveryoneIfNoAclIsFound = 
configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).map(_.toString.toBoolean).getOrElse(false)
+
+    val zkUrl = configs.getOrElse(SimpleAclAuthorizer.ZkUrlProp, 
kafkaConfig.zkConnect).toString
+    val zkConnectionTimeoutMs = 
configs.getOrElse(SimpleAclAuthorizer.ZkConnectionTimeOutProp, 
kafkaConfig.zkConnectionTimeoutMs).toString.toInt
+    val zkSessionTimeOutMs = 
configs.getOrElse(SimpleAclAuthorizer.ZkSessionTimeOutProp, 
kafkaConfig.zkSessionTimeoutMs).toString.toInt
+
+    zkClient = ZkUtils.createZkClient(zkUrl, zkConnectionTimeoutMs, 
zkSessionTimeOutMs)
+    ZkUtils.makeSurePersistentPathExists(zkClient, 
SimpleAclAuthorizer.AclZkPath)
+
+    loadCache()
+
+    ZkUtils.makeSurePersistentPathExists(zkClient, 
SimpleAclAuthorizer.AclChangedZkPath)
+    aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, 
SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, 
AclChangedNotificaitonHandler)
+    aclChangeListener.init()
+
+    zkClient.subscribeStateChanges(ZkStateChangeListener)
+  }
+
+  override def authorize(session: Session, operation: Operation, resource: 
Resource): Boolean = {
+    val principal: KafkaPrincipal = session.principal
+    val host = session.host
+    val acls = getAcls(resource)
+
+    //check if there is any Deny acl match that would disallow this operation.
+    val denyMatch = aclMatch(session, operation, resource, principal, host, 
Deny, acls)
+
+    //if principal is allowed to read or write we allow describe by default, 
the reverse does not apply to Deny.
+    val ops = if (Describe == operation)
+      Set[Operation](operation, Read, Write)
+    else
+      Set[Operation](operation)
+
+    //now check if there is any allow acl that will allow this operation.
+    val allowMatch = ops.exists(operation => aclMatch(session, operation, 
resource, principal, host, Allow, acls))
+
+    //we allow an operation if a user is a super user or if no acls are found 
and user has configured to allow all users
+    //when no acls are found or if no deny acls are found and at least one 
allow acls matches.
+    val authorized = isSuperUser(operation, resource, principal, host) ||
+      isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
+      (!denyMatch && allowMatch)
+
+    logAuditMessage(principal, authorized, operation, resource, host)
+    authorized
+  }
+
+  def isEmptyAclAndAuthorized(operation: Operation, resource: Resource, 
principal: KafkaPrincipal, host: String, acls: Set[Acl]): Boolean = {
+    if (acls.isEmpty) {
+      authorizerLogger.debug(s"No acl found for resource $resource, authorized 
= $shouldAllowEveryoneIfNoAclIsFound")
+      shouldAllowEveryoneIfNoAclIsFound
+    } else false
+  }
+
+  def isSuperUser(operation: Operation, resource: Resource, principal: 
KafkaPrincipal, host: String): Boolean = {
+    if (superUsers.exists( _ == principal)) {
+      authorizerLogger.debug(s"principal = $principal is a super user, 
allowing operation without checking acls.")
+      true
+    } else false
+  }
+
+  private def aclMatch(session: Session, operations: Operation, resource: 
Resource, principal: KafkaPrincipal, host: String, permissionType: 
PermissionType, acls: Set[Acl]): Boolean = {
+    acls.find ( acl =>
+      acl.permissionType == permissionType
+        && (acl.principal == principal || acl.principal == 
Acl.WildCardPrincipal)
+        && (operations == acl.operation || acl.operation == All)
+        && (acl.host == host || acl.host == Acl.WildCardHost)
+    ).map { acl: Acl =>
+      authorizerLogger.debug(s"operation = $operations on resource = $resource 
from host = $host is $permissionType based on acl = $acl")
+      true
+    }.getOrElse(false)
+  }
+
+  override def addAcls(acls: Set[Acl], resource: Resource) {
+    if (acls != null && acls.nonEmpty) {
+      val updatedAcls = getAcls(resource) ++ acls
+      val path = toResourcePath(resource)
+
+      if (ZkUtils.pathExists(zkClient, path))
+        ZkUtils.updatePersistentPath(zkClient, path, 
Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
+      else
+        ZkUtils.createPersistentPath(zkClient, path, 
Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
+
+      updateAclChangedFlag(resource)
+      updateCache(resource, updatedAcls)
+    }
+  }
+
+  override def removeAcls(aclsTobeRemoved: Set[Acl], resource: Resource): 
Boolean = {
+    if (ZkUtils.pathExists(zkClient, toResourcePath(resource))) {
+      val existingAcls = getAcls(resource)
+      val filteredAcls = existingAcls.filter((acl: Acl) => 
!aclsTobeRemoved.contains(acl))
+
+      val aclNeedsRemoval = (existingAcls != filteredAcls)
+      if (aclNeedsRemoval) {
+        val path: String = toResourcePath(resource)
+        if (filteredAcls.nonEmpty)
+          ZkUtils.updatePersistentPath(zkClient, path, 
Json.encode(Acl.toJsonCompatibleMap(filteredAcls)))
+        else
+          ZkUtils.deletePath(zkClient, toResourcePath(resource))
+
+        updateAclChangedFlag(resource)
+        updateCache(resource, filteredAcls)
+      }
+
+      aclNeedsRemoval
+    } else false
+  }
+
+  override def removeAcls(resource: Resource): Boolean = {
+    if (ZkUtils.pathExists(zkClient, toResourcePath(resource))) {
+      ZkUtils.deletePath(zkClient, toResourcePath(resource))
+      updateAclChangedFlag(resource)
+      updateCache(resource, Set.empty[Acl])
+      true
+    } else false
+  }
+
+  override def getAcls(resource: Resource): Set[Acl] = {
+    inReadLock(lock) {
+      aclCache.get(resource).getOrElse(Set.empty[Acl])
+    }
+  }
+
+  private def getAclsFromZk(resource: Resource): Set[Acl] = {
+    val aclJson = ZkUtils.readDataMaybeNull(zkClient, 
toResourcePath(resource))._1
+    aclJson.map(Acl.fromJson).getOrElse(Set.empty)
+  }
+
+  override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = {
+    aclCache.mapValues { acls =>
+      acls.filter(_.principal == principal)
+    }.filter { case (_, acls) =>
+      acls.nonEmpty
+    }.toMap
+  }
+
+  private def loadCache()  {
+    var acls = Set.empty[Acl]
+    val resourceTypes = ZkUtils.getChildren(zkClient, 
SimpleAclAuthorizer.AclZkPath)
+    for (rType <- resourceTypes) {
+      val resourceType = ResourceType.fromString(rType)
+      val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + 
resourceType.name
+      val resourceNames = ZkUtils.getChildren(zkClient, resourceTypePath)
+      for (resourceName <- resourceNames) {
+        acls = getAclsFromZk(Resource(resourceType, resourceName.toString))
+        updateCache(new Resource(resourceType, resourceName), acls)
+      }
+    }
+  }
+
+  private def updateCache(resource: Resource, acls: Set[Acl]) {
+    inWriteLock(lock) {
+      if (acls.nonEmpty)
+        aclCache.put(resource, acls)
+      else
+        aclCache.remove(resource)
+    }
+  }
+
+  def toResourcePath(resource: Resource): String = {
+    SimpleAclAuthorizer.AclZkPath + "/" + resource.resourceType + "/" + 
resource.name
+  }
+
+  private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, 
operation: Operation, resource: Resource, host: String) {
+    val permissionType = if (authorized) "Allowed" else "Denied"
+    authorizerLogger.debug(s"Principal = $principal is $permissionType 
Operation = $operation from host = $host on resource = $resource")
+  }
+
+  private def updateAclChangedFlag(resource: Resource) {
+    ZkUtils.createSequentialPersistentPath(zkClient, 
SimpleAclAuthorizer.AclChangedZkPath + "/" + 
SimpleAclAuthorizer.AclChangedPrefix, resource.toString)
+  }
+
+  object AclChangedNotificaitonHandler extends NotificationHandler {
+
+    override def processNotification(notificationMessage: String) {
+      val resource: Resource = Resource.fromString(notificationMessage)
+      val acls = getAclsFromZk(resource)
+      updateCache(resource, acls)
+    }
+  }
+
+  object ZkStateChangeListener extends IZkStateListener {
+
+    override def handleNewSession() {
+      aclChangeListener.processAllNotifications
+    }
+
+    override def handleSessionEstablishmentError(error: Throwable) {
+      fatal("Could not establish session with zookeeper", error)
+    }
+
+    override def handleStateChanged(state: KeeperState) {
+      //no op
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
 
b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
new file mode 100644
index 0000000..d43778e
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.common
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestUtils, ZkUtils}
+import org.junit.Test
+
+class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
+
+  override def generateConfigs() = 
List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+
+  @Test
+  def testProcessNotification() {
+    val notificationHandler = new NotificationHandler {
+      @volatile var notification: String = _
+      @volatile var invocationCount: Integer = 0
+      override def processNotification(notificationMessage: String): Unit = {
+        notification = notificationMessage
+        invocationCount += 1
+      }
+    }
+
+    val seqNodeRoot = "/root"
+    val seqNodePrefix = "prefix"
+    val seqNodePath = seqNodeRoot + "/" + seqNodePrefix
+    val notificationMessage1 = "message1"
+    val notificationMessage2 = "message2"
+    val changeExpirationMs = 100
+
+    val notificationListener = new ZkNodeChangeNotificationListener(zkClient, 
seqNodeRoot, seqNodePrefix, notificationHandler, changeExpirationMs)
+    notificationListener.init()
+
+    ZkUtils.createSequentialPersistentPath(zkClient, seqNodePath, 
notificationMessage1)
+
+    TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 1 && 
notificationHandler.notification == notificationMessage1, "failed to 
send/process notification message in the timeout period.")
+
+    /*There is no easy way to test purging. Even if we mock kafka time 
with MockTime, the purging compares kafka time with the time stored in 
zookeeper stat and the
+    embedded zookeeper server does not provide a way to mock time. So to test 
purging we will have to use SystemTime.sleep(changeExpirationMs + 1), issue a 
write and check
+    Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size); 
however even after that the assertion can fail as the second node itself can 
be deleted
+    depending on how threads get scheduled.*/
+
+    ZkUtils.createSequentialPersistentPath(zkClient, seqNodePath, 
notificationMessage2)
+    TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 2 && 
notificationHandler.notification == notificationMessage2, "failed to 
send/process notification message in the timeout period.")
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0990b6ba/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
new file mode 100644
index 0000000..d3efc36
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.auth
+
+import java.util.UUID
+
+import kafka.network.RequestChannel.Session
+import kafka.security.auth.Acl.WildCardHost
+import kafka.server.KafkaConfig
+import kafka.utils.{ZkUtils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+/**
+ * Tests for SimpleAclAuthorizer, using the `zkConnect` / `zkClient` inherited from
+ * ZooKeeperTestHarness.
+ *
+ * Before every test the shared authorizer is configured with the super users listed in
+ * `superUsers`, and `resource` is pointed at a topic whose name is a fresh random UUID.
+ */
+class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
+
+  // Authorizer under test; configured in setUp().
+  var simpleAclAuthorizer = new SimpleAclAuthorizer
+  val testPrincipal = Acl.WildCardPrincipal
+  val testHostName = "test.host.com"
+  // Default session (wildcard principal from testHostName) used by the "no acl found" tests.
+  var session = new Session(testPrincipal, testHostName)
+  // Topic resource the tests attach acls to; re-created in setUp().
+  var resource: Resource = null
+  val superUsers = "User:superuser1, User:superuser2"
+  val username = "alice"
+  // Broker config built in setUp(); reused by testLoadCache to configure a second authorizer.
+  var config: KafkaConfig = null
+
+  /**
+   * Builds a broker config (id 0) that declares `superUsers`, configures the shared authorizer
+   * with it and creates a new randomly named topic resource.
+   */
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+
+    val props = TestUtils.createBrokerConfig(0, zkConnect)
+    props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
+
+    config = KafkaConfig.fromProps(props)
+    simpleAclAuthorizer.configure(config.originals)
+    resource = new Resource(Topic, UUID.randomUUID().toString)
+  }
+
+  /**
+   * Installs a mix of allow/deny acls for three users on `resource` and checks the authorization
+   * result per user, host and operation, including that a user holding only READ or only WRITE
+   * is also granted DESCRIBE.
+   */
+  @Test
+  def testTopicAcl(): Unit = {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
+    val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
+    val host1 = "host1"
+    val host2 = "host2"
+
+    // user1 has READ access from host1 and host2.
+    val acl1 = new Acl(user1, Allow, host1, Read)
+    val acl2 = new Acl(user1, Allow, host2, Read)
+
+    // user1 is explicitly denied READ access from host1 (conflicts with acl1 on purpose).
+    val acl3 = new Acl(user1, Deny, host1, Read)
+
+    // user1 has WRITE access from host1 only.
+    val acl4 = new Acl(user1, Allow, host1, Write)
+
+    // user1 has DESCRIBE access from all hosts.
+    val acl5 = new Acl(user1, Allow, WildCardHost, Describe)
+
+    // user2 has READ access from all hosts.
+    val acl6 = new Acl(user2, Allow, WildCardHost, Read)
+
+    // user3 has WRITE access from all hosts.
+    val acl7 = new Acl(user3, Allow, WildCardHost, Write)
+
+    val acls = Set[Acl](acl1, acl2, acl3, acl4, acl5, acl6, acl7)
+
+    changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
+
+    val host1Session = new Session(user1, host1)
+    val host2Session = new Session(user1, host2)
+
+    assertTrue("User1 should have READ access from host2",
+      simpleAclAuthorizer.authorize(host2Session, Read, resource))
+    assertFalse("User1 should not have READ access from host1 due to denyAcl",
+      simpleAclAuthorizer.authorize(host1Session, Read, resource))
+    assertTrue("User1 should have WRITE access from host1",
+      simpleAclAuthorizer.authorize(host1Session, Write, resource))
+    assertFalse("User1 should not have WRITE access from host2 as no allow acl is defined",
+      simpleAclAuthorizer.authorize(host2Session, Write, resource))
+    // Message previously read "should not have", contradicting the assertTrue.
+    assertTrue("User1 should have DESCRIBE access from host1",
+      simpleAclAuthorizer.authorize(host1Session, Describe, resource))
+    assertTrue("User1 should have DESCRIBE access from host2",
+      simpleAclAuthorizer.authorize(host2Session, Describe, resource))
+    assertFalse("User1 should not have edit access from host1",
+      simpleAclAuthorizer.authorize(host1Session, Alter, resource))
+    assertFalse("User1 should not have edit access from host2",
+      simpleAclAuthorizer.authorize(host2Session, Alter, resource))
+
+    // A user that has READ or WRITE access is expected to get DESCRIBE access as well.
+    // Both sessions below originate from host1, so the messages name host1.
+    val user2Session = new Session(user2, host1)
+    val user3Session = new Session(user3, host1)
+    assertTrue("User2 should have DESCRIBE access from host1",
+      simpleAclAuthorizer.authorize(user2Session, Describe, resource))
+    assertTrue("User3 should have DESCRIBE access from host1",
+      simpleAclAuthorizer.authorize(user3Session, Describe, resource))
+    assertTrue("User2 should have READ access from host1",
+      simpleAclAuthorizer.authorize(user2Session, Read, resource))
+    assertTrue("User3 should have WRITE access from host1",
+      simpleAclAuthorizer.authorize(user3Session, Write, resource))
+  }
+
+  /**
+   * With both Acl.AllowAllAcl and a matching deny acl installed, access must be refused.
+   */
+  @Test
+  def testDenyTakesPrecedence(): Unit = {
+    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val host = "random-host"
+    val session = new Session(user, host)
+
+    val allowAll = Acl.AllowAllAcl
+    val denyAcl = new Acl(user, Deny, host, All)
+    val acls = Set[Acl](allowAll, denyAcl)
+
+    changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
+
+    assertFalse("deny should take precedence over allow.",
+      simpleAclAuthorizer.authorize(session, Read, resource))
+  }
+
+  /**
+   * With only Acl.AllowAllAcl installed, an arbitrary user from an arbitrary host is authorized.
+   */
+  @Test
+  def testAllowAllAccess(): Unit = {
+    val allowAllAcl = Acl.AllowAllAcl
+
+    changeAclAndVerify(Set.empty[Acl], Set[Acl](allowAllAcl), Set.empty[Acl])
+
+    val session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), "random.host")
+    assertTrue("allow all acl should allow access to all.",
+      simpleAclAuthorizer.authorize(session, Read, resource))
+  }
+
+  /**
+   * Both users named in `superUsers` stay authorized even when a deny-all acl is installed.
+   */
+  @Test
+  def testSuperUserHasAccess(): Unit = {
+    val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
+
+    changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
+
+    val session1 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), "random.host")
+    val session2 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), "random.host")
+
+    assertTrue("superuser always has access, no matter what acls.",
+      simpleAclAuthorizer.authorize(session1, Read, resource))
+    assertTrue("superuser always has access, no matter what acls.",
+      simpleAclAuthorizer.authorize(session2, Read, resource))
+  }
+
+  /**
+   * No acl has been added for `resource`, so the default session must be refused.
+   */
+  @Test
+  def testNoAclFound(): Unit = {
+    assertFalse("when acls = [],  authorizer should fail close.",
+      simpleAclAuthorizer.authorize(session, Read, resource))
+  }
+
+  /**
+   * A separate authorizer configured with AllowEveryoneIfNoAclIsFoundProp = "true" must authorize
+   * the default session although no acl exists for `resource`.
+   */
+  @Test
+  def testNoAclFoundOverride(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
+
+    val cfg = KafkaConfig.fromProps(props)
+    val testAuthorizer: SimpleAclAuthorizer = new SimpleAclAuthorizer
+    testAuthorizer.configure(cfg.originals)
+    assertTrue("when acls = null or [],  authorizer should fail open with allow.everyone = true.",
+      testAuthorizer.authorize(session, Read, resource))
+  }
+
+  /**
+   * Exercises addAcls / removeAcls / getAcls: adding is additive, acls can be looked up by
+   * principal, individual acls can be removed, and removing every acl of a resource (in one
+   * call or by removing the last remaining acl) also removes the resource's ZooKeeper path.
+   */
+  @Test
+  def testAclManagementAPIs(): Unit = {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
+    val host1 = "host1"
+    val host2 = "host2"
+
+    val acl1 = new Acl(user1, Allow, host1, Read)
+    val acl2 = new Acl(user1, Allow, host1, Write)
+    val acl3 = new Acl(user2, Allow, host2, Read)
+    val acl4 = new Acl(user2, Allow, host2, Write)
+
+    val initialAcls = changeAclAndVerify(Set.empty[Acl], Set[Acl](acl1, acl2, acl3, acl4), Set.empty[Acl])
+
+    // test addAcl is additive
+    val acl5 = new Acl(user2, Allow, WildCardHost, Read)
+    val extendedAcls = changeAclAndVerify(initialAcls, Set[Acl](acl5), Set.empty[Acl])
+
+    // test get by principal name.
+    TestUtils.waitUntilTrue(() => Map(resource -> Set(acl1, acl2)) == simpleAclAuthorizer.getAcls(user1),
+      "changes not propagated in timeout period")
+    TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2),
+      "changes not propagated in timeout period")
+
+    // test remove acl from existing acls.
+    changeAclAndVerify(extendedAcls, Set.empty[Acl], Set(acl1, acl5))
+
+    // test remove all acls for resource
+    simpleAclAuthorizer.removeAcls(resource)
+    TestUtils.waitUntilTrue(() => simpleAclAuthorizer.getAcls(resource) == Set.empty[Acl],
+      "changes not propagated in timeout period.")
+    assertFalse(ZkUtils.pathExists(zkClient, simpleAclAuthorizer.toResourcePath(resource)))
+
+    // test removing last acl also deletes zookeeper path
+    val singleAcl = changeAclAndVerify(Set.empty[Acl], Set(acl1), Set.empty[Acl])
+    changeAclAndVerify(singleAcl, Set.empty[Acl], singleAcl)
+    assertFalse(ZkUtils.pathExists(zkClient, simpleAclAuthorizer.toResourcePath(resource)))
+  }
+
+  /**
+   * Adds acls for two resources, deletes SimpleAclAuthorizer.AclChangedZkPath recursively and
+   * then checks that a newly configured authorizer returns those acls.
+   * NOTE(review): presumably this proves acls are loaded from ZooKeeper during configure()
+   * rather than via change notifications - confirm against SimpleAclAuthorizer.
+   */
+  @Test
+  def testLoadCache(): Unit = {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val acl1 = new Acl(user1, Allow, "host-1", Read)
+    val acls = Set[Acl](acl1)
+    simpleAclAuthorizer.addAcls(acls, resource)
+
+    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
+    val resource1 = new Resource(Topic, "test-2")
+    val acl2 = new Acl(user2, Deny, "host3", Read)
+    val acls1 = Set[Acl](acl2)
+    simpleAclAuthorizer.addAcls(acls1, resource1)
+
+    ZkUtils.deletePathRecursive(zkClient, SimpleAclAuthorizer.AclChangedZkPath)
+    val authorizer = new SimpleAclAuthorizer
+    authorizer.configure(config.originals)
+
+    assertEquals(acls, authorizer.getAcls(resource))
+    assertEquals(acls1, authorizer.getAcls(resource1))
+  }
+
+  /**
+   * Applies the given additions and removals to `resource` through the shared authorizer and
+   * waits until getAcls(resource) reports the expected result.
+   *
+   * @param originalAcls acls assumed to be present on `resource` before the change
+   * @param addedAcls    acls to add; addAcls is only called when this set is non-empty
+   * @param removedAcls  acls to remove; removeAcls is only called when this set is non-empty
+   * @return the expected acl set after the change: (originalAcls ++ addedAcls) -- removedAcls
+   */
+  private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl]): Set[Acl] = {
+    if (addedAcls.nonEmpty)
+      simpleAclAuthorizer.addAcls(addedAcls, resource)
+
+    if (removedAcls.nonEmpty)
+      simpleAclAuthorizer.removeAcls(removedAcls, resource)
+
+    // Additions are applied before removals, so removals win when an acl is in both sets.
+    val expectedAcls = (originalAcls ++ addedAcls) -- removedAcls
+
+    TestUtils.waitUntilTrue(() => simpleAclAuthorizer.getAcls(resource) == expectedAcls,
+      "changes not propagated in timeout period.")
+
+    expectedAcls
+  }
+}
\ No newline at end of file

Reply via email to