Repository: kafka Updated Branches: refs/heads/trunk 82e75b960 -> 2c0055e62
HOTFIX: Do not use an unlimited number of messages in IntegrationTestUtils. Removed the readKeyValues() overload that passed UNLIMITED_MESSAGES, which was doomed to exhaust the entire wait time; all of its callers actually do provide the expected number of messages. Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Matthias J. Sax <matth...@confluent.io>, Jason Gustafson <ja...@confluent.io> Closes #2507 from guozhangwang/KHotfix-not-use-limited-num-messages Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2c0055e6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2c0055e6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2c0055e6 Branch: refs/heads/trunk Commit: 2c0055e62fc6308e45ff8a9c1c386b43fa3a3905 Parents: 82e75b9 Author: Guozhang Wang <wangg...@gmail.com> Authored: Mon Feb 6 14:55:00 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Mon Feb 6 14:55:12 2017 -0800 ---------------------------------------------------------------------- .../integration/utils/IntegrationTestUtils.java | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2c0055e6/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index a38781b..2680a31 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -53,7 +53,6 @@ import java.util.concurrent.Future; */ public class IntegrationTestUtils { - public static final int UNLIMITED_MESSAGES = -1; 
public static final long DEFAULT_TIMEOUT = 30 * 1000L; /** @@ -75,19 +74,6 @@ public class IntegrationTestUtils { } /** - * Returns as many messages as possible from the topic until a (currently hardcoded) timeout is - * reached. - * - * @param topic Kafka topic to read messages from - * @param consumerConfig Kafka consumer configuration - * @param waitTime Maximum wait time in milliseconds - * @return The KeyValue elements retrieved via the consumer. - */ - public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final long waitTime) { - return readKeyValues(topic, consumerConfig, waitTime, UNLIMITED_MESSAGES); - } - - /** * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from * are already configured in the consumer). * @@ -210,7 +196,7 @@ public class IntegrationTestUtils { final TestCondition valuesRead = new TestCondition() { @Override public boolean conditionMet() { - final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig, waitTime); + final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig, waitTime, expectedNumRecords); accumData.addAll(readData); return accumData.size() >= expectedNumRecords; }