http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/other/kafka/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index e0e46c8..549a96b 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -63,13 +63,13 @@ object TestOffsetManager { } - class CommitThread(id: Int, partitionCount: Int, commitIntervalMs: Long, zkClient: ZkClient) + class CommitThread(id: Int, partitionCount: Int, commitIntervalMs: Long, zkUtils: ZkUtils) extends ShutdownableThread("commit-thread") with KafkaMetricsGroup { private val groupId = "group-" + id private val metadata = "Metadata from commit thread " + id - private var offsetsChannel = ClientUtils.channelToOffsetManager(groupId, zkClient, SocketTimeoutMs) + private var offsetsChannel = ClientUtils.channelToOffsetManager(groupId, zkUtils, SocketTimeoutMs) private var offset = 0L val numErrors = new AtomicInteger(0) val numCommits = new AtomicInteger(0) @@ -79,7 +79,7 @@ object TestOffsetManager { private def ensureConnected() { if (!offsetsChannel.isConnected) - offsetsChannel = ClientUtils.channelToOffsetManager(groupId, zkClient, SocketTimeoutMs) + offsetsChannel = ClientUtils.channelToOffsetManager(groupId, zkUtils, SocketTimeoutMs) } override def doWork() { @@ -119,7 +119,7 @@ object TestOffsetManager { } } - class FetchThread(numGroups: Int, fetchIntervalMs: Long, zkClient: ZkClient) + class FetchThread(numGroups: Int, fetchIntervalMs: Long, zkUtils: ZkUtils) extends ShutdownableThread("fetch-thread") with KafkaMetricsGroup { @@ -127,7 +127,7 @@ object TestOffsetManager { private val fetchTimer = new KafkaTimer(timer) private val channels = mutable.Map[Int, BlockingChannel]() - private var metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs) + private var metadataChannel = ClientUtils.channelToAnyBroker(zkUtils, SocketTimeoutMs) private val numErrors = new AtomicInteger(0) @@ -141,7 +141,7 @@ object TestOffsetManager { val channel = if (channels.contains(coordinatorId)) channels(coordinatorId) else { - val newChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs) + val newChannel = ClientUtils.channelToOffsetManager(group, zkUtils, SocketTimeoutMs) channels.put(coordinatorId, newChannel) newChannel } @@ -173,7 +173,7 @@ object TestOffsetManager { println("Error while querying %s:%d - shutting down query channel.".format(metadataChannel.host, metadataChannel.port)) metadataChannel.disconnect() println("Creating new query channel.") - metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs) + metadataChannel = ClientUtils.channelToAnyBroker(zkUtils, SocketTimeoutMs) } finally { Thread.sleep(fetchIntervalMs) @@ -250,17 +250,17 @@ object TestOffsetManager { println("Commit thread count: %d; Partition count: %d, Commit interval: %d ms; Fetch interval: %d ms; Reporting interval: %d ms" .format(threadCount, partitionCount, commitIntervalMs, fetchIntervalMs, reportingIntervalMs)) - var zkClient: ZkClient = null + var zkUtils: ZkUtils = null var commitThreads: Seq[CommitThread] = Seq() var fetchThread: FetchThread = null var statsThread: StatsThread = null try { - zkClient = ZkUtils.createZkClient(zookeeper, 6000, 2000) + zkUtils = ZkUtils(zookeeper, 6000, 2000, false) commitThreads = (0 to (threadCount-1)).map { threadId => - new CommitThread(threadId, partitionCount, commitIntervalMs, zkClient) + new CommitThread(threadId, partitionCount, commitIntervalMs, zkUtils) } - fetchThread = new FetchThread(threadCount, fetchIntervalMs, zkClient) + fetchThread = new FetchThread(threadCount, fetchIntervalMs, zkUtils) val statsThread = new StatsThread(reportingIntervalMs, commitThreads, fetchThread) @@ -300,7 +300,7 @@ object TestOffsetManager { statsThread.shutdown() statsThread.join() } - zkClient.close() + zkUtils.close() } }
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index ed94039..0fce611 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -49,10 +49,10 @@ class AddPartitionsTest extends ZooKeeperTestHarness { brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort())) // create topics first - createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) - createTopic(zkClient, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers) - createTopic(zkClient, topic3, partitionReplicaAssignment = Map(0->Seq(2,3,0,1)), servers = servers) - createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers) + createTopic(zkUtils, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) + createTopic(zkUtils, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers) + createTopic(zkUtils, topic3, partitionReplicaAssignment = Map(0->Seq(2,3,0,1)), servers = servers) + createTopic(zkUtils, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers) } @After @@ -65,7 +65,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @Test def testTopicDoesNotExist { try { - AdminUtils.addPartitions(zkClient, "Blah", 1) + AdminUtils.addPartitions(zkUtils, "Blah", 1) fail("Topic should not exist") } catch { case e: AdminOperationException => //this is good @@ -76,7 +76,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @Test def testWrongReplicaCount { try { - AdminUtils.addPartitions(zkClient, topic1, 2, "0:1,0:1:2") + AdminUtils.addPartitions(zkUtils, topic1, 2, "0:1,0:1:2") fail("Add partitions should fail") } catch { case e: AdminOperationException => //this is good @@ -86,12 +86,12 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @Test def testIncrementPartitions { - AdminUtils.addPartitions(zkClient, topic1, 3) + AdminUtils.addPartitions(zkUtils, topic1, 3) // wait until leader is elected - var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1) - var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2) - val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 1).get - val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 2).get + var leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2) + val leader1FromZk = zkUtils.getLeaderForPartition(topic1, 1).get + val leader2FromZk = zkUtils.getLeaderForPartition(topic1, 2).get assertEquals(leader1.get, leader1FromZk) assertEquals(leader2.get, leader2FromZk) @@ -112,12 +112,12 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @Test def testManualAssignmentOfReplicas { - AdminUtils.addPartitions(zkClient, topic2, 3, "1:2,0:1,2:3") + AdminUtils.addPartitions(zkUtils, topic2, 3, "1:2,0:1,2:3") // wait until leader is elected - var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1) - var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2) - val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 1).get - val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 2).get + var leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 2) + val leader1FromZk = zkUtils.getLeaderForPartition(topic2, 1).get + val leader2FromZk = zkUtils.getLeaderForPartition(topic2, 2).get assertEquals(leader1.get, leader1FromZk) assertEquals(leader2.get, leader2FromZk) @@ -139,7 +139,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @Test def testReplicaPlacement { - AdminUtils.addPartitions(zkClient, topic3, 7) + AdminUtils.addPartitions(zkUtils, topic3, 7) // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1) http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/AdminTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 2d18069..52ea580 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -66,23 +66,23 @@ class AdminTest extends ZooKeeperTestHarness with Logging { @Test def testManualReplicaAssignment() { val brokers = List(0, 1, 2, 3, 4) - TestUtils.createBrokersInZk(zkClient, zkConnection, brokers) + TestUtils.createBrokersInZk(zkUtils, brokers) // duplicate brokers intercept[IllegalArgumentException] { - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", Map(0->Seq(0,0))) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,0))) } // inconsistent replication factor intercept[IllegalArgumentException] { - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", Map(0->Seq(0,1), 1->Seq(0))) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,1), 1->Seq(0))) } // good assignment val assignment = Map(0 -> List(0, 1, 2), 1 -> List(1, 2, 3)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", assignment) - val found = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq("test")) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", assignment) + val found = zkUtils.getPartitionAssignmentForTopics(Seq("test")) assertEquals(assignment, found("test")) } @@ -117,19 +117,19 @@ class AdminTest extends ZooKeeperTestHarness with Logging { 11 -> 1 ) val topic = "test" - TestUtils.createBrokersInZk(zkClient, zkConnection, List(0, 1, 2, 3, 4)) + TestUtils.createBrokersInZk(zkUtils, List(0, 1, 2, 3, 4)) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // create leaders for all partitions - TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) - val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap + TestUtils.makeLeaderForPartition(zkUtils, topic, leaderForPartitionMap, 1) + val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> zkUtils.getReplicasForPartition(topic, p))).toMap assertEquals(expectedReplicaAssignment.size, actualReplicaList.size) for(i <- 0 until actualReplicaList.size) assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i)) intercept[TopicExistsException] { // shouldn't be able to create a topic that already exists - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) } } @@ -137,13 +137,13 @@ class AdminTest extends ZooKeeperTestHarness with Logging { def testTopicCreationWithCollision() { val topic = "test.topic" val collidingTopic = "test_topic" - TestUtils.createBrokersInZk(zkClient, zkConnection, List(0, 1, 2, 3, 4)) + TestUtils.createBrokersInZk(zkUtils, List(0, 1, 2, 3, 4)) // create the topic - AdminUtils.createTopic(zkClient, topic, 3, 1) + AdminUtils.createTopic(zkUtils, topic, 3, 1) intercept[InvalidTopicException] { // shouldn't be able to create a topic that collides - AdminUtils.createTopic(zkClient, collidingTopic, 3, 1) + AdminUtils.createTopic(zkUtils, collidingTopic, 3, 1) } } @@ -160,25 +160,25 @@ class AdminTest extends ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // reassign partition 0 val newReplicas = Seq(0, 2, 3) val partitionToBeReassigned = 0 val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas)) assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas, Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; }, "Partition reassignment should complete") - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) + val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned) // in sync replicas should not have any replica that is not in the new assigned replicas - checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas) + checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas) assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) - ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) + ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") servers.foreach(_.shutdown()) @@ -191,24 +191,24 @@ class AdminTest extends ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // reassign partition 0 val newReplicas = Seq(1, 2, 3) val partitionToBeReassigned = 0 val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas)) assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas, Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; }, "Partition reassignment should complete") - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) + val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) - checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas) - ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) + checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas) + ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") @@ -222,24 +222,24 @@ class AdminTest extends ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas)) assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas, Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; }, "Partition reassignment should complete") - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) + val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas) - checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas) - ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) + checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas) + ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") servers.foreach(_.shutdown()) @@ -254,9 +254,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging { val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas)) assertFalse("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) - val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient) + val reassignedPartitions = zkUtils.getPartitionsBeingReassigned() assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition)) servers.foreach(_.shutdown()) } @@ -266,25 +266,25 @@ class AdminTest extends ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // put the partition in the reassigned path as well // reassign partition 0 val newReplicas = Seq(0, 1) val partitionToBeReassigned = 0 val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas)) reassignPartitionsCommand.reassignPartitions // create brokers val servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // wait until reassignment completes - TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient), + TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkUtils), "Partition reassignment should complete") - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) + val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas) - checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas) + checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas) // ensure that there are no under replicated partitions - ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) + ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") servers.foreach(_.shutdown()) @@ -294,10 +294,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging { def testPreferredReplicaJsonData() { // write preferred replica json data to zk path val partitionsForPreferredReplicaElection = Set(TopicAndPartition("test", 1), TopicAndPartition("test2", 1)) - PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection) + PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, partitionsForPreferredReplicaElection) // try to read it back and compare with what was written - val preferredReplicaElectionZkData = ZkUtils.readData(zkClient, - ZkUtils.PreferredReplicaLeaderElectionPath)._1 + val preferredReplicaElectionZkData = zkUtils.readData(ZkUtils.PreferredReplicaLeaderElectionPath)._1 val partitionsUndergoingPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(preferredReplicaElectionZkData) assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection, @@ -313,14 +312,14 @@ class AdminTest extends ZooKeeperTestHarness with Logging { // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s)) // broker 2 should be the leader since it was started first - val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None).get + val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None).get // trigger preferred replica election - val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(TopicAndPartition(topic, partition))) + val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkUtils, Set(TopicAndPartition(topic, partition))) preferredReplicaElection.moveLeaderToPreferredReplica() - val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader)).get + val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = Some(currentLeader)).get assertEquals("Preferred replica election failed", preferredReplica, newLeader) servers.foreach(_.shutdown()) } @@ -334,9 +333,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging { val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps) val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s)) // create the topic - TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers) + TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers) - val controllerId = ZkUtils.getController(zkClient) + val controllerId = zkUtils.getController() val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController var partitionsRemaining = controller.shutdownBroker(2) var activeServers = servers.filter(s => s.config.brokerId != 2) @@ -402,16 +401,16 @@ class AdminTest extends ZooKeeperTestHarness with Logging { // create a topic with a few config overrides and check that they are applied val maxMessageSize = 1024 val retentionMs = 1000*1000 - AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs)) + AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs)) checkConfig(maxMessageSize, retentionMs) // now double the config values for the topic and check that it is applied val newConfig: Properties = makeConfig(2*maxMessageSize, 2 * retentionMs) - AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs)) + AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2*maxMessageSize, 2 * retentionMs)) checkConfig(2*maxMessageSize, 2 * retentionMs) // Verify that the same config can be read from ZK - val configInZk = AdminUtils.fetchEntityConfig(server.zkClient, ConfigType.Topic, topic) + val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Topic, topic) assertEquals(newConfig, configInZk) } finally { server.shutdown() http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala index d3abf08..c19127e 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala @@ -33,11 +33,11 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness { val groupToDelete = "groupToDelete" val otherGroup = "otherGroup" - TestUtils.createTopic(zkClient, topic, 1, 3, servers) + TestUtils.createTopic(zkUtils, topic, 1, 3, servers) fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false) fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false) - AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete) + AdminUtils.deleteConsumerGroupInZK(zkUtils, groupToDelete) TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)), "DeleteConsumerGroupInZK should delete the provided consumer group's directory") @@ -51,11 +51,11 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness { val groupToDelete = "groupToDelete" val otherGroup = "otherGroup" - TestUtils.createTopic(zkClient, topic, 1, 3, servers) + TestUtils.createTopic(zkUtils, topic, 1, 3, servers) fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, true) fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false) - AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete) + AdminUtils.deleteConsumerGroupInZK(zkUtils, groupToDelete) TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(groupToDelete)), "DeleteConsumerGroupInZK should not delete the provided consumer group's directory if the consumer group is still active") @@ -68,11 +68,11 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness { val topic = "test" val groupToDelete = "groupToDelete" val otherGroup = "otherGroup" - TestUtils.createTopic(zkClient, topic, 1, 3, servers) + TestUtils.createTopic(zkUtils, topic, 1, 3, servers) fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false) fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false) - AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topic) + AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, groupToDelete, topic) TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)), "DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's directory if it just consumes from one topic") @@ -86,14 +86,14 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness { val otherTopic = "otherTopic" val groupToDelete = "groupToDelete" val otherGroup = "otherGroup" - TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers) - TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers) + TestUtils.createTopic(zkUtils, topicToDelete, 1, 3, servers) + TestUtils.createTopic(zkUtils, otherTopic, 1, 3, servers) fillInConsumerGroupInfo(topicToDelete, groupToDelete, "consumer", 0, 10, false) fillInConsumerGroupInfo(otherTopic, groupToDelete, "consumer", 0, 10, false) fillInConsumerGroupInfo(topicToDelete, otherGroup, "consumer", 0, 10, false) - AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topicToDelete) + AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, groupToDelete, topicToDelete) TestUtils.waitUntilTrue(() => !groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(groupToDelete, topicToDelete)), "DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's owner and offset directories for the given topic") @@ -108,13 +108,13 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness { val topicToDelete = "topicToDelete" val otherTopic = "otherTopic" val group = "group" - TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers) - TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers) + TestUtils.createTopic(zkUtils, topicToDelete, 1, 3, servers) + TestUtils.createTopic(zkUtils, otherTopic, 1, 3, servers) fillInConsumerGroupInfo(topicToDelete, group, "consumer", 0, 10, true) fillInConsumerGroupInfo(otherTopic, group, "consumer", 0, 10, true) - AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topicToDelete) + AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topicToDelete) TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, topicToDelete)), "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for the given topic if the consumer group is still active") @@ -128,14 +128,14 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness { val otherTopic = "otherTopic" val groups = Seq("group1", "group2") - TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers) - TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers) + TestUtils.createTopic(zkUtils, topicToDelete, 1, 3, servers) + TestUtils.createTopic(zkUtils, otherTopic, 1, 3, servers) val groupTopicDirsForTopicToDelete = groups.map(group => new ZKGroupTopicDirs(group, topicToDelete)) val groupTopicDirsForOtherTopic = groups.map(group => new ZKGroupTopicDirs(group, otherTopic)) groupTopicDirsForTopicToDelete.foreach(dir => fillInConsumerGroupInfo(topicToDelete, dir.group, "consumer", 0, 10, false)) groupTopicDirsForOtherTopic.foreach(dir => fillInConsumerGroupInfo(otherTopic, dir.group, "consumer", 0, 10, false)) - AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topicToDelete) + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topicToDelete) TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist), "Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK") @@ -148,13 +148,13 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness { val topic = "topic" val group = "group" - TestUtils.createTopic(zkClient, topic, 1, 3, servers) + TestUtils.createTopic(zkUtils, topic, 1, 3, servers) val dir = new ZKGroupTopicDirs(group, topic) fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, false) - AdminUtils.deleteTopic(zkClient, topic) - TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) - AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic) + AdminUtils.deleteTopic(zkUtils, topic) + TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic) TestUtils.waitUntilTrue(() => !groupDirExists(dir), "Consumer group info on related topics should be deleted by DeleteAllConsumerGroupInfoForTopicInZK") @@ -185,19 +185,19 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness { val consumerConfig = new ConsumerConfig(consumerProps) val dir = new ZKGroupTopicDirs(group, topic) TestUtils.updateConsumerOffset(consumerConfig, dir.consumerOffsetDir + "/" + partition, offset) - ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.getConsumerPartitionOwnerPath(group, topic, partition), "") - ZkUtils.makeSurePersistentPathExists(zkClient, dir.consumerRegistryDir) + zkUtils.createEphemeralPathExpectConflict(zkUtils.getConsumerPartitionOwnerPath(group, topic, partition), "") + zkUtils.makeSurePersistentPathExists(dir.consumerRegistryDir) if (registerConsumer) { - ZkUtils.createEphemeralPathExpectConflict(zkClient, dir.consumerRegistryDir + "/" + consumerId, "") + zkUtils.createEphemeralPathExpectConflict(dir.consumerRegistryDir + "/" + consumerId, "") } } private def groupDirExists(dir: ZKGroupDirs) = { - ZkUtils.pathExists(zkClient, dir.consumerGroupDir) + zkUtils.pathExists(dir.consumerGroupDir) } private def groupTopicOffsetAndOwnerDirsExist(dir: ZKGroupTopicDirs) = { - ZkUtils.pathExists(zkClient, dir.consumerOffsetDir) && ZkUtils.pathExists(zkClient, dir.consumerOwnerDir) + zkUtils.pathExists(dir.consumerOffsetDir) && zkUtils.pathExists(dir.consumerOwnerDir) } private def produceEvents(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, messages: List[String]) { http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index fbae398..383cb44 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -20,6 +20,7 @@ import kafka.log.Log import kafka.zk.ZooKeeperTestHarness import junit.framework.Assert._ import kafka.utils.{ZkUtils, TestUtils} +import kafka.utils.ZkUtils._ import kafka.server.{KafkaServer, KafkaConfig} import org.junit.Test import java.util.Properties @@ -33,8 +34,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness { val topic = topicAndPartition.topic val servers = createTestTopicAndCluster(topic) // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) + AdminUtils.deleteTopic(zkUtils, topic) + TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) servers.foreach(_.shutdown()) } @@ -44,22 +45,22 @@ class DeleteTopicTest extends ZooKeeperTestHarness { val topic = topicAndPartition.topic val servers = createTestTopicAndCluster(topic) // shut down one follower replica - val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0) assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last follower.shutdown() // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) + AdminUtils.deleteTopic(zkUtils, topic) // check if all replicas but the one that is shut down has deleted the log TestUtils.waitUntilTrue(() => servers.filter(s => s.config.brokerId != follower.config.brokerId) .forall(_.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.") // ensure topic deletion is halted - TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + TestUtils.waitUntilTrue(() => zkUtils.pathExists(getDeleteTopicPath(topic)), "Admin path /admin/delete_topic/test path deleted even when a follower replica is down") // restart follower replica follower.startup() - TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) + TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) servers.foreach(_.shutdown()) } @@ -68,25 +69,25 @@ class DeleteTopicTest extends ZooKeeperTestHarness { val topicAndPartition = TopicAndPartition("test", 0) val topic = topicAndPartition.topic val servers = createTestTopicAndCluster(topic) - val controllerId = ZkUtils.getController(zkClient) + val controllerId = zkUtils.getController() val controller = servers.filter(s => s.config.brokerId == controllerId).head - val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0) val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get && s.config.brokerId != controllerId).last follower.shutdown() // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) + AdminUtils.deleteTopic(zkUtils, topic) // shut down the controller to trigger controller failover during delete topic controller.shutdown() // ensure topic deletion is halted - TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + TestUtils.waitUntilTrue(() => zkUtils.pathExists(getDeleteTopicPath(topic)), "Admin path /admin/delete_topic/test path deleted even when a replica is down") controller.startup() follower.startup() - TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) + TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) servers.foreach(_.shutdown()) } @@ -101,37 +102,37 @@ class DeleteTopicTest extends ZooKeeperTestHarness { val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // wait until replica log is created on every broker TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created.") - val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0) assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last follower.shutdown() // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) + AdminUtils.deleteTopic(zkUtils, topic) // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since // the topic is being deleted // reassign partition 0 - val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) + val oldAssignedReplicas = zkUtils.getReplicasForPartition(topic, 0) val newReplicas = Seq(1, 2, 3) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas)) assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas, Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed; }, "Partition reassignment shouldn't complete.") - val controllerId = ZkUtils.getController(zkClient) + val controllerId = zkUtils.getController() val controller = servers.filter(s => s.config.brokerId == controllerId).head assertFalse("Partition reassignment should fail", controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition)) - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) + val assignedReplicas = zkUtils.getReplicasForPartition(topic, 0) assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas) follower.startup() - TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) + TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) allServers.foreach(_.shutdown()) } @@ -139,18 +140,18 @@ class DeleteTopicTest extends ZooKeeperTestHarness { def testDeleteTopicDuringAddPartition() { val topic = "test" val servers = createTestTopicAndCluster(topic) - val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0) assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last val newPartition = TopicAndPartition(topic, 1) follower.shutdown() // add partitions to topic - AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2", false) + AdminUtils.addPartitions(zkUtils, topic, 2, "0:1:2,0:1:2", false) // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) + AdminUtils.deleteTopic(zkUtils, topic) follower.startup() // test if topic deletion is resumed - TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) + TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) // verify that new partition doesn't exist on any broker either TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(newPartition).isEmpty), @@ -164,11 +165,11 @@ class DeleteTopicTest extends ZooKeeperTestHarness { val topicAndPartition = TopicAndPartition(topic, 0) val servers = createTestTopicAndCluster(topic) // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) + AdminUtils.deleteTopic(zkUtils, topic) // add partitions to topic val newPartition = TopicAndPartition(topic, 1) - AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2") - TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) + AdminUtils.addPartitions(zkUtils, topic, 2, "0:1:2,0:1:2") + TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) // verify that new partition doesn't exist on any broker either assertTrue("Replica logs not deleted after delete topic is complete", servers.forall(_.getLogManager().getLog(newPartition).isEmpty)) @@ -182,12 +183,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness { val topicAndPartition = TopicAndPartition(topic, 0) val servers = createTestTopicAndCluster(topic) // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) + AdminUtils.deleteTopic(zkUtils, topic) + TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) // re-create topic on same replicas - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // wait until leader is elected - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) // check if all replica logs are created TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), @@ -201,16 +202,16 @@ class DeleteTopicTest extends ZooKeeperTestHarness { val topic = topicAndPartition.topic val servers = createTestTopicAndCluster(topic) // start topic deletion - AdminUtils.deleteTopic(zkClient, "test2") + AdminUtils.deleteTopic(zkUtils, "test2") // verify delete topic path for test2 is removed from zookeeper - TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers) + TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers) // verify that topic test is untouched TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created") // test the topic path exists - assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) + assertTrue("Topic test mistakenly deleted", zkUtils.pathExists(getTopicPath(topic))) // topic test should have a leader - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined) servers.foreach(_.shutdown()) } @@ -242,8 +243,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness { server.logManager.cleaner.awaitCleaned(topicName,0,0) // delete topic - AdminUtils.deleteTopic(zkClient, "test") - TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers) + AdminUtils.deleteTopic(zkUtils, "test") + TestUtils.verifyTopicDeletion(zkUtils, "test", 1, servers) servers.foreach(_.shutdown()) } @@ -256,16 +257,16 @@ class DeleteTopicTest extends ZooKeeperTestHarness { try { // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) + AdminUtils.deleteTopic(zkUtils, topic) // try to delete topic marked as deleted - AdminUtils.deleteTopic(zkClient, topic) + AdminUtils.deleteTopic(zkUtils, topic) fail("Expected TopicAlreadyMarkedForDeletionException") } catch { case e: TopicAlreadyMarkedForDeletionException => // expected exception } - TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) + TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) servers.foreach(_.shutdown()) } @@ -283,7 +284,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // wait until replica log is created on every broker TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created") http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index d4fa0d5..cab4813 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -23,7 +23,7 @@ import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness import kafka.server.ConfigType import kafka.admin.TopicCommand.TopicCommandOptions -import kafka.utils.ZkUtils +import kafka.utils.ZkUtils._ import kafka.coordinator.ConsumerCoordinator class TopicCommandTest extends ZooKeeperTestHarness with Logging { @@ -36,25 +36,25 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging { val cleanupVal = "compact" // create brokers val brokers = List(0, 1, 2) - TestUtils.createBrokersInZk(zkClient, zkConnection, brokers) + TestUtils.createBrokersInZk(zkUtils, brokers) // create the topic val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, "--replication-factor", "1", "--config", cleanupKey + "=" + cleanupVal, "--topic", topic)) - TopicCommand.createTopic(zkClient, createOpts) - val props = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic) + TopicCommand.createTopic(zkUtils, createOpts) + val props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic) assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey)) assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal)) // pre-create the topic config changes path to avoid a NoNodeException - ZkUtils.createPersistentPath(zkClient, ZkUtils.EntityConfigChangesPath) + zkUtils.createPersistentPath(EntityConfigChangesPath) // modify the topic to add new partitions val numPartitionsModified = 3 val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, "--topic", topic)) - TopicCommand.alterTopic(zkClient, alterOpts) - val newProps = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic) + TopicCommand.alterTopic(zkUtils, alterOpts) + val newProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic) assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey)) assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal)) } @@ -67,34 +67,34 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging { // create brokers val brokers = List(0, 1, 2) - TestUtils.createBrokersInZk(zkClient, zkConnection, brokers) + TestUtils.createBrokersInZk(zkUtils, brokers) // create the NormalTopic val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, "--replication-factor", "1", "--topic", normalTopic)) - TopicCommand.createTopic(zkClient, createOpts) + TopicCommand.createTopic(zkUtils, createOpts) // delete the NormalTopic val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic)) - val deletePath = ZkUtils.getDeleteTopicPath(normalTopic) - assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deletePath)) - TopicCommand.deleteTopic(zkClient, deleteOpts) - assertTrue("Delete path for topic should exist after deletion.", zkClient.exists(deletePath)) + val deletePath = getDeleteTopicPath(normalTopic) + assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deletePath)) + TopicCommand.deleteTopic(zkUtils, deleteOpts) + assertTrue("Delete path for topic should exist after deletion.", zkUtils.zkClient.exists(deletePath)) // create the offset topic val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, "--replication-factor", "1", "--topic", ConsumerCoordinator.OffsetsTopicName)) - TopicCommand.createTopic(zkClient, createOffsetTopicOpts) + TopicCommand.createTopic(zkUtils, createOffsetTopicOpts) // try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", ConsumerCoordinator.OffsetsTopicName)) - val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName) - assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deleteOffsetTopicPath)) + val deleteOffsetTopicPath = getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName) + assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath)) intercept[AdminOperationException] { - TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts) + TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts) } - assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.exists(deleteOffsetTopicPath)) + assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala index d43778e..50496f0 100644 --- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala +++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala @@ -43,10 +43,10 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness { val notificationMessage2 = "message2" val changeExpirationMs = 100 - val notificationListener = new ZkNodeChangeNotificationListener(zkClient, seqNodeRoot, seqNodePrefix, notificationHandler, changeExpirationMs) + val notificationListener = new ZkNodeChangeNotificationListener(zkUtils, seqNodeRoot, seqNodePrefix, notificationHandler, changeExpirationMs) notificationListener.init() - ZkUtils.createSequentialPersistentPath(zkClient, seqNodePath, notificationMessage1) + zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage1) TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 1 && notificationHandler.notification == notificationMessage1, "failed to send/process notification message in the timeout period.") @@ -55,7 +55,7 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness { Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size) however even after that the assertion can fail as the second node it self can be deleted depending on how threads get scheduled.*/ - ZkUtils.createSequentialPersistentPath(zkClient, seqNodePath, notificationMessage2) + zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage2) TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 2 && notificationHandler.notification == notificationMessage2, "failed to send/process notification message in the timeout period.") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index ca63c80..b8ad15b 100755 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -57,7 +57,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness { new AtomicLong(0), new AtomicInteger(0), "")) - createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers) + createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala index 6c22e8b..818229c 100644 --- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala @@ -48,9 +48,9 @@ class PartitionAssignorTest extends Logging { ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true)) }).toSeq:_*) val scenario = Scenario("g1", topicPartitionCounts, subscriptions) - val zkClient = PartitionAssignorTest.setupZkClientMock(scenario) - EasyMock.replay(zkClient) - PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient, verifyAssignmentIsUniform = true) + val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario) + EasyMock.replay(zkUtils.zkClient) + PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, verifyAssignmentIsUniform = true) }) } @@ -73,10 +73,10 @@ class PartitionAssignorTest extends Logging { ("g1c" + consumer, StaticSubscriptionInfo(streamCounts)) }).toSeq:_*) val scenario = Scenario("g1", topicPartitionCounts, subscriptions) - val zkClient = PartitionAssignorTest.setupZkClientMock(scenario) - EasyMock.replay(zkClient) + val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario) + EasyMock.replay(zkUtils.zkClient) - PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient) + PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils) }) } } @@ -135,6 +135,7 @@ private object PartitionAssignorTest extends Logging { val consumers = java.util.Arrays.asList(scenario.subscriptions.keys.toSeq:_*) val zkClient = EasyMock.createStrictMock(classOf[ZkClient]) + val zkUtils = ZkUtils(zkClient, false) EasyMock.checkOrder(zkClient, false) EasyMock.expect(zkClient.getChildren("/consumers/%s/ids".format(scenario.group))).andReturn(consumers) @@ -149,21 +150,21 @@ private object PartitionAssignorTest extends Logging { scenario.topicPartitionCounts.foreach { case(topic, partitionCount) => val replicaAssignment = Map((0 until partitionCount).map(partition => (partition.toString, Seq(0))):_*) EasyMock.expect(zkClient.readData("/brokers/topics/%s".format(topic), new Stat())) - .andReturn(ZkUtils.replicaAssignmentZkData(replicaAssignment)) + .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment)) EasyMock.expectLastCall().anyTimes() } - EasyMock.expect(zkClient.getChildren("/brokers/topics")).andReturn( + EasyMock.expect(zkUtils.zkClient.getChildren("/brokers/topics")).andReturn( java.util.Arrays.asList(scenario.topicPartitionCounts.keys.toSeq:_*)) EasyMock.expectLastCall().anyTimes() - zkClient + zkUtils } - private def assignAndVerify(scenario: Scenario, assignor: PartitionAssignor, zkClient: ZkClient, + private def assignAndVerify(scenario: Scenario, assignor: PartitionAssignor, zkUtils: ZkUtils, verifyAssignmentIsUniform: Boolean = false) { val assignments = scenario.subscriptions.map{ case(consumer, subscription) => - val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkClient) + val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkUtils) assignor.assign(ctx).get(consumer) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index cb59542..28b1dd5 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -97,8 +97,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging sendMessages(servers, topic, nMessages, 1) // wait to make sure the topic and partition have a leader for the successful case - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) @@ -130,8 +130,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging val sentMessages2 = sendMessages(servers, topic, nMessages, 0) ++ sendMessages(servers, topic, nMessages, 1) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1) val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages2.sorted, receivedMessages2.sorted) @@ -151,8 +151,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++ sendMessages(servers, topic, nMessages, 1) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1) val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages3.sorted, receivedMessages3.sorted) @@ -185,8 +185,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging val sentMessages1 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++ sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) @@ -218,8 +218,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging val sentMessages2 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++ sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1) val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages2.sorted, receivedMessages2.sorted) @@ -239,8 +239,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++ sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1) val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages3.sorted, receivedMessages3.sorted) @@ -294,8 +294,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1) val zkConsumerConnector = new ZookeeperConsumerConnector(consumerConfig, true) @@ -322,10 +322,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging @Test def testLeaderSelectionForPartition() { - val zkClient = ZkUtils.createZkClient(zkConnect, 6000, 30000) + val zkUtils = ZkUtils(zkConnect, 6000, 30000, false) // create topic topic1 with 1 partition on broker 0 - createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) + createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers) // send some messages to each broker val sentMessages1 = sendMessages(servers, topic, nMessages) @@ -349,7 +349,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging val receivedMessages1 = getMessages(topicMessageStreams1, nMessages) assertEquals(sentMessages1, receivedMessages1) zkConsumerConnector1.shutdown() - zkClient.close() + zkUtils.close() } @Test @@ -416,14 +416,14 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging } def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = { - val children = zkClient.getChildren(path) + val children = zkUtils.zkClient.getChildren(path) Collections.sort(children) val childrenAsSeq : Seq[java.lang.String] = { import scala.collection.JavaConversions._ children.toSeq } childrenAsSeq.map(partition => - (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String])) + (partition, zkUtils.zkClient.readData(path + "/" + partition).asInstanceOf[String])) } private class TestConsumerRebalanceListener extends ConsumerRebalanceListener { http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala index c93eca5..91ac1f6 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala @@ -75,7 +75,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { } } // Create topic with one partition - kafka.admin.AdminUtils.createTopic(controller.zkClient, topic, 1, 1) + kafka.admin.AdminUtils.createTopic(controller.zkUtils, topic, 1, 1) val topicPartition = TopicAndPartition("topic1", 0) var partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition) while (!partitions.contains(topicPartition)) { http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala index d8a7948..3bc37e5 100644 --- a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala @@ -18,7 +18,8 @@ package kafka.coordinator import kafka.server.KafkaConfig -import kafka.utils.{ZkUtils, TestUtils} +import kafka.utils.{TestUtils, ZkUtils} +import kafka.utils.ZkUtils._ import org.junit.Assert._ import org.I0Itec.zkclient.{IZkDataListener, ZkClient} @@ -33,14 +34,15 @@ import org.scalatest.junit.JUnitSuite class CoordinatorMetadataTest extends JUnitSuite { val DefaultNumPartitions = 8 val DefaultNumReplicas = 2 - var zkClient: ZkClient = null + var zkUtils: ZkUtils = null var coordinatorMetadata: CoordinatorMetadata = null @Before def setUp() { val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") - zkClient = EasyMock.createStrictMock(classOf[ZkClient]) - coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkClient, null) + val zkClient = EasyMock.createStrictMock(classOf[ZkClient]) + zkUtils = ZkUtils(zkClient, false) + coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkUtils, null) } @Test @@ -77,8 +79,8 @@ class CoordinatorMetadataTest extends JUnitSuite { val topics = Set("a") coordinatorMetadata.addGroup(groupId, "range") - expectZkClientSubscribeDataChanges(zkClient, topics) - EasyMock.replay(zkClient) + expectZkClientSubscribeDataChanges(zkUtils, topics) + EasyMock.replay(zkUtils.zkClient) coordinatorMetadata.bindGroupToTopics(groupId, topics) assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic) } @@ -91,8 +93,8 @@ class CoordinatorMetadataTest extends JUnitSuite { coordinatorMetadata.addGroup(group1, "range") coordinatorMetadata.addGroup(group2, "range") - expectZkClientSubscribeDataChanges(zkClient, topics) - EasyMock.replay(zkClient) + expectZkClientSubscribeDataChanges(zkUtils, topics) + EasyMock.replay(zkUtils.zkClient) coordinatorMetadata.bindGroupToTopics(group1, topics) coordinatorMetadata.bindGroupToTopics(group2, topics) assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic) @@ -111,8 +113,8 @@ class CoordinatorMetadataTest extends JUnitSuite { val topics = Set("a") coordinatorMetadata.addGroup(groupId, "range") - expectZkClientSubscribeDataChanges(zkClient, topics) - EasyMock.replay(zkClient) + expectZkClientSubscribeDataChanges(zkUtils, topics) + EasyMock.replay(zkUtils.zkClient) coordinatorMetadata.bindGroupToTopics(groupId, topics) coordinatorMetadata.unbindGroupFromTopics(groupId, Set("b")) assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic) @@ -126,8 +128,8 @@ class CoordinatorMetadataTest extends JUnitSuite { coordinatorMetadata.addGroup(group1, "range") coordinatorMetadata.addGroup(group2, "range") - expectZkClientSubscribeDataChanges(zkClient, topics) - EasyMock.replay(zkClient) + expectZkClientSubscribeDataChanges(zkUtils, topics) + EasyMock.replay(zkUtils.zkClient) coordinatorMetadata.bindGroupToTopics(group1, topics) coordinatorMetadata.bindGroupToTopics(group2, topics) coordinatorMetadata.unbindGroupFromTopics(group1, topics) @@ -140,9 +142,9 @@ class CoordinatorMetadataTest extends JUnitSuite { val topics = Set("a") coordinatorMetadata.addGroup(groupId, "range") - expectZkClientSubscribeDataChanges(zkClient, topics) - expectZkClientUnsubscribeDataChanges(zkClient, topics) - EasyMock.replay(zkClient) + expectZkClientSubscribeDataChanges(zkUtils, topics) + expectZkClientUnsubscribeDataChanges(zkUtils.zkClient, topics) + EasyMock.replay(zkUtils.zkClient) coordinatorMetadata.bindGroupToTopics(groupId, topics) coordinatorMetadata.unbindGroupFromTopics(groupId, topics) assertEquals(Map.empty[String, Int], coordinatorMetadata.partitionsPerTopic) @@ -163,8 +165,8 @@ class CoordinatorMetadataTest extends JUnitSuite { coordinatorMetadata.addGroup(group1, "range") coordinatorMetadata.addGroup(group2, "range") - expectZkClientSubscribeDataChanges(zkClient, topics) - EasyMock.replay(zkClient) + expectZkClientSubscribeDataChanges(zkUtils, topics) + EasyMock.replay(zkUtils.zkClient) coordinatorMetadata.bindGroupToTopics(group1, topics) coordinatorMetadata.bindGroupToTopics(group2, topics) coordinatorMetadata.removeGroup(group1, topics) @@ -179,17 +181,17 @@ class CoordinatorMetadataTest extends JUnitSuite { val topics = Set("a") coordinatorMetadata.addGroup(groupId, "range") - expectZkClientSubscribeDataChanges(zkClient, topics) - expectZkClientUnsubscribeDataChanges(zkClient, topics) - EasyMock.replay(zkClient) + expectZkClientSubscribeDataChanges(zkUtils, topics) + expectZkClientUnsubscribeDataChanges(zkUtils.zkClient, topics) + EasyMock.replay(zkUtils.zkClient) coordinatorMetadata.bindGroupToTopics(groupId, topics) coordinatorMetadata.removeGroup(groupId, topics) assertNull(coordinatorMetadata.getGroup(groupId)) assertEquals(Map.empty[String, Int], coordinatorMetadata.partitionsPerTopic) } - private def expectZkClientSubscribeDataChanges(zkClient: ZkClient, topics: Set[String]) { - topics.foreach(topic => expectZkClientSubscribeDataChange(zkClient, topic)) + private def expectZkClientSubscribeDataChanges(zkUtils: ZkUtils, topics: Set[String]) { + topics.foreach(topic => expectZkClientSubscribeDataChange(zkUtils.zkClient, topic)) } private def expectZkClientUnsubscribeDataChanges(zkClient: ZkClient, topics: Set[String]) { @@ -200,14 +202,14 @@ class CoordinatorMetadataTest extends JUnitSuite { val replicaAssignment = (0 until DefaultNumPartitions) .map(partition => partition.toString -> (0 until DefaultNumReplicas).toSeq).toMap - val topicPath = ZkUtils.getTopicPath(topic) + val topicPath = getTopicPath(topic) EasyMock.expect(zkClient.readData(topicPath, new Stat())) - .andReturn(ZkUtils.replicaAssignmentZkData(replicaAssignment)) + .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment)) zkClient.subscribeDataChanges(EasyMock.eq(topicPath), EasyMock.isA(classOf[IZkDataListener])) } private def expectZkClientUnsubscribeDataChange(zkClient: ZkClient, topic: String) { - val topicPath = ZkUtils.getTopicPath(topic) + val topicPath = getTopicPath(topic) zkClient.unsubscribeDataChanges(EasyMock.eq(topicPath), EasyMock.isA(classOf[IZkDataListener])) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index 818673f..a71ddf1 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -76,7 +76,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging { * Returns the count of messages received. */ def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = { - TestUtils.createTopic(zkClient, topic, 1, 1, servers) + TestUtils.createTopic(zkUtils, topic, 1, 1, servers) val producer: Producer[String, Array[Byte]] = TestUtils.createProducer( TestUtils.getBrokerListStrFromServers(servers), http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala index 20a4068..3cf4dae 100644 --- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala @@ -62,7 +62,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { def testTopicMetadataRequest { // create topic val topic = "test" - AdminUtils.createTopic(zkClient, topic, 1, 1) + AdminUtils.createTopic(zkUtils, topic, 1, 1) // create a topic metadata request val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0) @@ -79,7 +79,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { def testBasicTopicMetadata { // create topic val topic = "test" - createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) + createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000,0).topicsMetadata @@ -98,8 +98,8 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { // create topic val topic1 = "testGetAllTopicMetadata1" val topic2 = "testGetAllTopicMetadata2" - createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) - createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) + createTopic(zkUtils, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) + createTopic(zkUtils, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) // issue metadata request with empty list of topics val topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata", @@ -130,7 +130,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { assertEquals(0, topicsMetadata.head.partitionsMetadata.size) // wait for leader to be elected - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0) TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0) // retry the metadata for the auto created topic @@ -159,7 +159,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { assertEquals("Expecting InvalidTopicCode for topic2 metadata", ErrorMapping.InvalidTopicCode, topicsMetadata(1).errorCode) // wait for leader to be elected - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 0) TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0) // retry the metadata for the first auto created topic @@ -218,7 +218,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { // create topic val topic: String = "test" - AdminUtils.createTopic(zkClient, topic, 1, numBrokers) + AdminUtils.createTopic(zkUtils, topic, 1, numBrokers) // shutdown a broker adHocServers.last.shutdown() http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/FetcherTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index c061597..5af5d1a 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -42,11 +42,11 @@ class FetcherTest extends KafkaServerTestHarness { @Before override def setUp() { super.setUp - TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers) + TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers) val cluster = new Cluster(servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))) - fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient) + fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkUtils) fetcher.stopConnections() val topicInfos = configs.map(c => new PartitionTopicInfo(topic, http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index e6f0c54..df752db 100755 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -113,7 +113,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar private def produceAndMultiFetch(producer: Producer[String, String]) { for(topic <- List("test1", "test2", "test3", "test4")) - TestUtils.createTopic(zkClient, topic, servers = servers) + TestUtils.createTopic(zkUtils, topic, servers = servers) // send some messages val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); @@ -182,7 +182,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar private def multiProduce(producer: Producer[String, String]) { val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0) - topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers)) + topics.keys.map(topic => TestUtils.createTopic(zkUtils, topic, servers = servers)) val messages = new mutable.HashMap[String, Seq[String]] val builder = new FetchRequestBuilder() @@ -210,7 +210,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar @Test def testConsumerEmptyTopic() { val newTopic = "new-topic" - TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers) + TestUtils.createTopic(zkUtils, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers) val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build()) assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext) @@ -219,7 +219,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar @Test def testPipelinedProduceRequests() { val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0) - topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers)) + topics.keys.map(topic => TestUtils.createTopic(zkUtils, topic, servers = servers)) val props = new Properties() props.put("request.required.acks", "0") val pipelinedProducer: Producer[String, String] = http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala index 4d73be1..b931568 100755 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -56,10 +56,10 @@ class RollingBounceTest extends ZooKeeperTestHarness { val topic4 = "new-topic4" // create topics with 1 partition, 2 replicas, one on each broker - createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) - createTopic(zkClient, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers) - createTopic(zkClient, topic3, partitionReplicaAssignment = Map(0->Seq(2,3)), servers = servers) - createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers) + createTopic(zkUtils, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) + createTopic(zkUtils, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers) + createTopic(zkUtils, topic3, partitionReplicaAssignment = Map(0->Seq(2,3)), servers = servers) + createTopic(zkUtils, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers) // Do a rolling bounce and check if leader transitions happen correctly @@ -86,7 +86,7 @@ class RollingBounceTest extends ZooKeeperTestHarness { servers((startIndex + 1) % 4).shutdown() prevLeader = (startIndex + 1) % 4 } - var newleader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) + var newleader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) // Ensure the new leader is different from the old assertTrue("Leader transition did not happen for " + topic, newleader.getOrElse(-1) != -1 && (newleader.getOrElse(-1) != prevLeader)) // Start the server back up again