Repository: kafka Updated Branches: refs/heads/trunk b6effcbba -> 12aa70d55
KAFKA-5713; Shutdown brokers in tests Add broker shutdown for `LeaderEpochIntegrationTest`. Move broker shutdown in other tests to `tearDown` to ensure brokers are shut down even if tests fail. Also add an assertion to `ZooKeeperTestHarness` to verify that the controller event thread is not running, since this thread may load JAAS configuration if ZK ports are reused. Author: Rajini Sivaram <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #3026 from rajinisivaram/KAFKA-5173 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/12aa70d5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/12aa70d5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/12aa70d5 Branch: refs/heads/trunk Commit: 12aa70d55bc422226255ab18e69e4bc6f24be2d9 Parents: b6effcb Author: Rajini Sivaram <[email protected]> Authored: Fri May 12 00:43:35 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Fri May 12 02:29:16 2017 +0100 ---------------------------------------------------------------------- .../kafka/api/ProducerCompressionTest.scala | 5 +- ...tenersWithSameSecurityProtocolBaseTest.scala | 7 +- .../ReplicaFetcherThreadFatalErrorTest.scala | 2 +- .../other/kafka/ReplicationQuotasTestRig.scala | 5 +- .../unit/kafka/admin/AddPartitionsTest.scala | 5 +- .../test/scala/unit/kafka/admin/AdminTest.scala | 209 +++++++++---------- .../unit/kafka/admin/DeleteTopicTest.scala | 43 ++-- .../admin/ReassignPartitionsClusterTest.scala | 5 +- .../controller/ControllerIntegrationTest.scala | 5 +- .../integration/KafkaServerTestHarness.scala | 3 +- .../kafka/integration/TopicMetadataTest.scala | 20 +- .../unit/kafka/producer/ProducerTest.scala | 5 +- .../unit/kafka/server/AdvertiseBrokerTest.scala | 7 +- .../unit/kafka/server/LeaderElectionTest.scala | 5 +- .../scala/unit/kafka/server/LogOffsetTest.scala | 3 +- .../unit/kafka/server/LogRecoveryTest.scala | 6 +- .../unit/kafka/server/OffsetCommitTest.scala | 3 
+- .../unit/kafka/server/ReplicaFetchTest.scala | 2 +- .../kafka/server/ReplicationQuotasTest.scala | 9 +- .../server/ServerGenerateBrokerIdTest.scala | 78 +++---- .../server/ServerGenerateClusterIdTest.scala | 44 ++-- .../unit/kafka/server/ServerStartupTest.scala | 39 ++-- ...rivenReplicationProtocolAcceptanceTest.scala | 2 +- .../epoch/LeaderEpochIntegrationTest.scala | 15 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 17 +- .../unit/kafka/zk/ZooKeeperTestHarness.scala | 20 ++ 26 files changed, 265 insertions(+), 299 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index b165918..2001095 100755 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -30,7 +30,7 @@ import kafka.server.{KafkaConfig, KafkaServer} import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.zk.ZooKeeperTestHarness -import kafka.utils.{CoreUtils, TestUtils} +import kafka.utils.TestUtils @RunWith(value = classOf[Parameterized]) @@ -53,8 +53,7 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness @After override def tearDown() { - server.shutdown - CoreUtils.delete(server.config.logDirs) + TestUtils.shutdownServers(Seq(server)) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala index 88b314f..7db9d7c 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala @@ -26,7 +26,7 @@ import kafka.api.SaslSetup import kafka.common.Topic import kafka.coordinator.group.OffsetConfig import kafka.utils.JaasTestUtils.JaasSection -import kafka.utils.{CoreUtils, TestUtils} +import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} @@ -133,10 +133,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep override def tearDown() { producers.values.foreach(_.close()) consumers.values.foreach(_.close()) - servers.foreach { s => - s.shutdown() - CoreUtils.delete(s.config.logDirs) - } + TestUtils.shutdownServers(servers) super.tearDown() closeSasl() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala index ae76eb6..9a04e67 100644 --- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala +++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala @@ -45,7 +45,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness { @After override def tearDown() { Exit.resetExitProcedure() - 
brokers.foreach(_.shutdown()) + TestUtils.shutdownServers(brokers) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala index 38d07ba..d8bc65e 100644 --- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala +++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition import kafka.server.{KafkaConfig, KafkaServer, QuotaType} import kafka.utils.TestUtils._ import kafka.utils.ZkUtils._ -import kafka.utils.{CoreUtils, Exit, Logging, TestUtils, ZkUtils} +import kafka.utils.{Exit, Logging, TestUtils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.clients.producer.ProducerRecord import org.jfree.chart.plot.PlotOrientation @@ -108,8 +108,7 @@ object ReplicationQuotasTestRig { } override def tearDown() { - servers.par.foreach(_.shutdown()) - servers.par.foreach(server => CoreUtils.delete(server.config.logDirs)) + TestUtils.shutdownServers(servers) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index d95d90d..e9c5ac5 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -22,7 +22,7 @@ import org.junit.Assert._ import org.apache.kafka.common.protocol.SecurityProtocol import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ -import kafka.utils.{CoreUtils, TestUtils} +import 
kafka.utils.TestUtils import kafka.cluster.Broker import kafka.client.ClientUtils import kafka.server.{KafkaConfig, KafkaServer} @@ -59,8 +59,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @After override def tearDown() { - servers.foreach(_.shutdown()) - servers.foreach(server => CoreUtils.delete(server.config.logDirs)) + TestUtils.shutdownServers(servers) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/admin/AdminTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index ec54608..f8c65eb 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -22,7 +22,7 @@ import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, Invali import org.apache.kafka.common.metrics.Quota import org.easymock.EasyMock import org.junit.Assert._ -import org.junit.Test +import org.junit.{After, Test} import java.util.Properties import kafka.utils._ @@ -47,6 +47,14 @@ import scala.util.Try class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { + var servers: Seq[KafkaServer] = Seq() + + @After + override def tearDown() { + TestUtils.shutdownServers(servers) + super.tearDown() + } + @Test def testReplicaAssignment() { val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None)) @@ -188,7 +196,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // create the topic 
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // reassign partition 0 @@ -211,7 +219,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") - servers.foreach(_.shutdown()) } @Test @@ -219,7 +226,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // reassign partition 0 @@ -241,8 +248,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") - - servers.foreach(_.shutdown()) } @Test @@ -250,7 +255,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, 
topic, expectedReplicaAssignment) // reassign partition 0 @@ -272,14 +277,13 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") - servers.foreach(_.shutdown()) } @Test def testReassigningNonExistingPartition() { val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -288,7 +292,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { assertFalse("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) val reassignedPartitions = zkUtils.getPartitionsBeingReassigned() assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition)) - servers.foreach(_.shutdown()) } @Test @@ -305,7 +308,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas)) reassignPartitionsCommand.reassignPartitions() // create brokers - val servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // wait until reassignment completes TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkUtils), @@ -317,7 +320,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { 
ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") - servers.foreach(_.shutdown()) } @Test @@ -344,7 +346,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) - val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s)) + servers = serverConfigs.reverseMap(s => TestUtils.createServer(s)) // broker 2 should be the leader since it was started first val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None).get // trigger preferred replica election @@ -352,7 +354,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { preferredReplicaElection.moveLeaderToPreferredReplica() val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = Some(currentLeader)).get assertEquals("Preferred replica election failed", preferredReplica, newLeader) - servers.foreach(_.shutdown()) } @Test @@ -362,7 +363,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { val partition = 1 // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps) - val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s)) + servers = serverConfigs.reverseMap(s => TestUtils.createServer(s)) // create the topic TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers) @@ -373,36 +374,31 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { 
controller.shutdownBroker(2, controlledShutdownCallback) var partitionsRemaining = resultQueue.take().get var activeServers = servers.filter(s => s.config.brokerId != 2) - try { - // wait for the update metadata request to trickle to the brokers - TestUtils.waitUntilTrue(() => - activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), - "Topic test not created after timeout") - assertEquals(0, partitionsRemaining.size) - var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get - var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader - assertEquals(0, leaderAfterShutdown) - assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size) - assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr) - - controller.shutdownBroker(1, controlledShutdownCallback) - partitionsRemaining = resultQueue.take().get - assertEquals(0, partitionsRemaining.size) - activeServers = servers.filter(s => s.config.brokerId == 0) - partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get - leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader - assertEquals(0, leaderAfterShutdown) - - assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) - controller.shutdownBroker(0, controlledShutdownCallback) - partitionsRemaining = resultQueue.take().get - assertEquals(1, partitionsRemaining.size) - // leader doesn't change since all the replicas are shut down - assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) - } - finally { - servers.foreach(_.shutdown()) - } + // wait for the update metadata request to trickle to the brokers + 
TestUtils.waitUntilTrue(() => + activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), + "Topic test not created after timeout") + assertEquals(0, partitionsRemaining.size) + var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get + var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader + assertEquals(0, leaderAfterShutdown) + assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size) + assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr) + + controller.shutdownBroker(1, controlledShutdownCallback) + partitionsRemaining = resultQueue.take().get + assertEquals(0, partitionsRemaining.size) + activeServers = servers.filter(s => s.config.brokerId == 0) + partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get + leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader + assertEquals(0, leaderAfterShutdown) + + assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) + controller.shutdownBroker(0, controlledShutdownCallback) + partitionsRemaining = resultQueue.take().get + assertEquals(1, partitionsRemaining.size) + // leader doesn't change since all the replicas are shut down + assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) } /** @@ -414,6 +410,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { val partitions = 3 val topic = "my-topic" val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) + servers = Seq(server) def makeConfig(messageSize: Int, retentionMs: Long, throttledLeaders: String, throttledFollowers: String) = { 
val props = new Properties() @@ -446,51 +443,45 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { } } - try { - // create a topic with a few config overrides and check that they are applied - val maxMessageSize = 1024 - val retentionMs = 1000 * 1000 - AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1")) - - //Standard topic configs will be propagated at topic creation time, but the quota manager will not have been updated. - checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", false) + // create a topic with a few config overrides and check that they are applied + val maxMessageSize = 1024 + val retentionMs = 1000 * 1000 + AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1")) - //Update dynamically and all properties should be applied - AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1")) + //Standard topic configs will be propagated at topic creation time, but the quota manager will not have been updated. 
+ checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", false) - checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", true) + //Update dynamically and all properties should be applied + AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1")) - // now double the config values for the topic and check that it is applied - val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*") - AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")) - checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*", quotaManagerIsThrottled = true) + checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", true) - // Verify that the same config can be read from ZK - val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Topic, topic) - assertEquals(newConfig, configInZk) + // now double the config values for the topic and check that it is applied + val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*") + AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")) + checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*", quotaManagerIsThrottled = true) - //Now delete the config - AdminUtils.changeTopicConfig(server.zkUtils, topic, new Properties) - checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false) + // Verify that the same config can be read from ZK + val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Topic, topic) + assertEquals(newConfig, configInZk) - //Add config back - AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1")) - checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", quotaManagerIsThrottled = true) + //Now delete the config + 
AdminUtils.changeTopicConfig(server.zkUtils, topic, new Properties) + checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false) - //Now ensure updating to "" removes the throttled replica list also - AdminUtils.changeTopicConfig(server.zkUtils, topic, propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), (LogConfig.LeaderReplicationThrottledReplicasProp, ""))) - checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false) + //Add config back + AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1")) + checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", quotaManagerIsThrottled = true) - } finally { - server.shutdown() - CoreUtils.delete(server.config.logDirs) - } + //Now ensure updating to "" removes the throttled replica list also + AdminUtils.changeTopicConfig(server.zkUtils, topic, propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), (LogConfig.LeaderReplicationThrottledReplicasProp, ""))) + checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false) } @Test def shouldPropagateDynamicBrokerConfigs() { val brokerIds = Seq(0, 1, 2) - val servers = createBrokerConfigs(3, zkConnect).map(fromProps).map(createServer(_)) + servers = createBrokerConfigs(3, zkConnect).map(fromProps).map(createServer(_)) def checkConfig(limit: Long) { retry(10000) { @@ -501,37 +492,31 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { } } - try { - val limit: Long = 1000000 - - // Set the limit & check it is applied to the log - changeBrokerConfig(zkUtils, brokerIds, propsWith( - (LeaderReplicationThrottledRateProp, limit.toString), - (FollowerReplicationThrottledRateProp, limit.toString))) - checkConfig(limit) - - // Now double the config values for the topic and check that it is applied - val newLimit = 2 * limit - 
changeBrokerConfig(zkUtils, brokerIds, propsWith( - (LeaderReplicationThrottledRateProp, newLimit.toString), - (FollowerReplicationThrottledRateProp, newLimit.toString))) - checkConfig(newLimit) - - // Verify that the same config can be read from ZK - for (brokerId <- brokerIds) { - val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils, ConfigType.Broker, brokerId.toString) - assertEquals(newLimit, configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt) - assertEquals(newLimit, configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt) - } - - //Now delete the config - changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties) - checkConfig(DefaultReplicationThrottledRate) - - } finally { - servers.foreach(_.shutdown()) - servers.foreach(server => CoreUtils.delete(server.config.logDirs)) + val limit: Long = 1000000 + + // Set the limit & check it is applied to the log + changeBrokerConfig(zkUtils, brokerIds, propsWith( + (LeaderReplicationThrottledRateProp, limit.toString), + (FollowerReplicationThrottledRateProp, limit.toString))) + checkConfig(limit) + + // Now double the config values for the topic and check that it is applied + val newLimit = 2 * limit + changeBrokerConfig(zkUtils, brokerIds, propsWith( + (LeaderReplicationThrottledRateProp, newLimit.toString), + (FollowerReplicationThrottledRateProp, newLimit.toString))) + checkConfig(newLimit) + + // Verify that the same config can be read from ZK + for (brokerId <- brokerIds) { + val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils, ConfigType.Broker, brokerId.toString) + assertEquals(newLimit, configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt) + assertEquals(newLimit, configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt) } + + //Now delete the config + changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties) + checkConfig(DefaultReplicationThrottledRate) } /** @@ -556,13 +541,9 @@ class AdminTest extends 
ZooKeeperTestHarness with Logging with RackAwareTest { // Test that the existing clientId overrides are read val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) - try { - assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId)) - assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId)) - } finally { - server.shutdown() - CoreUtils.delete(server.config.logDirs) - } + servers = Seq(server) + assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId)) + assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId)) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 2085d2d..d9ab85e 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -22,7 +22,7 @@ import kafka.utils.TestUtils import kafka.utils.ZkUtils._ import kafka.server.{KafkaConfig, KafkaServer} import org.junit.Assert._ -import org.junit.Test +import org.junit.{After, Test} import java.util.Properties import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition} @@ -31,22 +31,29 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException class DeleteTopicTest extends ZooKeeperTestHarness { + var servers: Seq[KafkaServer] = Seq() + + @After + override def tearDown() { + TestUtils.shutdownServers(servers) + super.tearDown() + } + @Test def testDeleteTopicWithAllAliveReplicas() { val topicPartition = new TopicPartition("test", 0) val topic = topicPartition.topic - val servers = createTestTopicAndCluster(topic) + servers = 
createTestTopicAndCluster(topic) // start topic deletion AdminUtils.deleteTopic(zkUtils, topic) TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) - servers.foreach(_.shutdown()) } @Test def testResumeDeleteTopicWithRecoveredFollower() { val topicPartition = new TopicPartition("test", 0) val topic = topicPartition.topic - val servers = createTestTopicAndCluster(topic) + servers = createTestTopicAndCluster(topic) // shut down one follower replica val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0) assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) @@ -64,14 +71,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // restart follower replica follower.startup() TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) - servers.foreach(_.shutdown()) } @Test def testResumeDeleteTopicOnControllerFailover() { val topicPartition = new TopicPartition("test", 0) val topic = topicPartition.topic - val servers = createTestTopicAndCluster(topic) + servers = createTestTopicAndCluster(topic) val controllerId = zkUtils.getController() val controller = servers.filter(s => s.config.brokerId == controllerId).head val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0) @@ -91,7 +97,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness { follower.startup() TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) - servers.foreach(_.shutdown()) } @Test @@ -103,6 +108,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) // create brokers val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + this.servers = allServers val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) @@ -136,13 +142,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness { 
assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas) follower.startup() TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) - allServers.foreach(_.shutdown()) } @Test def testDeleteTopicDuringAddPartition() { val topic = "test" - val servers = createTestTopicAndCluster(topic) + servers = createTestTopicAndCluster(topic) val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0) assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last @@ -159,13 +164,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness { TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(newPartition).isEmpty), "Replica logs not for new partition [test,1] not deleted after delete topic is complete.") - servers.foreach(_.shutdown()) } @Test def testAddPartitionDuringDeleteTopic() { val topic = "test" - val servers = createTestTopicAndCluster(topic) + servers = createTestTopicAndCluster(topic) // start topic deletion AdminUtils.deleteTopic(zkUtils, topic) // add partitions to topic @@ -175,7 +179,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // verify that new partition doesn't exist on any broker either assertTrue("Replica logs not deleted after delete topic is complete", servers.forall(_.getLogManager().getLog(newPartition).isEmpty)) - servers.foreach(_.shutdown()) } @Test @@ -183,7 +186,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" val topicPartition = new TopicPartition(topic, 0) - val servers = createTestTopicAndCluster(topic) + servers = createTestTopicAndCluster(topic) // start topic deletion AdminUtils.deleteTopic(zkUtils, topic) TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) @@ -195,14 +198,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // check if all replica logs are created 
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), "Replicas for topic test not created.") - servers.foreach(_.shutdown()) } @Test def testDeleteNonExistingTopic() { val topicPartition = new TopicPartition("test", 0) val topic = topicPartition.topic - val servers = createTestTopicAndCluster(topic) + servers = createTestTopicAndCluster(topic) // start topic deletion try { AdminUtils.deleteTopic(zkUtils, "test2") @@ -220,7 +222,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // topic test should have a leader val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined) - servers.foreach(_.shutdown()) } @Test @@ -236,7 +237,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { brokerConfigs.head.setProperty("log.segment.bytes","100") brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577") - val servers = createTestTopicAndCluster(topic,brokerConfigs) + servers = createTestTopicAndCluster(topic,brokerConfigs) // for simplicity, we are validating cleaner offsets on a single broker val server = servers.head @@ -251,15 +252,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // delete topic AdminUtils.deleteTopic(zkUtils, "test") TestUtils.verifyTopicDeletion(zkUtils, "test", 1, servers) - - servers.foreach(_.shutdown()) } @Test def testDeleteTopicAlreadyMarkedAsDeleted() { val topicPartition = new TopicPartition("test", 0) val topic = topicPartition.topic - val servers = createTestTopicAndCluster(topic) + servers = createTestTopicAndCluster(topic) try { // start topic deletion @@ -273,7 +272,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness { } TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) - servers.foreach(_.shutdown()) } private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true): Seq[KafkaServer] = { @@ -311,7 +309,7 @@ class 
DeleteTopicTest extends ZooKeeperTestHarness { def testDisableDeleteTopic() { val topicPartition = new TopicPartition("test", 0) val topic = topicPartition.topic - val servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false) + servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false) // mark the topic for deletion AdminUtils.deleteTopic(zkUtils, "test") TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getDeleteTopicPath(topic)), @@ -323,6 +321,5 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // topic test should have a leader val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0) assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined) - servers.foreach(_.shutdown()) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index 05a3f83..e3b0aa8 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -17,7 +17,7 @@ import kafka.common.{AdminCommandFailedException, TopicAndPartition} import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.TestUtils._ import kafka.utils.ZkUtils._ -import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils} +import kafka.utils.{Logging, TestUtils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.junit.Assert.{assertEquals, assertTrue} import org.junit.{After, Before, Test} @@ -44,8 +44,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { @After override def tearDown() { - servers.par.foreach(_.shutdown()) - servers.par.foreach(server => CoreUtils.delete(server.config.logDirs)) + TestUtils.shutdownServers(servers) 
super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 5e608d1..cbb98e8 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -20,7 +20,7 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.common.TopicAndPartition import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{CoreUtils, TestUtils, ZkUtils} +import kafka.utils.{TestUtils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.junit.{After, Before, Test} @@ -35,8 +35,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { @After override def tearDown() { - servers.foreach(_.shutdown()) - servers.foreach(server => CoreUtils.delete(server.config.logDirs)) + TestUtils.shutdownServers(servers) super.tearDown } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index af3133a..da25d5c 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -97,8 +97,7 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness { @After override def tearDown() { if (servers != null) { - servers.foreach(_.shutdown()) - servers.foreach(server => CoreUtils.delete(server.config.logDirs)) + TestUtils.shutdownServers(servers) 
} super.tearDown } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index d63d5b2..07af590 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -24,7 +24,7 @@ import kafka.api.TopicMetadataResponse import kafka.client.ClientUtils import kafka.cluster.BrokerEndPoint import kafka.server.{KafkaConfig, KafkaServer, NotRunning} -import kafka.utils.TestUtils +import kafka.utils.{CoreUtils, TestUtils} import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.Errors @@ -33,6 +33,7 @@ import org.junit.{Test, After, Before} class TopicMetadataTest extends ZooKeeperTestHarness { private var server1: KafkaServer = null + private var adHocServers: Seq[KafkaServer] = Seq() var brokerEndPoints: Seq[BrokerEndPoint] = null var adHocConfigs: Seq[KafkaConfig] = null val numConfigs: Int = 4 @@ -53,7 +54,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { @After override def tearDown() { - server1.shutdown() + TestUtils.shutdownServers(adHocServers :+ server1) super.tearDown() } @@ -134,6 +135,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3") // start adHoc brokers with replication factor too high val adHocServer = createServer(new KafkaConfig(adHocProps)) + adHocServers = Seq(adHocServer) // We are using the Scala clients and they don't support SSL. 
Once we move to the Java ones, we should use // `securityProtocol` instead of PLAINTEXT below val adHocEndpoint = new BrokerEndPoint(adHocServer.config.brokerId, adHocServer.config.hostName, @@ -147,8 +149,6 @@ class TopicMetadataTest extends ZooKeeperTestHarness { assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size) assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic) assertEquals(0, topicsMetadata.head.partitionsMetadata.size) - - adHocServer.shutdown() } @Test @@ -216,7 +216,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { val numBrokers = 2 //just 2 brokers are enough for the test // start adHoc brokers - val adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p)) + adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p)) val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers // create topic @@ -232,9 +232,6 @@ class TopicMetadataTest extends ZooKeeperTestHarness { // check metadata is still correct and updated at all brokers checkIsr(allServers) - - // shutdown adHoc brokers - adHocServers.map(p => p.shutdown()) } private def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = { @@ -269,7 +266,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { @Test def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup { - var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p)) + adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p)) checkMetadata(adHocServers, numConfigs - 1) @@ -277,13 +274,12 @@ class TopicMetadataTest extends ZooKeeperTestHarness { adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head)) checkMetadata(adHocServers, numConfigs) - adHocServers.map(p => p.shutdown()) } @Test def testAliveBrokersListWithNoTopicsAfterABrokerShutdown { - val adHocServers = adHocConfigs.map(p => createServer(p)) + adHocServers = adHocConfigs.map(p => 
createServer(p)) checkMetadata(adHocServers, numConfigs) @@ -292,7 +288,5 @@ class TopicMetadataTest extends ZooKeeperTestHarness { adHocServers.last.awaitShutdown() checkMetadata(adHocServers, numConfigs - 1) - - adHocServers.map(p => p.shutdown()) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 63ec83e..1d3f77f 100755 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -91,10 +91,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{ if (consumer2 != null) consumer2.close() - server1.shutdown - server2.shutdown - CoreUtils.delete(server1.config.logDirs) - CoreUtils.delete(server2.config.logDirs) + TestUtils.shutdownServers(Seq(server1, server2)) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala index 8d4899b..6d9ab72 100755 --- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -18,7 +18,7 @@ package kafka.server import org.junit.Assert._ -import kafka.utils.{CoreUtils, TestUtils} +import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.{After, Test} @@ -32,10 +32,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness { @After override def tearDown() { - servers.foreach { s => - s.shutdown() - 
CoreUtils.delete(s.config.logDirs) - } + TestUtils.shutdownServers(servers) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 6ffe314..aa243be 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import kafka.api.LeaderAndIsr import org.apache.kafka.common.requests._ import org.junit.Assert._ -import kafka.utils.{CoreUtils, TestUtils} +import kafka.utils.TestUtils import kafka.cluster.Broker import kafka.controller.{ControllerChannelManager, ControllerContext} import kafka.utils.TestUtils._ @@ -60,8 +60,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness { @After override def tearDown() { - servers.foreach(_.shutdown()) - servers.foreach(server => CoreUtils.delete(server.config.logDirs)) + TestUtils.shutdownServers(servers) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 415027c..9383355 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -60,8 +60,7 @@ class LogOffsetTest extends ZooKeeperTestHarness { @After override def tearDown() { simpleConsumer.close - server.shutdown - Utils.delete(logDir) + TestUtils.shutdownServers(Seq(server)) super.tearDown() } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 54cee6b..0ecc3c7 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -27,7 +27,6 @@ import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer} -import org.apache.kafka.common.utils.Utils import org.junit.{After, Before, Test} import org.junit.Assert._ @@ -95,10 +94,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { @After override def tearDown() { producer.close() - for (server <- servers) { - server.shutdown() - Utils.delete(new File(server.config.logDirs.head)) - } + TestUtils.shutdownServers(servers) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index ebfbe89..244ef78 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -68,8 +68,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness { @After override def tearDown() { simpleConsumer.close - server.shutdown - Utils.delete(logDir) + TestUtils.shutdownServers(Seq(server)) super.tearDown() } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index 19c386f..dd683e1 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -42,7 +42,7 @@ class ReplicaFetchTest extends ZooKeeperTestHarness { @After override def tearDown() { - brokers.foreach(_.shutdown()) + TestUtils.shutdownServers(brokers) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala index 15e77a0..5fc4c0f 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala @@ -50,15 +50,10 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness { val topic = "topic1" var producer: KafkaProducer[Array[Byte], Array[Byte]] = null - @Before - override def setUp() { - super.setUp() - } - @After override def tearDown() { - brokers.par.foreach(_.shutdown()) producer.close() + shutdownServers(brokers) super.tearDown() } @@ -242,4 +237,4 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness { val metricName = broker.metrics.metricName("byte-rate", repType.toString) broker.metrics.metrics.asScala(metricName).value } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala ---------------------------------------------------------------------- diff 
--git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala index f21f2de..0ba133f 100755 --- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -19,8 +19,8 @@ package kafka.server import java.util.Properties import kafka.zk.ZooKeeperTestHarness -import kafka.utils.{TestUtils, CoreUtils} -import org.junit.{Before, Test} +import kafka.utils.TestUtils +import org.junit.{After, Before, Test} import org.junit.Assert._ import java.io.File @@ -30,6 +30,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { var props2: Properties = null var config2: KafkaConfig = null val brokerMetaPropsFile = "meta.properties" + var servers: Seq[KafkaServer] = Seq() @Before override def setUp() { @@ -40,6 +41,12 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { config2 = KafkaConfig.fromProps(props2) } + @After + override def tearDown() { + TestUtils.shutdownServers(servers) + super.tearDown() + } + @Test def testAutoGenerateBrokerId() { var server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName)) @@ -47,11 +54,10 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { server1.shutdown() assertTrue(verifyBrokerMetadata(config1.logDirs, 1001)) // restart the server check to see if it uses the brokerId generated previously - server1 = new KafkaServer(config1) - server1.startup() + server1 = TestUtils.createServer(config1) + servers = Seq(server1) assertEquals(server1.config.brokerId, 1001) server1.shutdown() - CoreUtils.delete(server1.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } @@ -61,23 +67,18 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { val server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName)) val server2 = new KafkaServer(config2, 
threadNamePrefix = Option(this.getClass.getName)) val props3 = TestUtils.createBrokerConfig(-1, zkConnect) - val config3 = KafkaConfig.fromProps(props3) - val server3 = new KafkaServer(config3) + val server3 = new KafkaServer(KafkaConfig.fromProps(props3)) server1.startup() - assertEquals(server1.config.brokerId,1001) + assertEquals(server1.config.brokerId, 1001) server2.startup() - assertEquals(server2.config.brokerId,0) + assertEquals(server2.config.brokerId, 0) server3.startup() - assertEquals(server3.config.brokerId,1002) - server1.shutdown() - server2.shutdown() - server3.shutdown() - assertTrue(verifyBrokerMetadata(server1.config.logDirs,1001)) - assertTrue(verifyBrokerMetadata(server2.config.logDirs,0)) - assertTrue(verifyBrokerMetadata(server3.config.logDirs,1002)) - CoreUtils.delete(server1.config.logDirs) - CoreUtils.delete(server2.config.logDirs) - CoreUtils.delete(server3.config.logDirs) + assertEquals(server3.config.brokerId, 1002) + servers = Seq(server1, server2, server3) + servers.foreach(_.shutdown()) + assertTrue(verifyBrokerMetadata(server1.config.logDirs, 1001)) + assertTrue(verifyBrokerMetadata(server2.config.logDirs, 0)) + assertTrue(verifyBrokerMetadata(server3.config.logDirs, 1002)) TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } @@ -88,12 +89,11 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { // Set reserve broker ids to cause collision and ensure disabling broker id generation ignores the setting props3.put(KafkaConfig.MaxReservedBrokerIdProp, "0") val config3 = KafkaConfig.fromProps(props3) - val server3 = new KafkaServer(config3) - server3.startup() - assertEquals(server3.config.brokerId,3) + val server3 = TestUtils.createServer(config3) + servers = Seq(server3) + assertEquals(server3.config.brokerId, 3) server3.shutdown() - assertTrue(verifyBrokerMetadata(server3.config.logDirs,3)) - CoreUtils.delete(server3.config.logDirs) + assertTrue(verifyBrokerMetadata(server3.config.logDirs, 3)) 
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } @@ -102,21 +102,22 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { // add multiple logDirs and check if the generate brokerId is stored in all of them val logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath + "," + TestUtils.tempDir().getAbsolutePath - props1.setProperty("log.dir",logDirs) + props1.setProperty("log.dir", logDirs) config1 = KafkaConfig.fromProps(props1) var server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName)) server1.startup() + servers = Seq(server1) server1.shutdown() assertTrue(verifyBrokerMetadata(config1.logDirs, 1001)) // addition to log.dirs after generation of a broker.id from zk should be copied over val newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath - props1.setProperty("log.dir",newLogDirs) + props1.setProperty("log.dir", newLogDirs) config1 = KafkaConfig.fromProps(props1) server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName)) server1.startup() + servers = Seq(server1) server1.shutdown() assertTrue(verifyBrokerMetadata(config1.logDirs, 1001)) - CoreUtils.delete(server1.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } @@ -125,6 +126,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { // check if configured brokerId and stored brokerId are equal or throw InconsistentBrokerException var server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName)) //auto generate broker Id server1.startup() + servers = Seq(server1) server1.shutdown() server1 = new KafkaServer(config2, threadNamePrefix = Option(this.getClass.getName)) // user specified broker id try { @@ -133,7 +135,6 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { case _: kafka.common.InconsistentBrokerIdException => //success } server1.shutdown() - CoreUtils.delete(server1.config.logDirs) 
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } @@ -142,8 +143,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { // Start a good server val propsA = TestUtils.createBrokerConfig(1, zkConnect) val configA = KafkaConfig.fromProps(propsA) - val serverA = new KafkaServer(configA) - serverA.startup() + val serverA = TestUtils.createServer(configA) // Start a server that collides on the broker id val propsB = TestUtils.createBrokerConfig(1, zkConnect) @@ -152,6 +152,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { intercept[RuntimeException] { serverB.startup() } + servers = Seq(serverA) // verify no broker metadata was written serverB.config.logDirs.foreach { logDir => @@ -162,26 +163,25 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { // adjust the broker config and start again propsB.setProperty(KafkaConfig.BrokerIdProp, "2") val newConfigB = KafkaConfig.fromProps(propsB) - val newServerB = new KafkaServer(newConfigB) - newServerB.startup() + val newServerB = TestUtils.createServer(newConfigB) + servers = Seq(serverA, newServerB) serverA.shutdown() newServerB.shutdown() + // verify correct broker metadata was written - assertTrue(verifyBrokerMetadata(serverA.config.logDirs,1)) - assertTrue(verifyBrokerMetadata(newServerB.config.logDirs,2)) - CoreUtils.delete(serverA.config.logDirs) - CoreUtils.delete(newServerB.config.logDirs) + assertTrue(verifyBrokerMetadata(serverA.config.logDirs, 1)) + assertTrue(verifyBrokerMetadata(newServerB.config.logDirs, 2)) TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = { - for(logDir <- logDirs) { + for (logDir <- logDirs) { val brokerMetadataOpt = new BrokerMetadataCheckpoint( new File(logDir + File.separator + brokerMetaPropsFile)).read() brokerMetadataOpt match { - case Some(brokerMetadata: BrokerMetadata) => - if (brokerMetadata.brokerId != brokerId) return false + case 
Some(brokerMetadata) => + if (brokerMetadata.brokerId != brokerId) return false case _ => return false } } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala index 325889f..1ec80fa 100755 --- a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala @@ -19,16 +19,17 @@ package kafka.server import scala.concurrent._ import ExecutionContext.Implicits._ import scala.concurrent.duration._ -import kafka.utils.{CoreUtils, TestUtils, ZkUtils} +import kafka.utils.{TestUtils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.junit.Assert._ -import org.junit.{Before, Test} +import org.junit.{Before, After, Test} import org.apache.kafka.test.TestUtils.isValidClusterId class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { var config1: KafkaConfig = null var config2: KafkaConfig = null var config3: KafkaConfig = null + var servers: Seq[KafkaServer] = Seq() @Before override def setUp() { @@ -38,12 +39,20 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { config3 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(3, zkConnect)) } + @After + override def tearDown() { + TestUtils.shutdownServers(servers) + super.tearDown() + } + + @Test def testAutoGenerateClusterId() { // Make sure that the cluster id doesn't exist yet. 
assertFalse(zkUtils.pathExists(ZkUtils.ClusterIdPath)) var server1 = TestUtils.createServer(config1) + servers = Seq(server1) // Validate the cluster id val clusterIdOnFirstBoot = server1.clusterId @@ -56,8 +65,8 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { assertEquals(zkUtils.getClusterId, Some(clusterIdOnFirstBoot)) // Restart the server check to confirm that it uses the clusterId generated previously - server1 = new KafkaServer(config1) - server1.startup() + server1 = TestUtils.createServer(config1) + servers = Seq(server1) val clusterIdOnSecondBoot = server1.clusterId assertEquals(clusterIdOnFirstBoot, clusterIdOnSecondBoot) @@ -68,7 +77,6 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { assertTrue(zkUtils.pathExists(ZkUtils.ClusterIdPath)) assertEquals(zkUtils.getClusterId, Some(clusterIdOnFirstBoot)) - CoreUtils.delete(server1.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } @@ -82,10 +90,9 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { val server3 = TestUtils.createServer(config3) val clusterIdFromServer3 = server3.clusterId + servers = Seq(server1, server2, server3) - server1.shutdown() - server2.shutdown() - server3.shutdown() + servers.foreach(_.shutdown()) isValidClusterId(clusterIdFromServer1) assertEquals(clusterIdFromServer1, clusterIdFromServer2, clusterIdFromServer3) @@ -97,28 +104,23 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { assertEquals(clusterIdFromServer2, server2.clusterId) server3.startup() assertEquals(clusterIdFromServer3, server3.clusterId) - server1.shutdown() - server2.shutdown() - server3.shutdown() - CoreUtils.delete(server1.config.logDirs) - CoreUtils.delete(server2.config.logDirs) - CoreUtils.delete(server3.config.logDirs) + servers.foreach(_.shutdown()) + TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } @Test def testAutoGenerateClusterIdForKafkaClusterParallel() { val firstBoot = 
Future.traverse(Seq(config1, config2, config3))(config => Future(TestUtils.createServer(config))) - val Seq(server1, server2, server3) = Await.result(firstBoot, 100 second) + servers = Await.result(firstBoot, 100 second) + val Seq(server1, server2, server3) = servers val clusterIdFromServer1 = server1.clusterId val clusterIdFromServer2 = server2.clusterId val clusterIdFromServer3 = server3.clusterId - server1.shutdown() - server2.shutdown() - server3.shutdown() + servers.foreach(_.shutdown()) isValidClusterId(clusterIdFromServer1) assertEquals(clusterIdFromServer1, clusterIdFromServer2, clusterIdFromServer3) @@ -127,13 +129,11 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { server.startup() server }) - val servers = Await.result(secondBoot, 100 second) + servers = Await.result(secondBoot, 100 second) servers.foreach(server => assertEquals(clusterIdFromServer1, server.clusterId)) servers.foreach(_.shutdown()) - CoreUtils.delete(server1.config.logDirs) - CoreUtils.delete(server2.config.logDirs) - CoreUtils.delete(server3.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index ac757d0..a25569f 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -17,14 +17,23 @@ package kafka.server -import kafka.utils.{CoreUtils, TestUtils, ZkUtils} +import kafka.utils.{TestUtils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.easymock.EasyMock import org.junit.Assert._ -import org.junit.Test +import org.junit.{After, Test} class ServerStartupTest extends ZooKeeperTestHarness { + private var server: 
KafkaServer = null + + @After + override def tearDown() { + if (server != null) + TestUtils.shutdownServers(Seq(server)) + super.tearDown() + } + @Test def testBrokerCreatesZKChroot { val brokerId = 0 @@ -32,13 +41,10 @@ class ServerStartupTest extends ZooKeeperTestHarness { val props = TestUtils.createBrokerConfig(brokerId, zkConnect) val zooKeeperConnect = props.get("zookeeper.connect") props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot) - val server = TestUtils.createServer(KafkaConfig.fromProps(props)) + server = TestUtils.createServer(KafkaConfig.fromProps(props)) val pathExists = zkUtils.pathExists(zookeeperChroot) assertTrue(pathExists) - - server.shutdown() - CoreUtils.delete(server.config.logDirs) } @Test @@ -46,8 +52,8 @@ class ServerStartupTest extends ZooKeeperTestHarness { // Create and start first broker val brokerId1 = 0 val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect) - val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1)) - val port = TestUtils.boundPort(server1) + server = TestUtils.createServer(KafkaConfig.fromProps(props1)) + val port = TestUtils.boundPort(server) // Create a second broker with same port val brokerId2 = 1 @@ -57,9 +63,6 @@ class ServerStartupTest extends ZooKeeperTestHarness { fail("Starting a broker with the same port should fail") } catch { case _: RuntimeException => // expected - } finally { - server1.shutdown() - CoreUtils.delete(server1.config.logDirs) } } @@ -70,7 +73,7 @@ class ServerStartupTest extends ZooKeeperTestHarness { val brokerId = 0 val props1 = TestUtils.createBrokerConfig(brokerId, zkConnect) - val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1)) + server = TestUtils.createServer(KafkaConfig.fromProps(props1)) val brokerRegistration = zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 val props2 = TestUtils.createBrokerConfig(brokerId, zkConnect) @@ -84,23 +87,17 @@ class ServerStartupTest extends ZooKeeperTestHarness { // broker registration 
shouldn't change assertEquals(brokerRegistration, zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1) - - server1.shutdown() - CoreUtils.delete(server1.config.logDirs) } @Test def testBrokerSelfAware { val brokerId = 0 val props = TestUtils.createBrokerConfig(brokerId, zkConnect) - val server = TestUtils.createServer(KafkaConfig.fromProps(props)) + server = TestUtils.createServer(KafkaConfig.fromProps(props)) TestUtils.waitUntilTrue(() => server.metadataCache.getAliveBrokers.nonEmpty, "Wait for cache to update") assertEquals(1, server.metadataCache.getAliveBrokers.size) assertEquals(brokerId, server.metadataCache.getAliveBrokers.head.id) - - server.shutdown() - CoreUtils.delete(server.config.logDirs) } @Test @@ -119,13 +116,11 @@ class ServerStartupTest extends ZooKeeperTestHarness { class MockKafkaServer(override val config: KafkaConfig, override val brokerState: BrokerState = mockBrokerState) extends KafkaServer(config) {} val props = TestUtils.createBrokerConfig(brokerId, zkConnect) - val server = new MockKafkaServer(KafkaConfig.fromProps(props)) + server = new MockKafkaServer(KafkaConfig.fromProps(props)) EasyMock.expect(mockBrokerState.newState(RunningAsBroker)).andDelegateTo(new BrokerStateInterceptor).once() EasyMock.replay(mockBrokerState) server.startup() - server.shutdown() - CoreUtils.delete(server.config.logDirs) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala index 4edfbaf..182e904 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala @@ -66,8 +66,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness @After override def tearDown() { - brokers.par.foreach(_.shutdown()) producer.close() + TestUtils.shutdownServers(brokers) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala index c5bb5e4..f7110ee 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala @@ -33,7 +33,7 @@ import org.apache.kafka.common.utils.SystemTime import org.apache.kafka.common.TopicPartition import org.junit.Assert._ -import org.junit.{After, Before, Test} +import org.junit.{After, Test} import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse} import scala.collection.JavaConverters._ @@ -51,23 +51,18 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { val tp = t1p0 var producer: KafkaProducer[Array[Byte], Array[Byte]] = null - @Before - override def setUp() { - super.setUp() - val props = createBrokerConfigs(2, zkConnect) - brokers = props.map(KafkaConfig.fromProps).map(TestUtils.createServer(_)) - } - @After override def tearDown() { - brokers.foreach(_.shutdown()) if (producer != null) producer.close() + TestUtils.shutdownServers(brokers) super.tearDown() } @Test def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() { + brokers = (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) } + // Given two topics with replication 
of a single partition for (topic <- List(topic1, topic2)) { createTopic(zkUtils, topic, Map(0 -> Seq(0, 1)), servers = brokers) @@ -280,4 +275,4 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 01ff83d..a51a07c 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -174,6 +174,16 @@ object TestUtils extends Logging { } /** + * Shutdown `servers` and delete their log directories. + */ + def shutdownServers(servers: Seq[KafkaServer]) { + servers.par.foreach { s => + s.shutdown() + CoreUtils.delete(s.config.logDirs) + } + } + + /** * Create a test config for the provided parameters. * * Note that if `interBrokerSecurityProtocol` is defined, the listener for the `SecurityProtocol` will be enabled. 
@@ -939,9 +949,10 @@ object TestUtils extends Logging { } def verifyNonDaemonThreadsStatus(threadNamePrefix: String) { - assertEquals(0, Thread.getAllStackTraces.keySet().toArray - .map(_.asInstanceOf[Thread]) - .count(t => !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix))) + val threadCount = Thread.getAllStackTraces.keySet.asScala.count { t => + !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix) + } + assertEquals(0, threadCount) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 2805b3b..b3b10f3 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -21,11 +21,14 @@ import javax.security.auth.login.Configuration import kafka.utils.{CoreUtils, Logging, ZkUtils} import org.junit.{After, Before} +import org.junit.Assert.assertEquals import org.scalatest.junit.JUnitSuite import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.test.IntegrationTest import org.junit.experimental.categories.Category +import scala.collection.JavaConverters._ + @Category(Array(classOf[IntegrationTest])) abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { @@ -41,6 +44,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { @Before def setUp() { + assertNoBrokerControllersRunning() zookeeper = new EmbeddedZookeeper() zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled())) } @@ -52,6 +56,22 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { if (zookeeper != null) CoreUtils.swallow(zookeeper.shutdown()) Configuration.setConfiguration(null) + 
assertNoBrokerControllersRunning() } + // Tests using this class start ZooKeeper before starting any brokers and shutdown ZK after + // shutting down brokers. If tests leave broker controllers running, subsequent tests may fail in + // unexpected ways if ZK port is reused. This method ensures that there is no Controller event thread + // since the controller loads default JAAS configuration to make connections to brokers on this thread. + // + // Any tests that use this class and invoke ZooKeeperTestHarness#tearDown() will fail in the tearDown() + // if controller event thread is found. Tests with missing broker shutdown which don't use ZooKeeperTestHarness + // or its tearDown() will cause an assertion failure in the subsequent test that invokes ZooKeeperTestHarness#setUp(), + // making it easier to identify the test with missing shutdown from the test sequence. + private def assertNoBrokerControllersRunning() { + val threads = Thread.getAllStackTraces.keySet.asScala + .map(_.getName) + .filter(_.contains("controller-event-thread")) + assertEquals(Set(), threads) + } }
