fix #1983, fix KafkaUtilSpec failure

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ec59bbd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ec59bbd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ec59bbd1

Branch: refs/heads/master
Commit: ec59bbd1144abf4879d53950fc4d839c3c2cf622
Parents: c7ba3d5
Author: manuzhang <[email protected]>
Authored: Tue Mar 1 09:06:32 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Mar 1 11:23:49 2016 +0800

----------------------------------------------------------------------
 .../streaming/kafka/util/KafkaServerHarness.scala      | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec59bbd1/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala
 
b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala
index ddce4a4..2ee7260 100644
--- 
a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala
+++ 
b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala
@@ -18,6 +18,9 @@
 
 package io.gearpump.streaming.kafka.util
 
+import java.util.Properties
+
+import kafka.admin.AdminUtils
 import kafka.common.KafkaException
 import kafka.server.{KafkaConfig => KafkaServerConfig, KafkaServer}
 import kafka.utils.{Utils, TestUtils}
@@ -44,10 +47,16 @@ trait KafkaServerHarness extends ZookeeperHarness {
     super.tearDown
   }
 
-  def createTopicUntilLeaderIsElected(topic: String, partitions: Int, 
replicas: Int) = {
+  def createTopicUntilLeaderIsElected(topic: String, partitions: Int, 
replicas: Int, timeout: Long = 10000) = {
     val zkClient = connectZk()
     try {
-      TestUtils.createTopic(zkClient, topic, partitions, replicas, servers)
+      // create topic
+      AdminUtils.createTopic(zkClient, topic, partitions, replicas, new 
Properties)
+      // wait until the update metadata request for new topic reaches all 
servers
+      (0 until partitions).map { case i =>
+        TestUtils.waitUntilMetadataIsPropagated(servers, topic, i, timeout)
+        i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i, 
timeout)
+      }.toMap
     } catch {
       case e: Exception => throw e
     } finally {

Reply via email to