fix #1983, fix KafkaUtilSpec failure
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ec59bbd1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ec59bbd1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ec59bbd1 Branch: refs/heads/master Commit: ec59bbd1144abf4879d53950fc4d839c3c2cf622 Parents: c7ba3d5 Author: manuzhang <[email protected]> Authored: Tue Mar 1 09:06:32 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Mar 1 11:23:49 2016 +0800 ---------------------------------------------------------------------- .../streaming/kafka/util/KafkaServerHarness.scala | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ec59bbd1/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala index ddce4a4..2ee7260 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala @@ -18,6 +18,9 @@ package io.gearpump.streaming.kafka.util +import java.util.Properties + +import kafka.admin.AdminUtils import kafka.common.KafkaException import kafka.server.{KafkaConfig => KafkaServerConfig, KafkaServer} import kafka.utils.{Utils, TestUtils} @@ -44,10 +47,16 @@ trait KafkaServerHarness extends ZookeeperHarness { super.tearDown } - def createTopicUntilLeaderIsElected(topic: String, partitions: Int, replicas: Int) = { + def createTopicUntilLeaderIsElected(topic: String, partitions: Int, replicas: Int, timeout: Long = 10000) = { val zkClient = connectZk() try { - TestUtils.createTopic(zkClient, topic, partitions, replicas, servers) + // create topic + AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties) + // wait until the update metadata request for new topic reaches all servers + (0 until partitions).map { case i => + TestUtils.waitUntilMetadataIsPropagated(servers, topic, i, timeout) + i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i, timeout) + }.toMap } catch { case e: Exception => throw e } finally {
