Updated Branches: refs/heads/trunk 2cda5d1fc -> db37ed005
http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b4a57c6..b0348bb 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -38,7 +38,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var logManager: LogManager = null var kafkaZookeeper: KafkaZooKeeper = null var replicaManager: ReplicaManager = null - private var apis: KafkaApis = null + var apis: KafkaApis = null var kafkaController: KafkaController = null val kafkaScheduler = new KafkaScheduler(4) var zkClient: ZkClient = null @@ -112,6 +112,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg info("shutting down") val canShutdown = isShuttingDown.compareAndSet(false, true); if (canShutdown) { + if(kafkaZookeeper != null) + Utils.swallow(kafkaZookeeper.shutdown()) if(socketServer != null) Utils.swallow(socketServer.shutdown()) if(requestHandlerPool != null) @@ -119,8 +121,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Utils.swallow(kafkaScheduler.shutdown()) if(apis != null) Utils.swallow(apis.close()) - if(kafkaZookeeper != null) - Utils.swallow(kafkaZookeeper.shutdown()) if(replicaManager != null) Utils.swallow(replicaManager.shutdown()) if(logManager != null) http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 03f621a..a84da13 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -19,8 +19,8 @@ package kafka.server import kafka.cluster.Broker import kafka.message.ByteBufferMessageSet -import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData} -import kafka.common.{KafkaStorageException, TopicAndPartition, ErrorMapping} +import kafka.api.{OffsetRequest, FetchResponsePartitionData} +import kafka.common.{KafkaStorageException, TopicAndPartition} class ReplicaFetcherThread(name:String, sourceBroker: Broker, http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 89ad4d7..8e49b83 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -52,7 +52,7 @@ class ReplicaManager(val config: KafkaConfig, val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap private var hwThreadInitialized = false this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " - private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) + val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) newGauge( "LeaderCount", http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/admin/AdminTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index b0a0e09..95e7218 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -160,8 +160,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) - val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas) - val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id).toList).toList + val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap assertEquals(expectedReplicaAssignment.size, actualReplicaList.size) for(i <- 0 until actualReplicaList.size) assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i)) @@ -176,39 +175,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { } @Test - def testGetTopicMetadata() { - val expectedReplicaAssignment = Map( - 0 -> List(0, 1, 2), - 1 -> List(1, 2, 3) - ) - val leaderForPartitionMap = Map( - 0 -> 0, - 1 -> 1 - ) - val topic = "auto-topic" - TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3)) - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) - // create leaders for all partitions - TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) - - val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient) - newTopicMetadata.errorCode match { - case ErrorMapping.UnknownTopicOrPartitionCode => - fail("Topic " + topic + " should've been automatically created") - case _ => - assertEquals(topic, newTopicMetadata.topic) - assertNotNull("partition metadata list cannot be null", newTopicMetadata.partitionsMetadata) - assertEquals("partition metadata list length should be 2", 2, newTopicMetadata.partitionsMetadata.size) - val actualReplicaAssignment = newTopicMetadata.partitionsMetadata.map(p => p.replicas) - val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id).toList).toList - assertEquals(expectedReplicaAssignment.size, actualReplicaList.size) - for(i <- 0 until actualReplicaList.size) { - assertEquals(expectedReplicaAssignment(i), actualReplicaList(i)) - } - } - } - - @Test def testPartitionReassignmentWithLeaderInNewReplicas() { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" @@ -278,7 +244,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; - }, 1000) + }, 2000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas) // leader should be 2 @@ -360,49 +326,44 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test def testShutdownBroker() { - info("inside testShutdownBroker") val expectedReplicaAssignment = Map(1 -> List(0, 1, 2)) val topic = "test" val partition = 1 // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) + val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // create the topic AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) - val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) - // broker 2 should be the leader since it was started first - var leaderBeforeShutdown = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get - var controllerId = ZkUtils.getController(zkClient) - var controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController + TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition, 1000) + + val controllerId = ZkUtils.getController(zkClient) + val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController var partitionsRemaining = controller.shutdownBroker(2) + var activeServers = servers.filter(s => s.config.brokerId != 2) try { + // wait for the update metadata request to trickle to the brokers + assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() => + activeServers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000)) assertEquals(0, partitionsRemaining) - var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient) - var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id - assertTrue(leaderAfterShutdown != leaderBeforeShutdown) - // assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size) - assertEquals(2, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size) + var partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition)) + var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader + assertEquals(0, leaderAfterShutdown) + assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size) + assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr) - leaderBeforeShutdown = leaderAfterShutdown - controllerId = ZkUtils.getController(zkClient) - controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController partitionsRemaining = controller.shutdownBroker(1) assertEquals(0, partitionsRemaining) - topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient) - leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id - assertTrue(leaderAfterShutdown != leaderBeforeShutdown) - // assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size) - assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size) + activeServers = servers.filter(s => s.config.brokerId == 0) + partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition)) + leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader + assertEquals(0, leaderAfterShutdown) - leaderBeforeShutdown = leaderAfterShutdown - controllerId = ZkUtils.getController(zkClient) - controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController + assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) partitionsRemaining = controller.shutdownBroker(0) assertEquals(1, partitionsRemaining) - topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient) - leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id - assertTrue(leaderAfterShutdown == leaderBeforeShutdown) - assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size) + // leader doesn't change since all the replicas are shut down + assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) } finally { servers.foreach(_.shutdown()) http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 0f15718..f43ac8f 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -87,8 +87,8 @@ object SerializationTestUtils{ def createTestLeaderAndIsrRequest() : LeaderAndIsrRequest = { val leaderAndIsr1 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader1, 1, isr1, 1), 1) val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1) - val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)), - ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3))) + val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, isr1.toSet)), + ((topic2, 0), PartitionStateInfo(leaderAndIsr2, isr2.toSet))) new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0, "") } http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 4d989e4..fcfc583 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -23,7 +23,6 @@ import kafka.integration.KafkaServerTestHarness import kafka.server._ import scala.collection._ import org.scalatest.junit.JUnit3Suite -import org.apache.log4j.{Level, Logger} import kafka.message._ import kafka.serializer._ import kafka.admin.CreateTopicCommand @@ -31,6 +30,7 @@ import org.I0Itec.zkclient.ZkClient import kafka.utils._ import kafka.producer.{ProducerConfig, KeyedMessage, Producer} import java.util.{Collections, Properties} +import org.apache.log4j.{Logger, Level} import kafka.utils.TestUtils._ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { @@ -97,6 +97,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000) + // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) @@ -142,7 +145,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]()) // send some messages to each broker - val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ + val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) @@ -167,12 +170,15 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ + val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000) + // create a consumer val consumerConfig1 = new ConsumerConfig( TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -240,9 +246,12 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def testCompressionSetConsumption() { // send some messages to each broker - val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++ + val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++ sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000) + val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) @@ -263,9 +272,12 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++ + val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++ sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000) + val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) @@ -303,6 +315,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar // send some messages to each broker val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) @@ -321,12 +335,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val receivedMessages1 = getMessages(nMessages, topicMessageStreams1) assertEquals(sentMessages1, receivedMessages1) + zkConsumerConnector1.shutdown() + zkClient.close() } - def sendMessagesToBrokerPartition(config: KafkaConfig, - topic: String, - partition: Int, - numMessages: Int, + def sendMessagesToBrokerPartition(config: KafkaConfig, + topic: String, + partition: Int, + numMessages: Int, compression: CompressionCodec = NoCompressionCodec): List[String] = { val header = "test-%d-%d".format(config.brokerId, partition) val props = new Properties() http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index 4c646f0..2317760 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -57,7 +57,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L def testResetToEarliestWhenOffsetTooLow() = assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset)) - + def testResetToLatestWhenOffsetTooHigh() = assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset)) @@ -69,12 +69,16 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L * Returns the count of messages received. */ def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = { + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new DefaultEncoder(), new StringEncoder()) for(i <- 0 until numMessages) producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes)) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + // update offset in zookeeper for consumer to jump "forward" in time val dirs = new ZKGroupTopicDirs(group, topic) var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer) @@ -99,8 +103,10 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L } catch { case e: ConsumerTimeoutException => info("consumer timed out after receiving " + received + " messages.") + } finally { + producer.close() + consumerConnector.shutdown } - consumerConnector.shutdown received } http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala index c4866eb..c3c7631 100644 --- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala +++ b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala @@ -21,12 +21,12 @@ import kafka.api.FetchRequestBuilder import kafka.message.ByteBufferMessageSet import kafka.server.{KafkaRequestHandler, KafkaConfig} import org.apache.log4j.{Level, Logger} -import org.junit.Assert._ import org.scalatest.junit.JUnit3Suite import scala.collection._ -import kafka.producer.KeyedMessage import kafka.utils._ import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException} +import kafka.producer.KeyedMessage +import org.junit.Assert.assertEquals /** * End to end tests of the primitive apis against a local server @@ -63,6 +63,8 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness producer.send(producerData:_*) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + var fetchedMessage: ByteBufferMessageSet = null while(fetchedMessage == null || fetchedMessage.validBytes == 0) { val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) @@ -90,6 +92,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness val producedData = List("a_" + topic, "b_" + topic) messages += topic -> producedData producer.send(producedData.map(m => new KeyedMessage[String, String](topic, topic, m)):_*) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) builder.addFetch(topic, offset, 0, 10000) } @@ -132,6 +135,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness builder.addFetch(topic, 0, 0, 10000) } producer.send(produceList: _*) + topics.foreach(topic => TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)) // wait a bit for produced message to be available val request = builder.build() @@ -155,6 +159,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness builder.addFetch(topic, 0, 0, 10000) } producer.send(produceList: _*) + topics.foreach(topic => TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)) producer.send(produceList: _*) // wait a bit for produced message to be available http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 2fc08d3..f764151 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -27,7 +27,7 @@ import org.I0Itec.zkclient.ZkClient import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite import scala.collection._ -import kafka.admin.{AdminUtils, CreateTopicCommand} +import kafka.admin.CreateTopicCommand import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException} import kafka.utils.{TestUtils, Utils} @@ -35,7 +35,7 @@ import kafka.utils.{TestUtils, Utils} * End to end tests of the primitive apis against a local server */ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness { - + val port = TestUtils.choosePort val props = TestUtils.createBrokerConfig(0, port) val config = new KafkaConfig(props) @@ -300,8 +300,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testConsumerEmptyTopic() { val newTopic = "new-topic" CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString) - assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => - AdminUtils.fetchTopicMetadataFromZk(newTopic, zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) + TestUtils.waitUntilMetadataIsPropagated(servers, newTopic, 0, 1000) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500) val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build()) assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext) http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 6db63ba..edf8555 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -22,26 +22,27 @@ import kafka.zk.ZooKeeperTestHarness import kafka.admin.CreateTopicCommand import java.nio.ByteBuffer import junit.framework.Assert._ -import org.easymock.EasyMock -import kafka.network._ import kafka.cluster.Broker import kafka.utils.TestUtils import kafka.utils.TestUtils._ -import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig} +import kafka.server.{KafkaServer, KafkaConfig} +import kafka.api.TopicMetadataRequest import kafka.common.ErrorMapping -import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetadataRequest} +import kafka.client.ClientUtils class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(1) val configs = props.map(p => new KafkaConfig(p)) - var brokers: Seq[Broker] = null + private var server1: KafkaServer = null + val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port)) override def setUp() { super.setUp() - brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId)) + server1 = TestUtils.createServer(configs.head) } override def tearDown() { + server1.shutdown() super.tearDown() } @@ -65,16 +66,15 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { // create topic val topic = "test" CreateTopicCommand.createTopic(zkClient, topic, 1) - // set up leader for topic partition 0 - val leaderForPartitionMap = Map( - 0 -> configs.head.brokerId - ) - TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) - val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0) - val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest) - assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size) - assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic) - val partitionMetadata = topicMetadata.head.partitionsMetadata + TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0, 1000) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", + 2000,0).topicsMetadata + assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) + assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode) + assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size) + assertEquals("Expecting metadata for the test topic", "test", topicsMetadata.head.topic) + var partitionMetadata = topicsMetadata.head.partitionsMetadata assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size) assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId) assertEquals(1, partitionMetadata.head.replicas.size) @@ -82,60 +82,55 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testGetAllTopicMetadata { // create topic - val topic = "test" - CreateTopicCommand.createTopic(zkClient, topic, 1) - // set up leader for topic partition 0 - val leaderForPartitionMap = Map( - 0 -> configs.head.brokerId - ) - TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) - val topicMetadataRequest = new TopicMetadataRequest(List(), 0) - val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest) - assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size) - assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic) - val partitionMetadata = topicMetadata.head.partitionsMetadata - assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size) - assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId) - assertEquals(1, partitionMetadata.head.replicas.size) + val topic1 = "testGetAllTopicMetadata1" + val topic2 = "testGetAllTopicMetadata2" + CreateTopicCommand.createTopic(zkClient, topic1, 1) + CreateTopicCommand.createTopic(zkClient, topic2, 1) + + // wait for leader to be elected for both topics + TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0, 1000) + TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic2, 0, 1000) + + // issue metadata request with empty list of topics + var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata", + 2000, 0).topicsMetadata + assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) + assertEquals(2, topicsMetadata.size) + assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode) + assertEquals(ErrorMapping.NoError, topicsMetadata.last.partitionsMetadata.head.errorCode) + val partitionMetadataTopic1 = topicsMetadata.head.partitionsMetadata + val partitionMetadataTopic2 = topicsMetadata.last.partitionsMetadata + assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic1.size) + assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic1.head.partitionId) + assertEquals(1, partitionMetadataTopic1.head.replicas.size) + assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic2.size) + assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic2.head.partitionId) + assertEquals(1, partitionMetadataTopic2.head.replicas.size) } def testAutoCreateTopic { // auto create topic - val topic = "test" - - val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0) - val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest) - assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size) - assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic) - val partitionMetadata = topicMetadata.head.partitionsMetadata + val topic = "testAutoCreateTopic" + var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic", + 2000,0).topicsMetadata + assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode) + assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size) + assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic) + assertEquals(0, topicsMetadata.head.partitionsMetadata.size) + + // wait for leader to be elected + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0, 1000) + + // retry the metadata for the auto created topic + topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", + 2000,0).topicsMetadata + assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) + assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode) + var partitionMetadata = topicsMetadata.head.partitionsMetadata assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size) assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId) - assertEquals(0, partitionMetadata.head.replicas.size) - assertEquals(None, partitionMetadata.head.leader) - assertEquals(ErrorMapping.LeaderNotAvailableCode, partitionMetadata.head.errorCode) - } - - private def mockLogManagerAndTestTopic(request: TopicMetadataRequest): Seq[TopicMetadata] = { - // topic metadata request only requires 1 call from the replica manager - val replicaManager = EasyMock.createMock(classOf[ReplicaManager]) - EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes() - EasyMock.replay(replicaManager) - - - val serializedMetadataRequest = TestUtils.createRequestByteBuffer(request) - - // create the kafka request handler - val requestChannel = new RequestChannel(2, 5) - val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1) - - // call the API (to be tested) to get metadata - apis.handleTopicMetadataRequest(new RequestChannel.Request - (processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeMs=1)) - val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer - - // check assertions - val topicMetadata = TopicMetadataResponse.readFrom(metadataResponse).topicsMetadata - - topicMetadata + assertEquals(1, partitionMetadata.head.replicas.size) + assertTrue(partitionMetadata.head.leader.isDefined) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 7f7a8d7..458b9ad 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -260,10 +260,11 @@ class AsyncProducerTest extends JUnit3Suite { topicPartitionInfos = topicPartitionInfos) try { handler.partitionAndCollate(producerDataList) - fail("Should fail with UnknownTopicOrPartitionException") } catch { - case e: UnknownTopicOrPartitionException => // expected, do nothing + // should not throw UnknownTopicOrPartitionException to allow resend + case e: UnknownTopicOrPartitionException => fail("Should not throw UnknownTopicOrPartitionException") + } } @@ -291,10 +292,10 @@ class AsyncProducerTest extends JUnit3Suite { topicPartitionInfos = topicPartitionInfos) try { handler.handle(producerDataList) - fail("Should fail with NoBrokersForPartitionException") + fail("Should fail with FailedToSendMessageException") } catch { - case e: NoBrokersForPartitionException => // expected, do nothing + case e: FailedToSendMessageException => // we retry on any exception now } } @@ -418,6 +419,8 @@ class AsyncProducerTest extends JUnit3Suite { val response2 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) + // don't care about config mock + EasyMock.expect(mockSyncProducer.config).andReturn(EasyMock.anyObject()).anyTimes() EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1) EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2) http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index bc37531..b511d90 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -23,14 +23,17 @@ import kafka.message.Message import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer} import kafka.zk.ZooKeeperTestHarness import org.apache.log4j.{Level, Logger} -import org.junit.Assert._ import org.junit.Test import kafka.utils._ import java.util -import kafka.admin.{AdminUtils, CreateTopicCommand} +import kafka.admin.CreateTopicCommand import util.Properties import kafka.api.FetchRequestBuilder -import kafka.common.{KafkaException, ErrorMapping, FailedToSendMessageException} +import kafka.common.FailedToSendMessageException +import org.junit.Assert.assertTrue +import org.junit.Assert.assertFalse +import org.junit.Assert.assertEquals + class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ @@ -43,6 +46,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ private var consumer1: SimpleConsumer = null private var consumer2: SimpleConsumer = null private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) + private var servers = List.empty[KafkaServer] private val props1 = TestUtils.createBrokerConfig(brokerId1, port1) private val config1 = new KafkaConfig(props1) { @@ -60,6 +64,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // set up 2 brokers with 4 partitions each server1 = TestUtils.createServer(config1) server2 = TestUtils.createServer(config2) + servers = List(server1,server2) val props = new Properties() props.put("host", "localhost") @@ -68,7 +73,6 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024, "") consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024, "") - // temporarily set request handler logger to a higher level requestHandlerLogger.setLevel(Level.FATAL) } @@ -87,10 +91,11 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ def testUpdateBrokerPartitionInfo() { + val topic = "new-topic" CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2) - assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => - AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) + // wait until the update metadata request for new topic reaches all servers + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 500) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) val props1 = new util.Properties() props1.put("metadata.broker.list", "localhost:80,localhost:81") @@ -98,10 +103,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val producerConfig1 = new ProducerConfig(props1) val producer1 = new Producer[String, String](producerConfig1) try{ - producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test1")) + producer1.send(new KeyedMessage[String, String](topic, "test", "test1")) fail("Test should fail because the broker list provided are not valid") } catch { - case e: KafkaException => + case e: FailedToSendMessageException => case oe => fail("fails with exception", oe) } finally { producer1.close() @@ -113,7 +118,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val producerConfig2= new ProducerConfig(props2) val producer2 = new Producer[String, String](producerConfig2) try{ - producer2.send(new KeyedMessage[String, String]("new-topic", "test", "test1")) + producer2.send(new KeyedMessage[String, String](topic, "test", "test1")) } catch { case e => fail("Should succeed sending the message", e) } finally { @@ -126,7 +131,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val producerConfig3 = new ProducerConfig(props3) val producer3 = new Producer[String, String](producerConfig3) try{ - producer3.send(new KeyedMessage[String, String]("new-topic", "test", "test1")) + producer3.send(new KeyedMessage[String, String](topic, "test", "test1")) } catch { case e => fail("Should succeed sending the message", e) } finally { @@ -151,27 +156,27 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val producerConfig1 = new ProducerConfig(props1) val producerConfig2 = new ProducerConfig(props2) + val topic = "new-topic" // create topic with 1 partition and await leadership - CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2) - assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => - AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) + CreateTopicCommand.createTopic(zkClient, topic, 1, 2) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) val producer1 = new Producer[String, String](producerConfig1) val producer2 = new Producer[String, String](producerConfig2) // Available partition ids should be 0. - producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test1")) - producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test2")) + producer1.send(new KeyedMessage[String, String](topic, "test", "test1")) + producer1.send(new KeyedMessage[String, String](topic, "test", "test2")) // get the leader - val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0) + val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined) val leader = leaderOpt.get val messageSet = if(leader == server1.config.brokerId) { - val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) response1.messageSet("new-topic", 0).iterator.toBuffer }else { - val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) response2.messageSet("new-topic", 0).iterator.toBuffer } assertEquals("Should have fetched 2 messages", 2, messageSet.size) @@ -180,7 +185,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ producer1.close() try { - producer2.send(new KeyedMessage[String, String]("new-topic", "test", "test2")) + producer2.send(new KeyedMessage[String, String](topic, "test", "test2")) fail("Should have timed out for 3 acks.") } catch { @@ -202,21 +207,22 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("request.required.acks", "1") props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + val topic = "new-topic" // create topic - CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0") - assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => - AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 1, 500) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 2, 500) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 3, 500) + CreateTopicCommand.createTopic(zkClient, topic, 4, 2, "0,0,0,0") + // waiting for 1 partition is enough + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 2, 500) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 3, 500) val config = new ProducerConfig(props) val producer = new Producer[String, String](config) try { // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only // on broker 0 - producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1")) + producer.send(new KeyedMessage[String, String](topic, "test", "test1")) } catch { case e => fail("Unexpected exception: " + e) } @@ -227,7 +233,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ try { // These sends should fail since there are no available brokers - producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1")) + producer.send(new KeyedMessage[String, String](topic, "test", "test1")) fail("Should fail since no leader exists for the partition.") } catch { case e => // success @@ -235,12 +241,12 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // restart server 1 server1.startup() - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) try { // cross check if broker 1 got the messages - val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) - val messageSet1 = response1.messageSet("new-topic", 0).iterator + val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + val messageSet1 = response1.messageSet(topic, 0).iterator assertTrue("Message set should have 1 message", messageSet1.hasNext) assertEquals(new Message(bytes = "test1".getBytes, key = "test".getBytes), messageSet1.next.message) assertFalse("Message set should have another message", messageSet1.hasNext) @@ -259,22 +265,22 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("request.timeout.ms", String.valueOf(timeoutMs)) props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) props.put("request.required.acks", "1") - + props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout") val config = new ProducerConfig(props) val producer = new Producer[String, String](config) + val topic = "new-topic" // create topics in ZK - CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1") - assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => - AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) + CreateTopicCommand.createTopic(zkClient, topic, 4, 2, "0:1,0:1,0:1,0:1") + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) // do a simple test to make sure plumbing is okay try { // this message should be assigned to partition 0 whose leader is on broker 0 - producer.send(new KeyedMessage[String, String]("new-topic", "test", "test")) + producer.send(new KeyedMessage[String, String](topic, "test", "test")) // cross check if brokers got the messages - val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) val messageSet1 = response1.messageSet("new-topic", 0).iterator assertTrue("Message set should have 1 message", messageSet1.hasNext) assertEquals(new Message("test".getBytes), messageSet1.next.message) @@ -290,7 +296,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ try { // this message should be assigned to partition 0 whose leader is on broker 0, but // broker 0 will not response within timeoutMs millis. - producer.send(new KeyedMessage[String, String]("new-topic", "test", "test")) + producer.send(new KeyedMessage[String, String](topic, "test", "test")) } catch { case e: FailedToSendMessageException => /* success */ case e: Exception => fail("Not expected", e) http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 8f88177..c4328f0 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -132,7 +132,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderIsrAndControllerEpoch] leaderAndIsr.put((topic, partitionId), new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2)) - val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap + val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, Set(0,1))).toMap val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, staleControllerEpoch, 0, "") http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 3728f8c..947e795 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -50,6 +50,8 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { // create topic CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") + TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000) + // send some messages producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*) @@ -65,11 +67,12 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { server = new KafkaServer(config) server.startup() + // wait for the broker to receive the update metadata request after startup + TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000) + producer = new Producer[Int, String](new ProducerConfig(producerConfig)) val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "") - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) - var fetchedMessage: ByteBufferMessageSet = null while(fetchedMessage == null || fetchedMessage.validBytes == 0) { val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build()) http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 1557047..c7dd8a7 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -20,7 +20,7 @@ import kafka.cluster.{Partition, Replica} import kafka.log.Log import kafka.message.{ByteBufferMessageSet, Message} import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.utils.{Time, TestUtils, MockTime} +import kafka.utils.{ZkUtils, Time, TestUtils, MockTime} import org.easymock.EasyMock import org.I0Itec.zkclient.ZkClient import org.scalatest.junit.JUnit3Suite @@ -57,7 +57,9 @@ class SimpleFetchTest extends JUnit3Suite { val fetchSize = 100 val messages = new Message("test-message".getBytes()) - val zkClient = EasyMock.createMock(classOf[ZkClient]) + // create nice mock since we don't particularly care about zkclient calls + val zkClient = EasyMock.createNiceMock(classOf[ZkClient]) + EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false) EasyMock.replay(zkClient) val log = EasyMock.createMock(classOf[kafka.log.Log]) @@ -151,7 +153,8 @@ class SimpleFetchTest extends JUnit3Suite { val followerReplicaId = configs(1).brokerId val followerLEO = 15 - val zkClient = EasyMock.createMock(classOf[ZkClient]) + val zkClient = EasyMock.createNiceMock(classOf[ZkClient]) + EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false) EasyMock.replay(zkClient) val log = EasyMock.createMock(classOf[kafka.log.Log]) http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index f9c9e64..3cb1d4a 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -37,6 +37,7 @@ import kafka.api._ import collection.mutable.Map import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition +import junit.framework.Assert /** @@ -499,6 +500,12 @@ object TestUtils extends Logging { byteBuffer.rewind() byteBuffer } + + def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = { + Assert.assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition), + TestUtils.waitUntilTrue(() => + servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition))), timeout)) + } }