This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ce19f34  KAFKA-7255: Fix timing issue with create/update in 
SimpleAclAuthorizer (#5478)
ce19f34 is described below

commit ce19f34f1e3e6e697cce2e01721bedc850fca698
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed Aug 8 17:44:57 2018 +0100

    KAFKA-7255: Fix timing issue with create/update in SimpleAclAuthorizer 
(#5478)
    
    ACL updates currently get `(currentAcls, currentVersion)` for the resource 
from ZK and do a conditional update using `(currentAcls+newAcl, 
currentVersion)`. This supports concurrent atomic updates if the resource path 
already exists in ZK. If the path doesn't exist, we currently do a conditional 
createOrUpdate using `(newAcl, -1)`. But `-1` has a special meaning in 
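ZooKeeper for update operations - it means match any version. So two brokers 
adding acls using `(newAcl1, -1)` and `(newAcl2, -1)` for a resource whose path 
does not exist yet can both succeed: one broker creates the path with newAcl1, 
and the other broker's update with `-1` then matches any version and overwrites 
it with newAcl2, losing newAcl1.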
    
    This commit fixes the version used for conditional updates in ZooKeeper. It 
also replaces the confusing `ZkVersion.NoVersion=-1` used for 
`set(any-version)` and `get(return not-found)` with `ZkVersion.MatchAnyVersion` 
for `set(any-version)` and `ZkVersion.UnknownVersion` for `get(return 
not-found)` to avoid the return value from `get` matching arbitrary values in 
`set`.
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 16 +++--
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 81 ++++++++++++----------
 core/src/main/scala/kafka/zk/ZkData.scala          |  3 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 20 ++++--
 4 files changed, 70 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala 
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index e77656d..7ec572c 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -22,11 +22,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.typesafe.scalalogging.Logger
 import kafka.api.KAFKA_2_0_IV1
 import kafka.network.RequestChannel.Session
-import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls}
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
-import kafka.zk.{AclChangeNotificationHandler, AclChangeSubscription, 
KafkaZkClient, ZkAclChangeStore, ZkAclStore}
+import kafka.zk.{AclChangeNotificationHandler, AclChangeSubscription, 
KafkaZkClient, ZkAclChangeStore, ZkAclStore, ZkVersion}
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.resource.PatternType
 import org.apache.kafka.common.security.auth.KafkaPrincipal
@@ -48,7 +48,10 @@ object SimpleAclAuthorizer {
   //If set to true when no acls are found for a resource , authorizer allows 
access to everyone. Defaults to false.
   val AllowEveryoneIfNoAclIsFoundProp = "allow.everyone.if.no.acl.found"
 
-  case class VersionedAcls(acls: Set[Acl], zkVersion: Int)
+  case class VersionedAcls(acls: Set[Acl], zkVersion: Int) {
+    def exists: Boolean = zkVersion != ZkVersion.UnknownVersion
+  }
+  val NoAcls = VersionedAcls(Set.empty, ZkVersion.UnknownVersion)
 }
 
 class SimpleAclAuthorizer extends Authorizer with Logging {
@@ -204,7 +207,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   override def removeAcls(resource: Resource): Boolean = {
     inWriteLock(lock) {
       val result = zkClient.deleteResource(resource)
-      updateCache(resource, VersionedAcls(Set(), 0))
+      updateCache(resource, NoAcls)
       updateAclChangedFlag(resource)
       result
     }
@@ -314,7 +317,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       val newAcls = getNewAcls(currentVersionedAcls.acls)
       val (updateSucceeded, updateVersion) =
         if (newAcls.nonEmpty) {
-          zkClient.conditionalSetOrCreateAclsForResource(resource, newAcls, 
currentVersionedAcls.zkVersion)
+          if (currentVersionedAcls.exists)
+            zkClient.conditionalSetAclsForResource(resource, newAcls, 
currentVersionedAcls.zkVersion)
+          else
+            zkClient.createAclsForResourceIfNotExists(resource, newAcls)
         } else {
           trace(s"Deleting path for $resource because it had no ACLs 
remaining")
           (zkClient.conditionalDelete(resource, 
currentVersionedAcls.zkVersion), 0)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index c45a90f..c807965 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -24,7 +24,7 @@ import kafka.cluster.Broker
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
-import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls}
 import kafka.security.auth.{Acl, Resource, ResourceType}
 import kafka.server.ConfigType
 import kafka.utils.Logging
@@ -98,7 +98,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
 
   def updateBrokerInfo(brokerInfo: BrokerInfo): Unit = {
     val brokerIdPath = brokerInfo.path
-    val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, 
ZkVersion.NoVersion)
+    val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, 
ZkVersion.MatchAnyVersion)
     val response = retryRequestUntilConnected(setDataRequest)
     response.maybeThrow()
     info("Updated broker %d at path %s with addresses: 
%s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
@@ -270,7 +270,8 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties) = {
 
     def set(configData: Array[Byte]): SetDataResponse = {
-      val setDataRequest = 
SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName), 
ConfigEntityZNode.encode(config), ZkVersion.NoVersion)
+      val setDataRequest = 
SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
+        ConfigEntityZNode.encode(config), ZkVersion.MatchAnyVersion)
       retryRequestUntilConnected(setDataRequest)
     }
 
@@ -383,7 +384,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @return SetDataResponse
    */
   def setTopicAssignmentRaw(topic: String, assignment: 
collection.Map[TopicPartition, Seq[Int]]): SetDataResponse = {
-    val setDataRequest = SetDataRequest(TopicZNode.path(topic), 
TopicZNode.encode(assignment), ZkVersion.NoVersion)
+    val setDataRequest = SetDataRequest(TopicZNode.path(topic), 
TopicZNode.encode(assignment), ZkVersion.MatchAnyVersion)
     retryRequestUntilConnected(setDataRequest)
   }
 
@@ -458,7 +459,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    */
   def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = {
     val deleteRequests = sequenceNumbers.map { sequenceNumber =>
-      DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), 
ZkVersion.NoVersion)
+      DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), 
ZkVersion.MatchAnyVersion)
     }
     retryRequestsUntilConnected(deleteRequests)
   }
@@ -558,12 +559,12 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @param path zk node path
    * @return A tuple of 2 elements, where first element is zk node data as an 
array of bytes
    *         and second element is zk node version.
-   *         returns (None, ZkVersion.NoVersion) if node doesn't exists and 
throws exception for any error
+   *         returns (None, ZkVersion.UnknownVersion) if node doesn't exist 
and throws exception for any error
    */
   def getDataAndVersion(path: String): (Option[Array[Byte]], Int) = {
     val (data, stat) = getDataAndStat(path)
     stat match {
-      case ZkStat.NoStat => (data, ZkVersion.NoVersion)
+      case ZkStat.NoStat => (data, ZkVersion.UnknownVersion)
       case _ => (data, stat.getVersion)
     }
   }
@@ -602,7 +603,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
 
   /**
    * Conditional update the persistent path data, return (true, newVersion) if 
it succeeds, otherwise (the path doesn't
-   * exist, the current version is not the expected version, etc.) return 
(false, ZkVersion.NoVersion)
+   * exist, the current version is not the expected version, etc.) return 
(false, ZkVersion.UnknownVersion)
    *
    * When there is a ConnectionLossException during the conditional update, 
ZookeeperClient will retry the update and may fail
    * since the previous update may have succeeded (but the stored zkVersion no 
longer matches the expected one).
@@ -627,13 +628,13 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
             debug("Checker method is not passed skipping zkData match")
             debug("Conditional update of path %s with data %s and expected 
version %d failed due to %s"
               .format(path, Utils.utf8(data), expectVersion, 
setDataResponse.resultException.get.getMessage))
-            (false, ZkVersion.NoVersion)
+            (false, ZkVersion.UnknownVersion)
         }
 
       case Code.NONODE =>
         debug("Conditional update of path %s with data %s and expected version 
%d failed due to %s".format(path,
           Utils.utf8(data), expectVersion, 
setDataResponse.resultException.get.getMessage))
-        (false, ZkVersion.NoVersion)
+        (false, ZkVersion.UnknownVersion)
 
       case _ =>
         debug("Conditional update of path %s with data %s and expected version 
%d failed due to %s".format(path,
@@ -678,7 +679,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @param topics the topics to remove.
    */
   def deleteTopicDeletions(topics: Seq[String]): Unit = {
-    val deleteRequests = topics.map(topic => 
DeleteRequest(DeleteTopicsTopicZNode.path(topic), ZkVersion.NoVersion))
+    val deleteRequests = topics.map(topic => 
DeleteRequest(DeleteTopicsTopicZNode.path(topic), ZkVersion.MatchAnyVersion))
     retryRequestsUntilConnected(deleteRequests)
   }
 
@@ -712,7 +713,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
   def setOrCreatePartitionReassignment(reassignment: 
collection.Map[TopicPartition, Seq[Int]]): Unit = {
 
     def set(reassignmentData: Array[Byte]): SetDataResponse = {
-      val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, 
reassignmentData, ZkVersion.NoVersion)
+      val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, 
reassignmentData, ZkVersion.MatchAnyVersion)
       retryRequestUntilConnected(setDataRequest)
     }
 
@@ -745,7 +746,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * Deletes the partition reassignment znode.
    */
   def deletePartitionReassignment(): Unit = {
-    val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, 
ZkVersion.NoVersion)
+    val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, 
ZkVersion.MatchAnyVersion)
     retryRequestUntilConnected(deleteRequest)
   }
 
@@ -866,7 +867,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    */
   def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = {
     val deleteRequests = sequenceNumbers.map { sequenceNumber =>
-      DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), 
ZkVersion.NoVersion)
+      DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), 
ZkVersion.MatchAnyVersion)
     }
     retryRequestsUntilConnected(deleteRequests)
   }
@@ -898,7 +899,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * Deletes the preferred replica election znode.
    */
   def deletePreferredReplicaElection(): Unit = {
-    val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, 
ZkVersion.NoVersion)
+    val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, 
ZkVersion.MatchAnyVersion)
     retryRequestUntilConnected(deleteRequest)
   }
 
@@ -920,7 +921,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * Deletes the controller znode.
    */
   def deleteController(): Unit = {
-    val deleteRequest = DeleteRequest(ControllerZNode.path, 
ZkVersion.NoVersion)
+    val deleteRequest = DeleteRequest(ControllerZNode.path, 
ZkVersion.MatchAnyVersion)
     retryRequestUntilConnected(deleteRequest)
   }
 
@@ -953,7 +954,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @param topics the topics whose configs we wish to delete.
    */
   def deleteTopicConfigs(topics: Seq[String]): Unit = {
-    val deleteRequests = topics.map(topic => 
DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), 
ZkVersion.NoVersion))
+    val deleteRequests = topics.map(topic => 
DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), 
ZkVersion.MatchAnyVersion))
     retryRequestsUntilConnected(deleteRequests)
   }
 
@@ -981,7 +982,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
     getDataResponse.resultCode match {
       case Code.OK => ResourceZNode.decode(getDataResponse.data, 
getDataResponse.stat)
-      case Code.NONODE => VersionedAcls(Set(), -1)
+      case Code.NONODE => NoAcls
       case _ => throw getDataResponse.resultException.get
     }
   }
@@ -994,12 +995,26 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @param expectedVersion
    * @return true if the update was successful and the new version
    */
-  def conditionalSetOrCreateAclsForResource(resource: Resource, aclsSet: 
Set[Acl], expectedVersion: Int): (Boolean, Int) = {
+  def conditionalSetAclsForResource(resource: Resource, aclsSet: Set[Acl], 
expectedVersion: Int): (Boolean, Int) = {
     def set(aclData: Array[Byte],  expectedVersion: Int): SetDataResponse = {
       val setDataRequest = SetDataRequest(ResourceZNode.path(resource), 
aclData, expectedVersion)
       retryRequestUntilConnected(setDataRequest)
     }
 
+    if (expectedVersion < 0)
+      throw new IllegalArgumentException(s"Invalid version $expectedVersion 
provided for conditional update")
+
+    val aclData = ResourceZNode.encode(aclsSet)
+
+    val setDataResponse = set(aclData, expectedVersion)
+    setDataResponse.resultCode match {
+      case Code.OK => (true, setDataResponse.stat.getVersion)
+      case Code.NONODE | Code.BADVERSION  => (false, ZkVersion.UnknownVersion)
+      case _ => throw setDataResponse.resultException.get
+    }
+  }
+
+  def createAclsForResourceIfNotExists(resource: Resource, aclsSet: Set[Acl]): 
(Boolean, Int) = {
     def create(aclData: Array[Byte]): CreateResponse = {
       val path = ResourceZNode.path(resource)
       val createRequest = CreateRequest(path, aclData, acls(path), 
CreateMode.PERSISTENT)
@@ -1008,19 +1023,11 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
 
     val aclData = ResourceZNode.encode(aclsSet)
 
-    val setDataResponse = set(aclData, expectedVersion)
-    setDataResponse.resultCode match {
-      case Code.OK => (true, setDataResponse.stat.getVersion)
-      case Code.NONODE => {
-        val createResponse = create(aclData)
-        createResponse.resultCode match {
-          case Code.OK => (true, 0)
-          case Code.NODEEXISTS => (false, 0)
-          case _ => throw createResponse.resultException.get
-        }
-      }
-      case Code.BADVERSION => (false, 0)
-      case _ => throw setDataResponse.resultException.get
+    val createResponse = create(aclData)
+    createResponse.resultCode match {
+      case Code.OK => (true, 0)
+      case Code.NODEEXISTS => (false, ZkVersion.UnknownVersion)
+      case _ => throw createResponse.resultException.get
     }
   }
 
@@ -1071,7 +1078,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
     */
   private def deleteAclChangeNotifications(aclChangePath: String, 
sequenceNodes: Seq[String]): Unit = {
     val deleteRequests = sequenceNodes.map { sequenceNode =>
-      DeleteRequest(s"$aclChangePath/$sequenceNode", ZkVersion.NoVersion)
+      DeleteRequest(s"$aclChangePath/$sequenceNode", ZkVersion.MatchAnyVersion)
     }
 
     val deleteResponses = retryRequestsUntilConnected(deleteRequests)
@@ -1173,7 +1180,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
   def setOrCreateDelegationToken(token: DelegationToken): Unit = {
 
     def set(tokenData: Array[Byte]): SetDataResponse = {
-      val setDataRequest = 
SetDataRequest(DelegationTokenInfoZNode.path(token.tokenInfo().tokenId()), 
tokenData, ZkVersion.NoVersion)
+      val setDataRequest = 
SetDataRequest(DelegationTokenInfoZNode.path(token.tokenInfo().tokenId()), 
tokenData, ZkVersion.MatchAnyVersion)
       retryRequestUntilConnected(setDataRequest)
     }
 
@@ -1355,7 +1362,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
     * @return sequence number as the broker id
     */
   def generateBrokerSequenceId(): Int = {
-    val setDataRequest = SetDataRequest(BrokerSequenceIdZNode.path, 
Array.empty[Byte], -1)
+    val setDataRequest = SetDataRequest(BrokerSequenceIdZNode.path, 
Array.empty[Byte], ZkVersion.MatchAnyVersion)
     val setDataResponse = retryRequestUntilConnected(setDataRequest)
     setDataResponse.resultCode match {
       case Code.OK => setDataResponse.stat.getVersion
@@ -1384,7 +1391,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
 
   private def setConsumerOffset(group: String, topicPartition: TopicPartition, 
offset: Long): SetDataResponse = {
     val setDataRequest = SetDataRequest(ConsumerOffset.path(group, 
topicPartition.topic, topicPartition.partition),
-      ConsumerOffset.encode(offset), ZkVersion.NoVersion)
+      ConsumerOffset.encode(offset), ZkVersion.MatchAnyVersion)
     retryRequestUntilConnected(setDataRequest)
   }
 
@@ -1404,7 +1411,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
     getChildrenResponse.resultCode match {
       case Code.OK =>
         getChildrenResponse.children.foreach(child => 
deleteRecursive(s"$path/$child"))
-        val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, 
ZkVersion.NoVersion))
+        val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, 
ZkVersion.MatchAnyVersion))
         if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode 
!= Code.NONODE) {
           throw deleteResponse.resultException.get
         }
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala 
b/core/src/main/scala/kafka/zk/ZkData.scala
index d2b2333..f918b61 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -436,7 +436,8 @@ object ConsumerOffset {
 }
 
 object ZkVersion {
-  val NoVersion = -1
+  val MatchAnyVersion = -1 // if used in a conditional set, matches any 
version (the value should match ZooKeeper codebase)
+  val UnknownVersion = -2  // Version returned from get if node does not exist 
(internal constant for Kafka codebase, unused value in ZK)
 }
 
 object ZkStat {
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index df009e8..9cffb51 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -176,7 +176,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // test with non-existing path
     val (data0, version0) = zkClient.getDataAndVersion(path)
     assertTrue(data0.isEmpty)
-    assertEquals(-1, version0)
+    assertEquals(ZkVersion.UnknownVersion, version0)
 
     // create a test path
     zkClient.createRecursive(path)
@@ -200,7 +200,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // test with non-existing path
     var statusAndVersion = zkClient.conditionalUpdatePath(path, 
"version0".getBytes(UTF_8), 0)
     assertFalse(statusAndVersion._1)
-    assertEquals(-1, statusAndVersion._2)
+    assertEquals(ZkVersion.UnknownVersion, statusAndVersion._2)
 
     // create path
     zkClient.createRecursive(path)
@@ -213,7 +213,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // test with invalid expected version
     statusAndVersion = zkClient.conditionalUpdatePath(path, 
"version2".getBytes(UTF_8), 2)
     assertFalse(statusAndVersion._1)
-    assertEquals(-1, statusAndVersion._2)
+    assertEquals(ZkVersion.UnknownVersion, statusAndVersion._2)
   }
 
   @Test
@@ -446,7 +446,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       // try getting acls for non-existing resource
       var versionedAcls = zkClient.getVersionedAclsForResource(resource1)
       assertTrue(versionedAcls.acls.isEmpty)
-      assertEquals(-1, versionedAcls.zkVersion)
+      assertEquals(ZkVersion.UnknownVersion, versionedAcls.zkVersion)
       assertFalse(zkClient.resourceExists(resource1))
 
 
@@ -454,9 +454,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       val acl2 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), 
Allow, "*", Read)
       val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), 
Deny, "host1", Read)
 
+      // Conditional set should fail if path not created
+      assertFalse(zkClient.conditionalSetAclsForResource(resource1, Set(acl1, 
acl3), 0)._1)
+
       //create acls for resources
-      zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1, 
acl2), 0)
-      zkClient.conditionalSetOrCreateAclsForResource(resource2, Set(acl1, 
acl3), 0)
+      assertTrue(zkClient.createAclsForResourceIfNotExists(resource1, 
Set(acl1, acl2))._1)
+      assertTrue(zkClient.createAclsForResourceIfNotExists(resource2, 
Set(acl1, acl3))._1)
+
+      // Create should fail if path already exists
+      assertFalse(zkClient.createAclsForResourceIfNotExists(resource2, 
Set(acl1, acl3))._1)
 
       versionedAcls = zkClient.getVersionedAclsForResource(resource1)
       assertEquals(Set(acl1, acl2), versionedAcls.acls)
@@ -464,7 +470,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       assertTrue(zkClient.resourceExists(resource1))
 
       //update acls for resource
-      zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1, 
acl3), 0)
+      assertTrue(zkClient.conditionalSetAclsForResource(resource1, Set(acl1, 
acl3), 0)._1)
 
       versionedAcls = zkClient.getVersionedAclsForResource(resource1)
       assertEquals(Set(acl1, acl3), versionedAcls.acls)

Reply via email to