This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new d36b52c  KAFKA-7255: Fix timing issue with create/update in SimpleAclAuthorizer (#5478)
d36b52c is described below

commit d36b52c6c1c685741a1c9c6d199ceb716fc3e97b
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed Aug 8 17:44:57 2018 +0100

    KAFKA-7255: Fix timing issue with create/update in SimpleAclAuthorizer (#5478)
    
    ACL updates currently get `(currentAcls, currentVersion)` for the resource
    from ZK and do a conditional update using
    `(currentAcls+newAcl, currentVersion)`. This supports concurrent atomic
    updates if the resource path already exists in ZK. If the path doesn't
    exist, we currently do a conditional createOrUpdate using `(newAcl, 0)`.
    So two brokers adding ACLs using `(newAcl1, 0)` and `(newAcl2, 0)` will
    result in one broker creating the path and setting newAcl1, while the
    other broker c [...]
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 50 +++++++++++++++-------
 1 file changed, 34 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala 
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 3c949648..3a00226 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -21,11 +21,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
 
 import kafka.network.RequestChannel.Session
-import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.security.auth.SimpleAclAuthorizer._
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
+
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import scala.collection.JavaConverters._
 import org.apache.log4j.Logger
@@ -63,7 +64,11 @@ object SimpleAclAuthorizer {
   //prefix of all the change notification sequence node.
   val AclChangedPrefix = "acl_changes_"
 
-  private case class VersionedAcls(acls: Set[Acl], zkVersion: Int)
+  private val UnknownZkVersion = -2
+  private case class VersionedAcls(acls: Set[Acl], zkVersion: Int) {
+    def exists: Boolean = zkVersion != UnknownZkVersion
+  }
+  private val NoAcls = VersionedAcls(Set.empty, UnknownZkVersion)
 }
 
 class SimpleAclAuthorizer extends Authorizer with Logging {
@@ -193,7 +198,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   override def removeAcls(resource: Resource): Boolean = {
     inWriteLock(lock) {
       val result = zkUtils.deletePath(toResourcePath(resource))
-      updateCache(resource, VersionedAcls(Set(), 0))
+      updateCache(resource, NoAcls)
       updateAclChangedFlag(resource)
       result
     }
@@ -281,7 +286,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       val data = Json.encode(Acl.toJsonCompatibleMap(newAcls))
       val (updateSucceeded, updateVersion) =
         if (newAcls.nonEmpty) {
-         updatePath(path, data, currentVersionedAcls.zkVersion)
+          if (currentVersionedAcls.exists)
+            updatePath(path, data, currentVersionedAcls.zkVersion)
+          else
+            createPath(path, data)
         } else {
           trace(s"Deleting path for $resource because it had no ACLs 
remaining")
           (zkUtils.conditionalDeletePath(path, 
currentVersionedAcls.zkVersion), 0)
@@ -314,23 +322,30 @@ class SimpleAclAuthorizer extends Authorizer with Logging 
{
   }
 
   /**
-    * Updates a zookeeper path with an expected version. If the topic does not 
exist, it will create it.
+    * Updates a zookeeper path with an expected version. Fails if the path 
does not exist.
     * Returns if the update was successful and the new version.
     */
   private def updatePath(path: String, data: String, expectedVersion: Int): 
(Boolean, Int) = {
     try {
       zkUtils.conditionalUpdatePersistentPathIfExists(path, data, 
expectedVersion)
     } catch {
-      case _: ZkNoNodeException =>
-        try {
-          debug(s"Node $path does not exist, attempting to create it.")
-          zkUtils.createPersistentPath(path, data)
-          (true, 0)
-        } catch {
-          case _: ZkNodeExistsException =>
-            debug(s"Failed to create node for $path because it already 
exists.")
-            (false, 0)
-        }
+      case _: ZkNoNodeException => (false, UnknownZkVersion)
+    }
+  }
+
+  /**
+   * Creates a zookeeper path with the provided data. Fails if the path 
already exists.
+   * Returns if the create was successful and the new version.
+   */
+  private def createPath(path: String, data: String): (Boolean, Int) = {
+    try {
+      debug(s"Node $path does not exist, attempting to create it.")
+      zkUtils.createPersistentPath(path, data)
+      (true, 0)
+    } catch {
+      case _: ZkNodeExistsException =>
+        debug(s"Failed to create node for $path because it already exists.")
+        (false, UnknownZkVersion)
     }
   }
 
@@ -340,7 +355,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   private def getAclsFromZk(resource: Resource): VersionedAcls = {
     val (aclJson, stat) = zkUtils.readDataMaybeNull(toResourcePath(resource))
-    VersionedAcls(aclJson.map(Acl.fromJson).getOrElse(Set()), stat.getVersion)
+    aclJson match {
+      case Some(acl) => VersionedAcls(Acl.fromJson(acl), stat.getVersion)
+      case None => NoAcls
+    }
   }
 
   private def updateCache(resource: Resource, versionedAcls: VersionedAcls) {

Reply via email to