Repository: kafka
Updated Branches:
  refs/heads/trunk 82e75b960 -> 2c0055e62


HOTFIX: Do Not use unlimited num messages in IntegrationTestUtils

Removed readKeyValues() that give UNLIMITED_MESSAGES which will doom to exhaust 
all wait time, as all its callers actually do provide the expected number of 
messages.

Author: Guozhang Wang <wangg...@gmail.com>

Reviewers: Matthias J. Sax <matth...@confluent.io>, Jason Gustafson 
<ja...@confluent.io>

Closes #2507 from guozhangwang/KHotfix-not-use-limited-num-messages


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2c0055e6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2c0055e6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2c0055e6

Branch: refs/heads/trunk
Commit: 2c0055e62fc6308e45ff8a9c1c386b43fa3a3905
Parents: 82e75b9
Author: Guozhang Wang <wangg...@gmail.com>
Authored: Mon Feb 6 14:55:00 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Feb 6 14:55:12 2017 -0800

----------------------------------------------------------------------
 .../integration/utils/IntegrationTestUtils.java     | 16 +---------------
 1 file changed, 1 insertion(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2c0055e6/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index a38781b..2680a31 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -53,7 +53,6 @@ import java.util.concurrent.Future;
  */
 public class IntegrationTestUtils {
 
-    public static final int UNLIMITED_MESSAGES = -1;
     public static final long DEFAULT_TIMEOUT = 30 * 1000L;
 
     /**
@@ -75,19 +74,6 @@ public class IntegrationTestUtils {
     }
 
     /**
-     * Returns as many messages as possible from the topic until a (currently 
hardcoded) timeout is
-     * reached.
-     *
-     * @param topic          Kafka topic to read messages from
-     * @param consumerConfig Kafka consumer configuration
-     * @param waitTime       Maximum wait time in milliseconds
-     * @return The KeyValue elements retrieved via the consumer.
-     */
-    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String 
topic, final Properties consumerConfig, final long waitTime) {
-        return readKeyValues(topic, consumerConfig, waitTime, 
UNLIMITED_MESSAGES);
-    }
-
-    /**
      * Returns up to `maxMessages` by reading via the provided consumer (the 
topic(s) to read from
      * are already configured in the consumer).
      *
@@ -210,7 +196,7 @@ public class IntegrationTestUtils {
         final TestCondition valuesRead = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                final List<KeyValue<K, V>> readData = readKeyValues(topic, 
consumerConfig, waitTime);
+                final List<KeyValue<K, V>> readData = readKeyValues(topic, 
consumerConfig, waitTime, expectedNumRecords);
                 accumData.addAll(readData);
                 return accumData.size() >= expectedNumRecords;
             }

Reply via email to