This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 1c2179b KAFKA-7255: Fix timing issue with create/update in
SimpleAclAuthorizer (#5478)
1c2179b is described below
commit 1c2179b0fa36c8e54633367a024a64059c3f0287
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed Aug 8 17:44:57 2018 +0100
KAFKA-7255: Fix timing issue with create/update in SimpleAclAuthorizer
(#5478)
ACL updates currently get `(currentAcls, currentVersion)` for the resource
from ZK and do a conditional update using `(currentAcls+newAcl,
currentVersion)`. This supports concurrent atomic updates if the resource path
already exists in ZK. If the path doesn't exist, we currently do a conditional
createOrUpdate using `(newAcl, -1)`. But `-1` has a special meaning in
ZooKeeper for update operations - it means match any version. So two brokers
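adding acls using `(newAcl1, -1)` and `(newAcl2, -1)` for the same new resource
path can both succeed: the first broker creates the path, and the second
broker's set with version `-1` matches the newly created node and overwrites
it, so one of the two ACLs is silently lost.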
This commit fixes the version used for conditional updates in ZooKeeper. It
also replaces the confusing `ZkVersion.NoVersion=-1` used for
`set(any-version)` and `get(return not-found)` with `ZkVersion.MatchAnyVersion`
for `set(any-version)` and `ZkVersion.UnknownVersion` for `get(return
not-found)` to avoid the return value from `get` matching arbitrary values in
`set`.
---
.../kafka/security/auth/SimpleAclAuthorizer.scala | 16 +++--
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 81 ++++++++++++----------
core/src/main/scala/kafka/zk/ZkData.scala | 3 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 20 ++++--
4 files changed, 70 insertions(+), 50 deletions(-)
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 4ff8f3e..9472411 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -22,11 +22,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import com.typesafe.scalalogging.Logger
import kafka.api.KAFKA_2_0_IV1
import kafka.network.RequestChannel.Session
-import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls}
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
-import kafka.zk.{AclChangeNotificationHandler, AclChangeSubscription,
KafkaZkClient, ZkAclChangeStore, ZkAclStore}
+import kafka.zk.{AclChangeNotificationHandler, AclChangeSubscription,
KafkaZkClient, ZkAclChangeStore, ZkAclStore, ZkVersion}
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.resource.PatternType
import org.apache.kafka.common.security.auth.KafkaPrincipal
@@ -48,7 +48,10 @@ object SimpleAclAuthorizer {
//If set to true when no acls are found for a resource , authorizer allows
access to everyone. Defaults to false.
val AllowEveryoneIfNoAclIsFoundProp = "allow.everyone.if.no.acl.found"
- case class VersionedAcls(acls: Set[Acl], zkVersion: Int)
+ case class VersionedAcls(acls: Set[Acl], zkVersion: Int) {
+ def exists: Boolean = zkVersion != ZkVersion.UnknownVersion
+ }
+ val NoAcls = VersionedAcls(Set.empty, ZkVersion.UnknownVersion)
}
class SimpleAclAuthorizer extends Authorizer with Logging {
@@ -196,7 +199,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
override def removeAcls(resource: Resource): Boolean = {
inWriteLock(lock) {
val result = zkClient.deleteResource(resource)
- updateCache(resource, VersionedAcls(Set(), 0))
+ updateCache(resource, NoAcls)
updateAclChangedFlag(resource)
result
}
@@ -306,7 +309,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
val newAcls = getNewAcls(currentVersionedAcls.acls)
val (updateSucceeded, updateVersion) =
if (newAcls.nonEmpty) {
- zkClient.conditionalSetOrCreateAclsForResource(resource, newAcls,
currentVersionedAcls.zkVersion)
+ if (currentVersionedAcls.exists)
+ zkClient.conditionalSetAclsForResource(resource, newAcls,
currentVersionedAcls.zkVersion)
+ else
+ zkClient.createAclsForResourceIfNotExists(resource, newAcls)
} else {
trace(s"Deleting path for $resource because it had no ACLs
remaining")
(zkClient.conditionalDelete(resource,
currentVersionedAcls.zkVersion), 0)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index ec4932a..0406457 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -24,7 +24,7 @@ import kafka.cluster.Broker
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
-import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls}
import kafka.security.auth.{Acl, Resource, ResourceType}
import kafka.server.ConfigType
import kafka.utils.Logging
@@ -87,7 +87,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
val brokerIdPath = brokerInfo.path
- val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes,
ZkVersion.NoVersion)
+ val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes,
ZkVersion.MatchAnyVersion)
val response = retryRequestUntilConnected(setDataRequest)
response.maybeThrow()
info("Updated broker %d at path %s with addresses:
%s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
@@ -259,7 +259,8 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName:
String, config: Properties) = {
def set(configData: Array[Byte]): SetDataResponse = {
- val setDataRequest =
SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
ConfigEntityZNode.encode(config), ZkVersion.NoVersion)
+ val setDataRequest =
SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
+ ConfigEntityZNode.encode(config), ZkVersion.MatchAnyVersion)
retryRequestUntilConnected(setDataRequest)
}
@@ -372,7 +373,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* @return SetDataResponse
*/
def setTopicAssignmentRaw(topic: String, assignment:
collection.Map[TopicPartition, Seq[Int]]): SetDataResponse = {
- val setDataRequest = SetDataRequest(TopicZNode.path(topic),
TopicZNode.encode(assignment), ZkVersion.NoVersion)
+ val setDataRequest = SetDataRequest(TopicZNode.path(topic),
TopicZNode.encode(assignment), ZkVersion.MatchAnyVersion)
retryRequestUntilConnected(setDataRequest)
}
@@ -447,7 +448,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
*/
def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = {
val deleteRequests = sequenceNumbers.map { sequenceNumber =>
- DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber),
ZkVersion.NoVersion)
+ DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber),
ZkVersion.MatchAnyVersion)
}
retryRequestsUntilConnected(deleteRequests)
}
@@ -547,12 +548,12 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* @param path zk node path
* @return A tuple of 2 elements, where first element is zk node data as an
array of bytes
* and second element is zk node version.
- * returns (None, ZkVersion.NoVersion) if node doesn't exists and
throws exception for any error
+ * returns (None, ZkVersion.UnknownVersion) if node doesn't exist
and throws exception for any error
*/
def getDataAndVersion(path: String): (Option[Array[Byte]], Int) = {
val (data, stat) = getDataAndStat(path)
stat match {
- case ZkStat.NoStat => (data, ZkVersion.NoVersion)
+ case ZkStat.NoStat => (data, ZkVersion.UnknownVersion)
case _ => (data, stat.getVersion)
}
}
@@ -591,7 +592,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
/**
* Conditional update the persistent path data, return (true, newVersion) if
it succeeds, otherwise (the path doesn't
- * exist, the current version is not the expected version, etc.) return
(false, ZkVersion.NoVersion)
+ * exist, the current version is not the expected version, etc.) return
(false, ZkVersion.UnknownVersion)
*
* When there is a ConnectionLossException during the conditional update,
ZookeeperClient will retry the update and may fail
* since the previous update may have succeeded (but the stored zkVersion no
longer matches the expected one).
@@ -616,13 +617,13 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
debug("Checker method is not passed skipping zkData match")
debug("Conditional update of path %s with data %s and expected
version %d failed due to %s"
.format(path, Utils.utf8(data), expectVersion,
setDataResponse.resultException.get.getMessage))
- (false, ZkVersion.NoVersion)
+ (false, ZkVersion.UnknownVersion)
}
case Code.NONODE =>
debug("Conditional update of path %s with data %s and expected version
%d failed due to %s".format(path,
Utils.utf8(data), expectVersion,
setDataResponse.resultException.get.getMessage))
- (false, ZkVersion.NoVersion)
+ (false, ZkVersion.UnknownVersion)
case _ =>
debug("Conditional update of path %s with data %s and expected version
%d failed due to %s".format(path,
@@ -667,7 +668,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* @param topics the topics to remove.
*/
def deleteTopicDeletions(topics: Seq[String]): Unit = {
- val deleteRequests = topics.map(topic =>
DeleteRequest(DeleteTopicsTopicZNode.path(topic), ZkVersion.NoVersion))
+ val deleteRequests = topics.map(topic =>
DeleteRequest(DeleteTopicsTopicZNode.path(topic), ZkVersion.MatchAnyVersion))
retryRequestsUntilConnected(deleteRequests)
}
@@ -701,7 +702,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
def setOrCreatePartitionReassignment(reassignment:
collection.Map[TopicPartition, Seq[Int]]): Unit = {
def set(reassignmentData: Array[Byte]): SetDataResponse = {
- val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path,
reassignmentData, ZkVersion.NoVersion)
+ val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path,
reassignmentData, ZkVersion.MatchAnyVersion)
retryRequestUntilConnected(setDataRequest)
}
@@ -734,7 +735,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* Deletes the partition reassignment znode.
*/
def deletePartitionReassignment(): Unit = {
- val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path,
ZkVersion.NoVersion)
+ val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path,
ZkVersion.MatchAnyVersion)
retryRequestUntilConnected(deleteRequest)
}
@@ -855,7 +856,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
*/
def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = {
val deleteRequests = sequenceNumbers.map { sequenceNumber =>
- DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber),
ZkVersion.NoVersion)
+ DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber),
ZkVersion.MatchAnyVersion)
}
retryRequestsUntilConnected(deleteRequests)
}
@@ -887,7 +888,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* Deletes the preferred replica election znode.
*/
def deletePreferredReplicaElection(): Unit = {
- val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path,
ZkVersion.NoVersion)
+ val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path,
ZkVersion.MatchAnyVersion)
retryRequestUntilConnected(deleteRequest)
}
@@ -909,7 +910,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* Deletes the controller znode.
*/
def deleteController(): Unit = {
- val deleteRequest = DeleteRequest(ControllerZNode.path,
ZkVersion.NoVersion)
+ val deleteRequest = DeleteRequest(ControllerZNode.path,
ZkVersion.MatchAnyVersion)
retryRequestUntilConnected(deleteRequest)
}
@@ -942,7 +943,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* @param topics the topics whose configs we wish to delete.
*/
def deleteTopicConfigs(topics: Seq[String]): Unit = {
- val deleteRequests = topics.map(topic =>
DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic),
ZkVersion.NoVersion))
+ val deleteRequests = topics.map(topic =>
DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic),
ZkVersion.MatchAnyVersion))
retryRequestsUntilConnected(deleteRequests)
}
@@ -970,7 +971,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
case Code.OK => ResourceZNode.decode(getDataResponse.data,
getDataResponse.stat)
- case Code.NONODE => VersionedAcls(Set(), -1)
+ case Code.NONODE => NoAcls
case _ => throw getDataResponse.resultException.get
}
}
@@ -983,12 +984,26 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* @param expectedVersion
* @return true if the update was successful and the new version
*/
- def conditionalSetOrCreateAclsForResource(resource: Resource, aclsSet:
Set[Acl], expectedVersion: Int): (Boolean, Int) = {
+ def conditionalSetAclsForResource(resource: Resource, aclsSet: Set[Acl],
expectedVersion: Int): (Boolean, Int) = {
def set(aclData: Array[Byte], expectedVersion: Int): SetDataResponse = {
val setDataRequest = SetDataRequest(ResourceZNode.path(resource),
aclData, expectedVersion)
retryRequestUntilConnected(setDataRequest)
}
+ if (expectedVersion < 0)
+ throw new IllegalArgumentException(s"Invalid version $expectedVersion
provided for conditional update")
+
+ val aclData = ResourceZNode.encode(aclsSet)
+
+ val setDataResponse = set(aclData, expectedVersion)
+ setDataResponse.resultCode match {
+ case Code.OK => (true, setDataResponse.stat.getVersion)
+ case Code.NONODE | Code.BADVERSION => (false, ZkVersion.UnknownVersion)
+ case _ => throw setDataResponse.resultException.get
+ }
+ }
+
+ def createAclsForResourceIfNotExists(resource: Resource, aclsSet: Set[Acl]):
(Boolean, Int) = {
def create(aclData: Array[Byte]): CreateResponse = {
val path = ResourceZNode.path(resource)
val createRequest = CreateRequest(path, aclData, acls(path),
CreateMode.PERSISTENT)
@@ -997,19 +1012,11 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
val aclData = ResourceZNode.encode(aclsSet)
- val setDataResponse = set(aclData, expectedVersion)
- setDataResponse.resultCode match {
- case Code.OK => (true, setDataResponse.stat.getVersion)
- case Code.NONODE => {
- val createResponse = create(aclData)
- createResponse.resultCode match {
- case Code.OK => (true, 0)
- case Code.NODEEXISTS => (false, 0)
- case _ => throw createResponse.resultException.get
- }
- }
- case Code.BADVERSION => (false, 0)
- case _ => throw setDataResponse.resultException.get
+ val createResponse = create(aclData)
+ createResponse.resultCode match {
+ case Code.OK => (true, 0)
+ case Code.NODEEXISTS => (false, ZkVersion.UnknownVersion)
+ case _ => throw createResponse.resultException.get
}
}
@@ -1060,7 +1067,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
*/
private def deleteAclChangeNotifications(aclChangePath: String,
sequenceNodes: Seq[String]): Unit = {
val deleteRequests = sequenceNodes.map { sequenceNode =>
- DeleteRequest(s"$aclChangePath/$sequenceNode", ZkVersion.NoVersion)
+ DeleteRequest(s"$aclChangePath/$sequenceNode", ZkVersion.MatchAnyVersion)
}
val deleteResponses = retryRequestsUntilConnected(deleteRequests)
@@ -1162,7 +1169,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
def setOrCreateDelegationToken(token: DelegationToken): Unit = {
def set(tokenData: Array[Byte]): SetDataResponse = {
- val setDataRequest =
SetDataRequest(DelegationTokenInfoZNode.path(token.tokenInfo().tokenId()),
tokenData, ZkVersion.NoVersion)
+ val setDataRequest =
SetDataRequest(DelegationTokenInfoZNode.path(token.tokenInfo().tokenId()),
tokenData, ZkVersion.MatchAnyVersion)
retryRequestUntilConnected(setDataRequest)
}
@@ -1344,7 +1351,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
* @return sequence number as the broker id
*/
def generateBrokerSequenceId(): Int = {
- val setDataRequest = SetDataRequest(BrokerSequenceIdZNode.path,
Array.empty[Byte], -1)
+ val setDataRequest = SetDataRequest(BrokerSequenceIdZNode.path,
Array.empty[Byte], ZkVersion.MatchAnyVersion)
val setDataResponse = retryRequestUntilConnected(setDataRequest)
setDataResponse.resultCode match {
case Code.OK => setDataResponse.stat.getVersion
@@ -1373,7 +1380,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
private def setConsumerOffset(group: String, topicPartition: TopicPartition,
offset: Long): SetDataResponse = {
val setDataRequest = SetDataRequest(ConsumerOffset.path(group,
topicPartition.topic, topicPartition.partition),
- ConsumerOffset.encode(offset), ZkVersion.NoVersion)
+ ConsumerOffset.encode(offset), ZkVersion.MatchAnyVersion)
retryRequestUntilConnected(setDataRequest)
}
@@ -1393,7 +1400,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
getChildrenResponse.resultCode match {
case Code.OK =>
getChildrenResponse.children.foreach(child =>
deleteRecursive(s"$path/$child"))
- val deleteResponse = retryRequestUntilConnected(DeleteRequest(path,
ZkVersion.NoVersion))
+ val deleteResponse = retryRequestUntilConnected(DeleteRequest(path,
ZkVersion.MatchAnyVersion))
if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode
!= Code.NONODE) {
throw deleteResponse.resultException.get
}
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala
b/core/src/main/scala/kafka/zk/ZkData.scala
index d2b2333..f918b61 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -436,7 +436,8 @@ object ConsumerOffset {
}
object ZkVersion {
- val NoVersion = -1
+ val MatchAnyVersion = -1 // if used in a conditional set, matches any
version (the value should match ZooKeeper codebase)
+ val UnknownVersion = -2 // Version returned from get if node does not exist
(internal constant for Kafka codebase, unused value in ZK)
}
object ZkStat {
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index cc67a01..77da8bd 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -176,7 +176,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// test with non-existing path
val (data0, version0) = zkClient.getDataAndVersion(path)
assertTrue(data0.isEmpty)
- assertEquals(-1, version0)
+ assertEquals(ZkVersion.UnknownVersion, version0)
// create a test path
zkClient.createRecursive(path)
@@ -200,7 +200,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// test with non-existing path
var statusAndVersion = zkClient.conditionalUpdatePath(path,
"version0".getBytes(UTF_8), 0)
assertFalse(statusAndVersion._1)
- assertEquals(-1, statusAndVersion._2)
+ assertEquals(ZkVersion.UnknownVersion, statusAndVersion._2)
// create path
zkClient.createRecursive(path)
@@ -213,7 +213,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// test with invalid expected version
statusAndVersion = zkClient.conditionalUpdatePath(path,
"version2".getBytes(UTF_8), 2)
assertFalse(statusAndVersion._1)
- assertEquals(-1, statusAndVersion._2)
+ assertEquals(ZkVersion.UnknownVersion, statusAndVersion._2)
}
@Test
@@ -446,7 +446,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// try getting acls for non-existing resource
var versionedAcls = zkClient.getVersionedAclsForResource(resource1)
assertTrue(versionedAcls.acls.isEmpty)
- assertEquals(-1, versionedAcls.zkVersion)
+ assertEquals(ZkVersion.UnknownVersion, versionedAcls.zkVersion)
assertFalse(zkClient.resourceExists(resource1))
@@ -454,9 +454,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val acl2 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"),
Allow, "*", Read)
val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"),
Deny, "host1", Read)
+ // Conditional set should fail if path not created
+ assertFalse(zkClient.conditionalSetAclsForResource(resource1, Set(acl1,
acl3), 0)._1)
+
//create acls for resources
- zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1,
acl2), 0)
- zkClient.conditionalSetOrCreateAclsForResource(resource2, Set(acl1,
acl3), 0)
+ assertTrue(zkClient.createAclsForResourceIfNotExists(resource1,
Set(acl1, acl2))._1)
+ assertTrue(zkClient.createAclsForResourceIfNotExists(resource2,
Set(acl1, acl3))._1)
+
+ // Create should fail if path already exists
+ assertFalse(zkClient.createAclsForResourceIfNotExists(resource2,
Set(acl1, acl3))._1)
versionedAcls = zkClient.getVersionedAclsForResource(resource1)
assertEquals(Set(acl1, acl2), versionedAcls.acls)
@@ -464,7 +470,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.resourceExists(resource1))
//update acls for resource
- zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1,
acl3), 0)
+ assertTrue(zkClient.conditionalSetAclsForResource(resource1, Set(acl1,
acl3), 0)._1)
versionedAcls = zkClient.getVersionedAclsForResource(resource1)
assertEquals(Set(acl1, acl3), versionedAcls.acls)