Updated Branches:
  refs/heads/trunk 2cda5d1fc -> db37ed005

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b4a57c6..b0348bb 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -38,7 +38,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   var logManager: LogManager = null
   var kafkaZookeeper: KafkaZooKeeper = null
   var replicaManager: ReplicaManager = null
-  private var apis: KafkaApis = null
+  var apis: KafkaApis = null
   var kafkaController: KafkaController = null
   val kafkaScheduler = new KafkaScheduler(4)
   var zkClient: ZkClient = null
@@ -112,6 +112,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     info("shutting down")
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
+      if(kafkaZookeeper != null)
+        Utils.swallow(kafkaZookeeper.shutdown())
       if(socketServer != null)
         Utils.swallow(socketServer.shutdown())
       if(requestHandlerPool != null)
@@ -119,8 +121,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       Utils.swallow(kafkaScheduler.shutdown())
       if(apis != null)
         Utils.swallow(apis.close())
-      if(kafkaZookeeper != null)
-        Utils.swallow(kafkaZookeeper.shutdown())
       if(replicaManager != null)
         Utils.swallow(replicaManager.shutdown())
       if(logManager != null)

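The two hunks above reorder broker shutdown: the ZooKeeper client is now closed first, so the broker deregisters itself and stops being advertised before the socket server and request handlers go down. A minimal standalone sketch of the guarded-shutdown idiom, with simplified stand-in types rather than the real kafka.server components:

    // Sketch only: Stoppable and swallow stand in for the real broker
    // components and kafka.utils.Utils.swallow.
    def swallow(action: => Unit): Unit =
      try action catch { case e: Throwable => println("swallowed: " + e) }

    class Stoppable(val name: String) { def shutdown(): Unit = println(name + " stopped") }

    var kafkaZookeeper: Stoppable = new Stoppable("kafkaZookeeper")
    var socketServer: Stoppable = new Stoppable("socketServer")

    // deregister from ZooKeeper first so no new requests are routed here,
    // then stop the component serving client connections
    if (kafkaZookeeper != null)
      swallow(kafkaZookeeper.shutdown())
    if (socketServer != null)
      swallow(socketServer.shutdown())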
http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 03f621a..a84da13 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -19,8 +19,8 @@ package kafka.server
 
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
-import kafka.common.{KafkaStorageException, TopicAndPartition, ErrorMapping}
+import kafka.api.{OffsetRequest, FetchResponsePartitionData}
+import kafka.common.{KafkaStorageException, TopicAndPartition}
 
 class ReplicaFetcherThread(name:String,
                            sourceBroker: Broker,

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 89ad4d7..8e49b83 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -52,7 +52,7 @@ class ReplicaManager(val config: KafkaConfig,
   val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
   private var hwThreadInitialized = false
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
   newGauge(
     "LeaderCount",

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index b0a0e09..95e7218 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -160,8 +160,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
     TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
-    val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id).toList).toList
+    val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
     for(i <- 0 until actualReplicaList.size)
       assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
@@ -176,39 +175,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   }
 
   @Test
-  def testGetTopicMetadata() {
-    val expectedReplicaAssignment = Map(
-      0 -> List(0, 1, 2),
-      1 -> List(1, 2, 3)
-    )
-    val leaderForPartitionMap = Map(
-      0 -> 0,
-      1 -> 1
-    )
-    val topic = "auto-topic"
-    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
-    // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-
-    val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-    newTopicMetadata.errorCode match {
-      case ErrorMapping.UnknownTopicOrPartitionCode =>
-        fail("Topic " + topic + " should've been automatically created")
-      case _ =>
-        assertEquals(topic, newTopicMetadata.topic)
-        assertNotNull("partition metadata list cannot be null", newTopicMetadata.partitionsMetadata)
-        assertEquals("partition metadata list length should be 2", 2, newTopicMetadata.partitionsMetadata.size)
-        val actualReplicaAssignment = newTopicMetadata.partitionsMetadata.map(p => p.replicas)
-        val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id).toList).toList
-        assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
-        for(i <- 0 until actualReplicaList.size) {
-          assertEquals(expectedReplicaAssignment(i), actualReplicaList(i))
-        }
-    }
-  }
-
-  @Test
   def testPartitionReassignmentWithLeaderInNewReplicas() {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
@@ -278,7 +244,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
       CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
-    }, 1000)
+    }, 2000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
     // leader should be 2
@@ -360,49 +326,44 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   @Test
   def testShutdownBroker() {
-    info("inside testShutdownBroker")
     val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
     val topic = "test"
     val partition = 1
     // create brokers
     val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
+    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
     // create the topic
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
-    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
 
-    // broker 2 should be the leader since it was started first
-    var leaderBeforeShutdown = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get
-    var controllerId = ZkUtils.getController(zkClient)
-    var controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition, 1000)
+
+    val controllerId = ZkUtils.getController(zkClient)
+    val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
     var partitionsRemaining = controller.shutdownBroker(2)
+    var activeServers = servers.filter(s => s.config.brokerId != 2)
     try {
+      // wait for the update metadata request to trickle to the brokers
+      assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
+        activeServers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
       assertEquals(0, partitionsRemaining)
-      var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-      var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-      assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-      // assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
-      assertEquals(2, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      var partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
+      var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+      assertEquals(0, leaderAfterShutdown)
+      assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
+      assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr)
 
-      leaderBeforeShutdown = leaderAfterShutdown
-      controllerId = ZkUtils.getController(zkClient)
-      controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
       partitionsRemaining = controller.shutdownBroker(1)
       assertEquals(0, partitionsRemaining)
-      topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-      leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-      assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-      // assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
-      assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      activeServers = servers.filter(s => s.config.brokerId == 0)
+      partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
+      leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+      assertEquals(0, leaderAfterShutdown)
 
-      leaderBeforeShutdown = leaderAfterShutdown
-      controllerId = ZkUtils.getController(zkClient)
-      controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+      assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining)
-      topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-      leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-      assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
-      assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      // leader doesn't change since all the replicas are shut down
+      assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     }
     finally {
       servers.foreach(_.shutdown())

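The rewritten testShutdownBroker above now asserts against each broker's metadata cache (apis.leaderCache) instead of re-reading topic metadata from ZooKeeper. Its servers.foldLeft(true)(_ && _...) expressions are plain and-reductions over the server list; a small sketch with an illustrative stand-in type shows the pattern is equivalent to forall:

    // Illustrative only: FakeServer stands in for KafkaServer.
    case class FakeServer(leader: Int)
    val all = List(FakeServer(0), FakeServer(0), FakeServer(0))

    val viaFold = all.foldLeft(true)(_ && _.leader == 0)   // and-reduction
    val viaForall = all.forall(_.leader == 0)              // same result
    assert(viaFold == viaForall)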
http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 0f15718..f43ac8f 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -87,8 +87,8 @@ object SerializationTestUtils{
   def createTestLeaderAndIsrRequest() : LeaderAndIsrRequest = {
     val leaderAndIsr1 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader1, 1, isr1, 1), 1)
     val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
-    val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
-                  ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
+    val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, isr1.toSet)),
+                  ((topic2, 0), PartitionStateInfo(leaderAndIsr2, isr2.toSet)))
     new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0, "")
   }
 

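The hunk above tracks PartitionStateInfo's second constructor argument changing from a bare replication factor (3) to the explicit set of assigned replicas (isr1.toSet), so the request carries which brokers host the partition rather than only how many. A toy sketch of the shape of that change, using simplified stand-in case classes rather than the real kafka.api types:

    // Before: only a count travelled with the request.
    case class OldStateInfo(leaderIsrAndControllerEpoch: Any, replicationFactor: Int)
    // After: the full assignment travels with the request.
    case class NewStateInfo(leaderIsrAndControllerEpoch: Any, allReplicas: Set[Int])

    val isr1 = List(0, 1, 2)
    val before = OldStateInfo("leaderAndIsr1", 3)
    val after = NewStateInfo("leaderAndIsr1", isr1.toSet)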
http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 4d989e4..fcfc583 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -23,7 +23,6 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import scala.collection._
 import org.scalatest.junit.JUnit3Suite
-import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.serializer._
 import kafka.admin.CreateTopicCommand
@@ -31,6 +30,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
 import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
 import java.util.{Collections, Properties}
+import org.apache.log4j.{Logger, Level}
 import kafka.utils.TestUtils._
 
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
@@ -97,6 +97,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000)
+
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
@@ -142,7 +145,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
-    val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ 
+    val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
                         sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
@@ -167,12 +170,15 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ 
+    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
                         sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000)
+
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -240,9 +246,12 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
   def testCompressionSetConsumption() {
     // send some messages to each broker
-    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++ 
+    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++
                        sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
 
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000)
+
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
@@ -263,9 +272,12 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++ 
+    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++
                       sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
 
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000)
+
     val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
@@ -303,6 +315,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // send some messages to each broker
     val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
 
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
@@ -321,12 +335,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
     assertEquals(sentMessages1, receivedMessages1)
+    zkConsumerConnector1.shutdown()
+    zkClient.close()
   }
 
-  def sendMessagesToBrokerPartition(config: KafkaConfig, 
-                                    topic: String, 
-                                    partition: Int, 
-                                    numMessages: Int, 
+  def sendMessagesToBrokerPartition(config: KafkaConfig,
+                                    topic: String,
+                                    partition: Int,
+                                    numMessages: Int,
                                     compression: CompressionCodec = NoCompressionCodec): List[String] = {
     val header = "test-%d-%d".format(config.brokerId, partition)
     val props = new Properties()

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 4c646f0..2317760 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -57,7 +57,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
   
   def testResetToEarliestWhenOffsetTooLow() =
     assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
-    
+
   def testResetToLatestWhenOffsetTooHigh() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
 
@@ -69,12 +69,16 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
    * Returns the count of messages received.
    */
   def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+
    val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), 
         new DefaultEncoder(), new StringEncoder())
 
     for(i <- 0 until numMessages)
       producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
 
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+
     // update offset in zookeeper for consumer to jump "forward" in time
     val dirs = new ZKGroupTopicDirs(group, topic)
     var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
@@ -99,8 +103,10 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
     } catch {
       case e: ConsumerTimeoutException => 
         info("consumer timed out after receiving " + received + " messages.")
+    } finally {
+      producer.close()
+      consumerConnector.shutdown
     }
-    consumerConnector.shutdown
     received
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
index c4866eb..c3c7631 100644
--- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
@@ -21,12 +21,12 @@ import kafka.api.FetchRequestBuilder
 import kafka.message.ByteBufferMessageSet
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import org.apache.log4j.{Level, Logger}
-import org.junit.Assert._
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.producer.KeyedMessage
 import kafka.utils._
 import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException}
+import kafka.producer.KeyedMessage
+import org.junit.Assert.assertEquals
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -63,6 +63,8 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
 
     producer.send(producerData:_*)
 
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+
     var fetchedMessage: ByteBufferMessageSet = null
     while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
@@ -90,6 +92,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
         val producedData = List("a_" + topic, "b_" + topic)
         messages += topic -> producedData
         producer.send(producedData.map(m => new KeyedMessage[String, String](topic, topic, m)):_*)
+        TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
         builder.addFetch(topic, offset, 0, 10000)
       }
 
@@ -132,6 +135,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
       builder.addFetch(topic, 0, 0, 10000)
     }
     producer.send(produceList: _*)
+    topics.foreach(topic => TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000))
 
     // wait a bit for produced message to be available
     val request = builder.build()
@@ -155,6 +159,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
       builder.addFetch(topic, 0, 0, 10000)
     }
     producer.send(produceList: _*)
+    topics.foreach(topic => TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000))
 
     producer.send(produceList: _*)
     // wait a bit for produced message to be available

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 2fc08d3..f764151 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -27,7 +27,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.admin.CreateTopicCommand
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
 import kafka.utils.{TestUtils, Utils}
 
@@ -35,7 +35,7 @@ import kafka.utils.{TestUtils, Utils}
  * End to end tests of the primitive apis against a local server
  */
class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
-  
+
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props)
@@ -300,8 +300,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
     CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
-    assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk(newTopic, zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+    TestUtils.waitUntilMetadataIsPropagated(servers, newTopic, 0, 1000)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500)
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 6db63ba..edf8555 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -22,26 +22,27 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.CreateTopicCommand
 import java.nio.ByteBuffer
 import junit.framework.Assert._
-import org.easymock.EasyMock
-import kafka.network._
 import kafka.cluster.Broker
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
-import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
+import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.api.TopicMetadataRequest
 import kafka.common.ErrorMapping
-import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetadataRequest}
+import kafka.client.ClientUtils
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p))
-  var brokers: Seq[Broker] = null
+  private var server1: KafkaServer = null
+  val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port))
 
   override def setUp() {
     super.setUp()
-    brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
+    server1 = TestUtils.createServer(configs.head)
   }
 
   override def tearDown() {
+    server1.shutdown()
     super.tearDown()
   }
 
@@ -65,16 +66,15 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create topic
     val topic = "test"
     CreateTopicCommand.createTopic(zkClient, topic, 1)
-    // set up leader for topic partition 0
-    val leaderForPartitionMap = Map(
-      0 -> configs.head.brokerId
-    )
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
-    val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
-    val partitionMetadata = topicMetadata.head.partitionsMetadata
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0, 1000)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+    assertEquals("Expecting metadata for the test topic", "test", topicsMetadata.head.topic)
+    var partitionMetadata = topicsMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
     assertEquals(1, partitionMetadata.head.replicas.size)
@@ -82,60 +82,55 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   def testGetAllTopicMetadata {
     // create topic
-    val topic = "test"
-    CreateTopicCommand.createTopic(zkClient, topic, 1)
-    // set up leader for topic partition 0
-    val leaderForPartitionMap = Map(
-      0 -> configs.head.brokerId
-    )
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val topicMetadataRequest = new TopicMetadataRequest(List(), 0)
-    val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
-    val partitionMetadata = topicMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
+    val topic1 = "testGetAllTopicMetadata1"
+    val topic2 = "testGetAllTopicMetadata2"
+    CreateTopicCommand.createTopic(zkClient, topic1, 1)
+    CreateTopicCommand.createTopic(zkClient, topic2, 1)
+
+    // wait for leader to be elected for both topics
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic2, 0, 1000)
+
+    // issue metadata request with empty list of topics
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata",
+      2000, 0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(2, topicsMetadata.size)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.last.partitionsMetadata.head.errorCode)
+    val partitionMetadataTopic1 = topicsMetadata.head.partitionsMetadata
+    val partitionMetadataTopic2 = topicsMetadata.last.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic1.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic1.head.partitionId)
+    assertEquals(1, partitionMetadataTopic1.head.replicas.size)
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic2.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic2.head.partitionId)
+    assertEquals(1, partitionMetadataTopic2.head.replicas.size)
   }
 
   def testAutoCreateTopic {
     // auto create topic
-    val topic = "test"
-
-    val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
-    val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
-    val partitionMetadata = topicMetadata.head.partitionsMetadata
+    val topic = "testAutoCreateTopic"
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
+    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
+
+    // wait for leader to be elected
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0, 1000)
+
+    // retry the metadata for the auto created topic
+    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    var partitionMetadata = topicsMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(0, partitionMetadata.head.replicas.size)
-    assertEquals(None, partitionMetadata.head.leader)
-    assertEquals(ErrorMapping.LeaderNotAvailableCode, partitionMetadata.head.errorCode)
-  }
-
-  private def mockLogManagerAndTestTopic(request: TopicMetadataRequest): Seq[TopicMetadata] = {
-    // topic metadata request only requires 1 call from the replica manager
-    val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
-    EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
-    EasyMock.replay(replicaManager)
-
-
-    val serializedMetadataRequest = TestUtils.createRequestByteBuffer(request)
-
-    // create the kafka request handler
-    val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
-
-    // call the API (to be tested) to get metadata
-    apis.handleTopicMetadataRequest(new RequestChannel.Request
-      (processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeMs=1))
-    val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
-    
-    // check assertions
-    val topicMetadata = TopicMetadataResponse.readFrom(metadataResponse).topicsMetadata
-
-    topicMetadata
+    assertEquals(1, partitionMetadata.head.replicas.size)
+    assertTrue(partitionMetadata.head.leader.isDefined)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 7f7a8d7..458b9ad 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -260,10 +260,11 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          topicPartitionInfos = topicPartitionInfos)
     try {
       handler.partitionAndCollate(producerDataList)
-      fail("Should fail with UnknownTopicOrPartitionException")
     }
     catch {
-      case e: UnknownTopicOrPartitionException => // expected, do nothing
+      // should not throw UnknownTopicOrPartitionException to allow resend
+      case e: UnknownTopicOrPartitionException => fail("Should not throw UnknownTopicOrPartitionException")
+
     }
   }
 
@@ -291,10 +292,10 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          topicPartitionInfos = topicPartitionInfos)
     try {
       handler.handle(producerDataList)
-      fail("Should fail with NoBrokersForPartitionException")
+      fail("Should fail with FailedToSendMessageException")
     }
     catch {
-      case e: NoBrokersForPartitionException => // expected, do nothing
+      case e: FailedToSendMessageException => // we retry on any exception now
     }
   }
 
@@ -418,6 +419,8 @@ class AsyncProducerTest extends JUnit3Suite {
     val response2 = ProducerResponse(0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
+    // don't care about config mock
+    EasyMock.expect(mockSyncProducer.config).andReturn(EasyMock.anyObject()).anyTimes()
     EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
     EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1)
     EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2)

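The updated expectations above encode the producer's new retry behaviour: partitionAndCollate no longer throws UnknownTopicOrPartitionException (so the batch can be resent), and exhausted retries surface as a single FailedToSendMessageException instead of the underlying NoBrokersForPartitionException. A simplified sketch of that retry shape, assuming an arbitrary attempt count rather than the real producer configuration:

    // Sketch only: retry the send on any exception; a
    // FailedToSendMessageException surfaces only after all attempts fail.
    class FailedToSendMessageException(msg: String) extends RuntimeException(msg)

    def sendWithRetries(attempts: Int)(send: () => Unit): Unit = {
      var remaining = attempts
      var lastError: Throwable = null
      while (remaining > 0) {
        try { send(); return }
        catch { case e: Exception => lastError = e; remaining -= 1 }
      }
      throw new FailedToSendMessageException(
        "failed after " + attempts + " attempts, last error: " + lastError)
    }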
http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index bc37531..b511d90 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -23,14 +23,17 @@ import kafka.message.Message
 import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.log4j.{Level, Logger}
-import org.junit.Assert._
 import org.junit.Test
 import kafka.utils._
 import java.util
-import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.admin.CreateTopicCommand
 import util.Properties
 import kafka.api.FetchRequestBuilder
-import kafka.common.{KafkaException, ErrorMapping, FailedToSendMessageException}
+import kafka.common.FailedToSendMessageException
+import org.junit.Assert.assertTrue
+import org.junit.Assert.assertFalse
+import org.junit.Assert.assertEquals
+
 
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
@@ -43,6 +46,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
   private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
+  private var servers = List.empty[KafkaServer]
 
   private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
   private val config1 = new KafkaConfig(props1) {
@@ -60,6 +64,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     // set up 2 brokers with 4 partitions each
     server1 = TestUtils.createServer(config1)
     server2 = TestUtils.createServer(config2)
+    servers = List(server1,server2)
 
     val props = new Properties()
     props.put("host", "localhost")
@@ -68,7 +73,6 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024, "")
     consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024, "")
 
-
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
   }
@@ -87,10 +91,11 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
 
   def testUpdateBrokerPartitionInfo() {
+    val topic = "new-topic"
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
-    assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
+    // wait until the update metadata request for new topic reaches all servers
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
 
     val props1 = new util.Properties()
     props1.put("metadata.broker.list", "localhost:80,localhost:81")
@@ -98,10 +103,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     val producerConfig1 = new ProducerConfig(props1)
     val producer1 = new Producer[String, String](producerConfig1)
     try{
-      producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
+      producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
       fail("Test should fail because the broker list provided are not valid")
     } catch {
-      case e: KafkaException =>
+      case e: FailedToSendMessageException =>
       case oe => fail("fails with exception", oe)
     } finally {
       producer1.close()
@@ -113,7 +118,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     val producerConfig2= new ProducerConfig(props2)
     val producer2 = new Producer[String, String](producerConfig2)
     try{
-      producer2.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
+      producer2.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
       case e => fail("Should succeed sending the message", e)
     } finally {
@@ -126,7 +131,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     val producerConfig3 = new ProducerConfig(props3)
     val producer3 = new Producer[String, String](producerConfig3)
     try{
-      producer3.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
+      producer3.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
       case e => fail("Should succeed sending the message", e)
     } finally {
@@ -151,27 +156,27 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     val producerConfig1 = new ProducerConfig(props1)
     val producerConfig2 = new ProducerConfig(props2)
 
+    val topic = "new-topic"
     // create topic with 1 partition and await leadership
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
-    assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
 
     val producer1 = new Producer[String, String](producerConfig1)
     val producer2 = new Producer[String, String](producerConfig2)
     // Available partition ids should be 0.
-    producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
-    producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test2"))
+    producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
+    producer1.send(new KeyedMessage[String, String](topic, "test", "test2"))
     // get the leader
-    val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
+    val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
     assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
     val leader = leaderOpt.get
 
     val messageSet = if(leader == server1.config.brokerId) {
-      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
       response1.messageSet("new-topic", 0).iterator.toBuffer
     }else {
-      val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
       response2.messageSet("new-topic", 0).iterator.toBuffer
     }
     assertEquals("Should have fetched 2 messages", 2, messageSet.size)
@@ -180,7 +185,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     producer1.close()
 
     try {
-      producer2.send(new KeyedMessage[String, String]("new-topic", "test", "test2"))
+      producer2.send(new KeyedMessage[String, String](topic, "test", "test2"))
       fail("Should have timed out for 3 acks.")
     }
     catch {
@@ -202,21 +207,22 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     props.put("request.required.acks", "1")
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
 
+    val topic = "new-topic"
     // create topic
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
-    assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 1, 500)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 2, 500)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 3, 500)
+    CreateTopicCommand.createTopic(zkClient, topic, 4, 2, "0,0,0,0")
+    // waiting for 1 partition is enough
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 2, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 3, 500)
 
     val config = new ProducerConfig(props)
     val producer = new Producer[String, String](config)
     try {
       // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
       // on broker 0
-      producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
+      producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
       case e => fail("Unexpected exception: " + e)
     }
@@ -227,7 +233,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     try {
       // These sends should fail since there are no available brokers
-      producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
+      producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
       fail("Should fail since no leader exists for the partition.")
     } catch {
       case e => // success
@@ -235,12 +241,12 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     // restart server 1
     server1.startup()
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
 
     try {
       // cross check if broker 1 got the messages
-      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet1 = response1.messageSet("new-topic", 0).iterator
+      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+      val messageSet1 = response1.messageSet(topic, 0).iterator
       assertTrue("Message set should have 1 message", messageSet1.hasNext)
       assertEquals(new Message(bytes = "test1".getBytes, key = "test".getBytes), messageSet1.next.message)
       assertFalse("Message set should have another message", messageSet1.hasNext)
@@ -259,22 +265,22 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     props.put("request.timeout.ms", String.valueOf(timeoutMs))
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     props.put("request.required.acks", "1")
-
+    props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout")
     val config = new ProducerConfig(props)
     val producer = new Producer[String, String](config)
 
+    val topic = "new-topic"
     // create topics in ZK
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
-    assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
+    CreateTopicCommand.createTopic(zkClient, topic, 4, 2, "0:1,0:1,0:1,0:1")
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
 
     // do a simple test to make sure plumbing is okay
     try {
       // this message should be assigned to partition 0 whose leader is on broker 0
-      producer.send(new KeyedMessage[String, String]("new-topic", "test", "test"))
+      producer.send(new KeyedMessage[String, String](topic, "test", "test"))
       // cross check if brokers got the messages
-      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
       val messageSet1 = response1.messageSet("new-topic", 0).iterator
       assertTrue("Message set should have 1 message", messageSet1.hasNext)
       assertEquals(new Message("test".getBytes), messageSet1.next.message)
@@ -290,7 +296,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     try {
       // this message should be assigned to partition 0 whose leader is on broker 0, but
       // broker 0 will not response within timeoutMs millis.
-      producer.send(new KeyedMessage[String, String]("new-topic", "test", "test"))
+      producer.send(new KeyedMessage[String, String](topic, "test", "test"))
     } catch {
       case e: FailedToSendMessageException => /* success */
       case e: Exception => fail("Not expected", e)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 8f88177..c4328f0 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -132,7 +132,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderIsrAndControllerEpoch]
     leaderAndIsr.put((topic, partitionId),
       new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
-    val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
+    val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, Set(0,1))).toMap
     val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId,
                                                       staleControllerEpoch, 0, "")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 3728f8c..947e795 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -50,6 +50,8 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // create topic
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+
     // send some messages
     producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
 
@@ -65,11 +67,12 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     server = new KafkaServer(config)
     server.startup()
 
+    // wait for the broker to receive the update metadata request after startup
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+
     producer = new Producer[Int, String](new ProducerConfig(producerConfig))
     val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
 
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
-
     var fetchedMessage: ByteBufferMessageSet = null
     while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build())

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 1557047..c7dd8a7 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -20,7 +20,7 @@ import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message}
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.utils.{Time, TestUtils, MockTime}
+import kafka.utils.{ZkUtils, Time, TestUtils, MockTime}
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
@@ -57,7 +57,9 @@ class SimpleFetchTest extends JUnit3Suite {
     val fetchSize = 100
     val messages = new Message("test-message".getBytes())
 
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    // create nice mock since we don't particularly care about zkclient calls
+    val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
+    EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false)
     EasyMock.replay(zkClient)
 
     val log = EasyMock.createMock(classOf[kafka.log.Log])
@@ -151,7 +153,8 @@ class SimpleFetchTest extends JUnit3Suite {
     val followerReplicaId = configs(1).brokerId
     val followerLEO = 15
 
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
+    EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false)
     EasyMock.replay(zkClient)
 
     val log = EasyMock.createMock(classOf[kafka.log.Log])

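The change above from EasyMock.createMock to EasyMock.createNiceMock matters because a strict mock throws on any call that was not explicitly expected, while a nice mock answers unexpected calls with type-appropriate defaults (null, 0, false). A small sketch of the difference, using an illustrative trait rather than ZkClient:

    import org.easymock.EasyMock

    trait Store { def get(key: String): String }

    val nice = EasyMock.createNiceMock(classOf[Store])
    EasyMock.replay(nice)
    assert(nice.get("k") == null)  // unexpected call returns a default

    val strict = EasyMock.createMock(classOf[Store])
    EasyMock.replay(strict)
    // strict.get("k")             // would fail: unexpected method call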
http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f9c9e64..3cb1d4a 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -37,6 +37,7 @@ import kafka.api._
 import collection.mutable.Map
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
 import kafka.common.TopicAndPartition
+import junit.framework.Assert
 
 
 /**
@@ -499,6 +500,12 @@ object TestUtils extends Logging {
     byteBuffer.rewind()
     byteBuffer
   }
+
+  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = {
+    Assert.assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
+      TestUtils.waitUntilTrue(() =>
+        servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition))), timeout))
+  }
   
 }
 
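waitUntilMetadataIsPropagated above blocks until every broker's leaderCache contains the partition, delegating the polling to the existing TestUtils.waitUntilTrue helper. A simplified sketch of that polling shape (the 100 ms poll interval is an assumption, not the real implementation):

    // Re-evaluate the condition until it holds or the timeout elapses.
    def waitUntilTrueSketch(condition: () => Boolean, timeoutMs: Long): Boolean = {
      val deadline = System.currentTimeMillis + timeoutMs
      while (System.currentTimeMillis < deadline) {
        if (condition())
          return true
        Thread.sleep(100)
      }
      condition()  // one final check at the deadline
    }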