This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new d36b52c KAFKA-7255: Fix timing issue with create/update in
SimpleAclAuthorizer (#5478)
d36b52c is described below
commit d36b52c6c1c685741a1c9c6d199ceb716fc3e97b
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed Aug 8 17:44:57 2018 +0100
KAFKA-7255: Fix timing issue with create/update in SimpleAclAuthorizer
(#5478)
ACL updates currently get `(currentAcls, currentVersion)` for the resource
from ZK and do a conditional update using `(currentAcls+newAcl,
currentVersion)`. This supports concurrent atomic updates if the resource path
already exists in ZK. If the path doesn't exist, we currently do a conditional
createOrUpdate using `(newAcl, 0)`. So two brokers adding acls using `(newAcl1,
0)` and `(newAcl2, 0)` will result in one broker creating the path and setting
newAcl1, while the other broker c [...]
---
.../kafka/security/auth/SimpleAclAuthorizer.scala | 50 +++++++++++++++-------
1 file changed, 34 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 3c949648..3a00226 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -21,11 +21,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
import kafka.network.RequestChannel.Session
-import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.security.auth.SimpleAclAuthorizer._
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
+
import org.apache.kafka.common.security.auth.KafkaPrincipal
import scala.collection.JavaConverters._
import org.apache.log4j.Logger
@@ -63,7 +64,11 @@ object SimpleAclAuthorizer {
//prefix of all the change notification sequence node.
val AclChangedPrefix = "acl_changes_"
- private case class VersionedAcls(acls: Set[Acl], zkVersion: Int)
+ private val UnknownZkVersion = -2
+ private case class VersionedAcls(acls: Set[Acl], zkVersion: Int) {
+ def exists: Boolean = zkVersion != UnknownZkVersion
+ }
+ private val NoAcls = VersionedAcls(Set.empty, UnknownZkVersion)
}
class SimpleAclAuthorizer extends Authorizer with Logging {
@@ -193,7 +198,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
override def removeAcls(resource: Resource): Boolean = {
inWriteLock(lock) {
val result = zkUtils.deletePath(toResourcePath(resource))
- updateCache(resource, VersionedAcls(Set(), 0))
+ updateCache(resource, NoAcls)
updateAclChangedFlag(resource)
result
}
@@ -281,7 +286,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
val data = Json.encode(Acl.toJsonCompatibleMap(newAcls))
val (updateSucceeded, updateVersion) =
if (newAcls.nonEmpty) {
- updatePath(path, data, currentVersionedAcls.zkVersion)
+ if (currentVersionedAcls.exists)
+ updatePath(path, data, currentVersionedAcls.zkVersion)
+ else
+ createPath(path, data)
} else {
trace(s"Deleting path for $resource because it had no ACLs
remaining")
(zkUtils.conditionalDeletePath(path,
currentVersionedAcls.zkVersion), 0)
@@ -314,23 +322,30 @@ class SimpleAclAuthorizer extends Authorizer with Logging
{
}
/**
- * Updates a zookeeper path with an expected version. If the topic does not
exist, it will create it.
+ * Updates a zookeeper path with an expected version. Fails if the path
does not exist.
* Returns if the update was successful and the new version.
*/
private def updatePath(path: String, data: String, expectedVersion: Int):
(Boolean, Int) = {
try {
zkUtils.conditionalUpdatePersistentPathIfExists(path, data,
expectedVersion)
} catch {
- case _: ZkNoNodeException =>
- try {
- debug(s"Node $path does not exist, attempting to create it.")
- zkUtils.createPersistentPath(path, data)
- (true, 0)
- } catch {
- case _: ZkNodeExistsException =>
- debug(s"Failed to create node for $path because it already
exists.")
- (false, 0)
- }
+ case _: ZkNoNodeException => (false, UnknownZkVersion)
+ }
+ }
+
+ /**
+ * Creates a zookeeper path with the provided data. Fails if the path
already exists.
+ * Returns if the create was successful and the new version.
+ */
+ private def createPath(path: String, data: String): (Boolean, Int) = {
+ try {
+ debug(s"Node $path does not exist, attempting to create it.")
+ zkUtils.createPersistentPath(path, data)
+ (true, 0)
+ } catch {
+ case _: ZkNodeExistsException =>
+ debug(s"Failed to create node for $path because it already exists.")
+ (false, UnknownZkVersion)
}
}
@@ -340,7 +355,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
private def getAclsFromZk(resource: Resource): VersionedAcls = {
val (aclJson, stat) = zkUtils.readDataMaybeNull(toResourcePath(resource))
- VersionedAcls(aclJson.map(Acl.fromJson).getOrElse(Set()), stat.getVersion)
+ aclJson match {
+ case Some(acl) => VersionedAcls(Acl.fromJson(acl), stat.getVersion)
+ case None => NoAcls
+ }
}
private def updateCache(resource: Resource, versionedAcls: VersionedAcls) {