Repository: kafka
Updated Branches:
  refs/heads/trunk 430bf56cd -> f643d1b75


KAFKA-3783; Catch proper exception on path delete

- ZkClient is used for conditional path deletion and wraps `KeeperException.BadVersionException` into `ZkBadVersionException`
- add unit test to `SimpleAclAuthorizerTest` to reproduce the issue and catch potential future regression

Author: Sebastien Launay <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #1461 from slaunay/bugfix/KAFKA-3783-zk-conditional-delete-path


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f643d1b7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f643d1b7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f643d1b7

Branch: refs/heads/trunk
Commit: f643d1b75d17bb27a378c7e66fcc49607454e445
Parents: 430bf56
Author: Sebastien Launay <[email protected]>
Authored: Tue Jun 7 01:22:58 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Tue Jun 7 01:22:58 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  4 +-
 .../security/auth/SimpleAclAuthorizerTest.scala | 20 ++++++-
 .../scala/unit/kafka/utils/ZkUtilsTest.scala    | 55 ++++++++++++++++++++
 3 files changed, 76 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f643d1b7/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f02ab20..3788ef4 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -515,14 +515,14 @@ class ZkUtils(val zkClient: ZkClient,
 
   /**
    * Conditional delete the persistent path data, return true if it succeeds,
-   * otherwise (the current version is not the expected version)
+   * false otherwise (the current version is not the expected version)
    */
   def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = {
     try {
       zkClient.delete(path, expectedVersion)
       true
     } catch {
-      case e: KeeperException.BadVersionException => false
+      case e: ZkBadVersionException => false
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f643d1b7/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 7fcc33d..1f52af4 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -336,12 +336,30 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
       aclId % 10 != 0
     }.toSet
 
-    TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 15000)
+    TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
 
     TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer, commonResource)
     TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer2, commonResource)
   }
 
+  @Test
+  def testHighConcurrencyDeletionOfResourceAcls() {
+    val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)
+
+    // Alternate authorizer to keep adding and removing zookeeper path
+    val concurrentFuctions = (0 to 50).map { i =>
+      () => {
+        simpleAclAuthorizer.addAcls(Set(acl), resource)
+        simpleAclAuthorizer2.removeAcls(Set(acl), resource)
+      }
+    }
+
+    TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
+
+    TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
+    TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer2, resource)
+  }
+
   private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
     var acls = originalAcls
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f643d1b7/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
new file mode 100755
index 0000000..2d81ed9
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Assert._
+import org.junit.Test
+
+class ZkUtilsTest extends ZooKeeperTestHarness {
+
+  val path = "/path"
+
+  @Test
+  def testSuccessfulConditionalDeletePath() {
+    // Given an existing path
+    zkUtils.createPersistentPath(path)
+    val (_, statAfterCreation) = zkUtils.readData(path)
+
+    // Deletion is successful when the version number matches
+    assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, statAfterCreation.getVersion))
+    val (optionalData, _) = zkUtils.readDataMaybeNull(path)
+    assertTrue("Node should be deleted", optionalData.isEmpty)
+
+    // Deletion is successful when the node does not exist too
+    assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, 0))
+  }
+
+  @Test
+  def testAbortedConditionalDeletePath() {
+    // Given an existing path that gets updated
+    zkUtils.createPersistentPath(path)
+    val (_, statAfterCreation) = zkUtils.readData(path)
+    zkUtils.updatePersistentPath(path, "data")
+
+    // Deletion is aborted when the version number does not match
+    assertFalse("Deletion should be aborted", zkUtils.conditionalDeletePath(path, statAfterCreation.getVersion))
+    val (optionalData, _) = zkUtils.readDataMaybeNull(path)
+    assertTrue("Node should still be there", optionalData.isDefined)
+  }
+}
