Repository: kafka
Updated Branches:
  refs/heads/trunk 245fa2bd8 -> 102903046
MINOR: Fix setting of ACLs and ZK shutdown in test harnesses

I found both issues while investigating the issue described in PR #1425.

Author: Ismael Juma <[email protected]>

Reviewers: Sriharsha Chintalapani <[email protected]>, Jun Rao <[email protected]>

Closes #1455 from ijuma/fix-integration-test-harness-and-zk-test-harness

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10290304
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10290304
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10290304

Branch: refs/heads/trunk
Commit: 1029030466f01937d416e11f93562bcaaecce253
Parents: 245fa2b
Author: Ismael Juma <[email protected]>
Authored: Fri Jun 3 01:19:46 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Fri Jun 3 01:19:46 2016 +0100

----------------------------------------------------------------------
 .../kafka/api/EndToEndAuthorizationTest.scala   |  7 ++--
 .../integration/KafkaServerTestHarness.scala    | 34 ++++++++++++--------
 .../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 10 ++++++
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    | 13 --------
 4 files changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/10290304/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index fec96cd..e13f160 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -61,12 +61,11 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
   override val producerCount = 1
   override val consumerCount = 2
   override val serverCount = 3
-  override val setClusterAcl = Some { () =>
+
+  override def setAclsBeforeServersStart() {
     AclCommand.main(clusterAclArgs)
-    servers.foreach(s =>
-      TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, clusterResource)
-    )
   }
+
   val numRecords = 1
   val group = "group"
   val topic = "e2etopic"

http://git-wip-us.apache.org/repos/asf/kafka/blob/10290304/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 8e8ae8b..7059d17 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -38,7 +38,6 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
   var brokerList: String = null
   var alive: Array[Boolean] = null
   val kafkaPrincipalType = KafkaPrincipal.USER_TYPE
-  val setClusterAcl: Option[() => Unit] = None
 
   /**
    * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
@@ -46,13 +45,26 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
    */
   def generateConfigs(): Seq[KafkaConfig]
 
+  /**
+   * Override this in case ACLs must be set before `servers` are started.
+   *
+   * This is required in some cases because of the topic creation in the setup of `IntegrationTestHarness`. If the ACLs
+   * are only set later, tests may fail. The failure could manifest itself as a cluster action
+   * authorization exception when processing an update metadata request (controller -> broker) or in more obscure
+   * ways (e.g. __consumer_offsets topic replication fails because the metadata cache has no brokers as a previous
+   * update metadata request failed due to an authorization exception).
+   *
+   * The default implementation of this method is a no-op.
+   */
+  def setAclsBeforeServersStart() {}
+
   def configs: Seq[KafkaConfig] = {
     if (instanceConfigs == null)
       instanceConfigs = generateConfigs()
     instanceConfigs
   }
 
-  def serverForId(id: Int) = servers.find(s => s.config.brokerId == id)
+  def serverForId(id: Int): Option[KafkaServer] = servers.find(s => s.config.brokerId == id)
 
   protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
   protected def trustStoreFile: Option[File] = None
@@ -61,23 +73,17 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
   @Before
   override def setUp() {
     super.setUp
-    if (configs.size <= 0)
+
+    if (configs.isEmpty)
       throw new KafkaException("Must supply at least one server config.")
+
+    // default implementation is a no-op, it is overridden by subclasses if required
+    setAclsBeforeServersStart()
+
     servers = configs.map(TestUtils.createServer(_)).toBuffer
     brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)
     alive = new Array[Boolean](servers.length)
     Arrays.fill(alive, true)
-    // We need to set a cluster ACL in some cases here
-    // because of the topic creation in the setup of
-    // IntegrationTestHarness. If we don't, then tests
-    // fail with a cluster action authorization exception
-    // when processing an update metadata request
-    // (controller -> broker).
-    //
-    // The following method does nothing by default, but
-    // if the test case requires setting up a cluster ACL,
-    // then it needs to be implemented.
-    setClusterAcl.foreach(_.apply)
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/10290304/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 1030c46..22465ea 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -39,6 +39,16 @@ class EmbeddedZookeeper() {
   def shutdown() {
     CoreUtils.swallow(zookeeper.shutdown())
     CoreUtils.swallow(factory.shutdown())
+
+    def isDown(): Boolean = {
+      try {
+        ZkFourLetterWords.sendStat("127.0.0.1", port, 3000)
+        false
+      } catch { case _: Throwable => true }
+    }
+
+    Iterator.continually(isDown()).exists(identity)
+
     Utils.delete(logDir)
     Utils.delete(snapshotDir)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/10290304/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 0de11cd..305e074 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -46,19 +46,6 @@ trait ZooKeeperTestHarness extends JUnitSuite with Logging {
       CoreUtils.swallow(zkUtils.close())
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown())
-
-    def isDown(): Boolean = {
-      try {
-        ZkFourLetterWords.sendStat("127.0.0.1", zkPort, 3000)
-        false
-      } catch { case _: Throwable =>
-        debug("Server is down")
-        true
-      }
-    }
-
-    Iterator.continually(isDown()).exists(identity)
-
     Configuration.setConfiguration(null)
   }
 
