Repository: kafka
Updated Branches:
  refs/heads/trunk 8c8b54ee4 -> fee6f6f92


MINOR: Removed 1/2 of the hardcoded sleeps in Streams

Author: Eno Thereska <[email protected]>

Reviewers: Guozhang Wang <[email protected]>, Ismael Juma <[email protected]>

Closes #1422 from enothereska/minor-integration-timeout2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fee6f6f9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fee6f6f9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fee6f6f9

Branch: refs/heads/trunk
Commit: fee6f6f927b36ac74bc4a8b233711234558f3b51
Parents: 8c8b54e
Author: Eno Thereska <[email protected]>
Authored: Wed May 25 13:08:57 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Wed May 25 13:08:57 2016 +0100

----------------------------------------------------------------------
 .../integration/FanoutIntegrationTest.java      | 11 ++-
 .../InternalTopicIntegrationTest.java           |  4 +
 .../integration/JoinIntegrationTest.java        | 10 +--
 .../integration/MapFunctionIntegrationTest.java |  8 +-
 .../integration/PassThroughIntegrationTest.java |  8 +-
 .../integration/WordCountIntegrationTest.java   |  6 +-
 .../integration/utils/IntegrationTestUtils.java | 80 +++++++++++++++++++-
 7 files changed, 98 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 2e11cd2..5199caa 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -134,10 +134,6 @@ public class FanoutIntegrationTest {
         producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
         IntegrationTestUtils.produceValuesSynchronously(INPUT_TOPIC_A, 
inputValues, producerConfig);
 
-        // Give the stream processing application some time to do its work.
-        Thread.sleep(10000);
-        streams.close();
-
         //
         // Step 3: Verify the application's output data.
         //
@@ -149,7 +145,8 @@ public class FanoutIntegrationTest {
         consumerConfigB.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         consumerConfigB.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
         consumerConfigB.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        List<String> actualValuesForB = 
IntegrationTestUtils.readValues(OUTPUT_TOPIC_B, consumerConfigB, 
inputValues.size());
+        List<String> actualValuesForB = 
IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB,
+            OUTPUT_TOPIC_B, inputValues.size());
         assertThat(actualValuesForB, equalTo(expectedValuesForB));
 
         // Verify output topic C
@@ -159,7 +156,9 @@ public class FanoutIntegrationTest {
         consumerConfigC.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         consumerConfigC.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
         consumerConfigC.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        List<String> actualValuesForC = 
IntegrationTestUtils.readValues(OUTPUT_TOPIC_C, consumerConfigC, 
inputValues.size());
+        List<String> actualValuesForC = 
IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC,
+            OUTPUT_TOPIC_C, inputValues.size());
+        streams.close();
         assertThat(actualValuesForC, equalTo(expectedValuesForC));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 66111c4..e431b57 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -149,6 +149,10 @@ public class InternalTopicIntegrationTest {
         KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
+        // Wait briefly for the topology to be fully up and running (otherwise 
it might miss some or all
+        // of the input data we produce below).
+        Thread.sleep(5000);
+
         //
         // Step 2: Produce some input data to the input topic.
         //

http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 93e31e2..4f318ec 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -219,7 +219,7 @@ public class JoinIntegrationTest {
 
         // Wait briefly for the topology to be fully up and running (otherwise 
it might miss some or all
         // of the input data we produce below).
-        Thread.sleep(5000);
+        Thread.sleep(10000);
 
         //
         // Step 2: Publish user-region information.
@@ -246,10 +246,6 @@ public class JoinIntegrationTest {
         
userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
LongSerializer.class);
         IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, 
userClicks, userClicksProducerConfig);
 
-        // Give the stream processing application some time to do its work.
-        Thread.sleep(10000);
-        streams.close();
-
         //
         // Step 4: Verify the application's output data.
         //
@@ -259,7 +255,9 @@ public class JoinIntegrationTest {
         consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
LongDeserializer.class);
-        List<KeyValue<String, Long>> actualClicksPerRegion = 
IntegrationTestUtils.readKeyValues(OUTPUT_TOPIC, consumerConfig);
+        List<KeyValue<String, Long>> actualClicksPerRegion = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
+            OUTPUT_TOPIC, expectedClicksPerRegion.size());
+        streams.close();
         assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
index 31ac400..3c37aa1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
@@ -107,10 +107,6 @@ public class MapFunctionIntegrationTest {
         producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
         IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, 
inputValues, producerConfig);
 
-        // Give the stream processing application some time to do its work.
-        Thread.sleep(10000);
-        streams.close();
-
         //
         // Step 3: Verify the application's output data.
         //
@@ -120,7 +116,9 @@ public class MapFunctionIntegrationTest {
         consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        List<String> actualValues = 
IntegrationTestUtils.readValues(DEFAULT_OUTPUT_TOPIC, consumerConfig, 
inputValues.size());
+        List<String> actualValues = 
IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
+            DEFAULT_OUTPUT_TOPIC, inputValues.size());
+        streams.close();
         assertThat(actualValues, equalTo(expectedValues));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
index e126ed8..e81d21c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
@@ -94,10 +94,6 @@ public class PassThroughIntegrationTest {
         producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
         IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, 
inputValues, producerConfig);
 
-        // Give the stream processing application some time to do its work.
-        Thread.sleep(10000);
-        streams.close();
-
         //
         // Step 3: Verify the application's output data.
         //
@@ -107,7 +103,9 @@ public class PassThroughIntegrationTest {
         consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        List<String> actualValues = 
IntegrationTestUtils.readValues(DEFAULT_OUTPUT_TOPIC, consumerConfig, 
inputValues.size());
+        List<String> actualValues = 
IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
+            DEFAULT_OUTPUT_TOPIC, inputValues.size());
+        streams.close();
         assertThat(actualValues, equalTo(inputValues));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
index c8583d1..c86409a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
@@ -134,15 +134,15 @@ public class WordCountIntegrationTest {
         //
         // Step 3: Verify the application's output data.
         //
-        Thread.sleep(10000);
-        streams.close();
         Properties consumerConfig = new Properties();
         consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, 
"wordcount-integration-test-standard-consumer");
         consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
LongDeserializer.class);
-        List<KeyValue<String, Long>> actualWordCounts = 
IntegrationTestUtils.readKeyValues(DEFAULT_OUTPUT_TOPIC, consumerConfig);
+        List<KeyValue<String, Long>> actualWordCounts = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
+            DEFAULT_OUTPUT_TOPIC, expectedWordCounts.size());
+        streams.close();
         assertThat(actualWordCounts, equalTo(expectedWordCounts));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 89fe0c4..c3f9089 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -44,7 +44,8 @@ import java.util.concurrent.Future;
  */
 public class IntegrationTestUtils {
 
-    private static final int UNLIMITED_MESSAGES = -1;
+    public static final int UNLIMITED_MESSAGES = -1;
+    public static final long DEFAULT_TIMEOUT = 30 * 1000L;
 
     /**
      * Returns up to `maxMessages` message-values from the topic.
@@ -54,10 +55,10 @@ public class IntegrationTestUtils {
      * @param maxMessages    Maximum number of messages to read via the 
consumer.
      * @return The values retrieved via the consumer.
      */
-    public static <K, V> List<V> readValues(String topic, Properties 
consumerConfig, int maxMessages) {
+    public static <V> List<V> readValues(String topic, Properties 
consumerConfig, int maxMessages) {
         List<V> returnList = new ArrayList<>();
-        List<KeyValue<K, V>> kvs = readKeyValues(topic, consumerConfig, 
maxMessages);
-        for (KeyValue<K, V> kv : kvs) {
+        List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, 
maxMessages);
+        for (KeyValue<?, V> kv : kvs) {
             returnList.add(kv.value);
         }
         return returnList;
@@ -154,4 +155,75 @@ public class IntegrationTestUtils {
         produceKeyValuesSynchronously(topic, keyedRecords, producerConfig);
     }
 
+    public static <K, V> List<KeyValue<K, V>> 
waitUntilMinKeyValueRecordsReceived(Properties consumerConfig,
+                                                                               
   String topic,
+                                                                               
   int expectedNumRecords) throws InterruptedException {
+
+        return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, 
expectedNumRecords, DEFAULT_TIMEOUT);
+    }
+
+    /**
+     * Wait until enough data (key-value records) has been consumed.
+     * @param consumerConfig Kafka Consumer configuration
+     * @param topic          Topic to consume from
+     * @param expectedNumRecords Minimum number of expected records
+     * @param waitTime       Upper bound in waiting time in milliseconds
+     * @return All the records consumed (never null); a timeout raises AssertionError instead
+     * @throws InterruptedException
+     * @throws AssertionError if the given wait time elapses
+     */
+    public static <K, V> List<KeyValue<K, V>> 
waitUntilMinKeyValueRecordsReceived(Properties consumerConfig,
+                                                                               
   String topic,
+                                                                               
   int expectedNumRecords,
+                                                                               
   long waitTime) throws InterruptedException {
+        List<KeyValue<K, V>> accumData = new ArrayList<>();
+        long startTime = System.currentTimeMillis();
+        while (true) {
+            List<KeyValue<K, V>> readData = readKeyValues(topic, 
consumerConfig);
+            accumData.addAll(readData);
+            if (accumData.size() >= expectedNumRecords)
+                return accumData;
+            if (System.currentTimeMillis() > startTime + waitTime)
+                throw new AssertionError("Expected " +  expectedNumRecords +
+                    " but received only " + accumData.size() +
+                    " records before timeout " + waitTime + " ms");
+            Thread.sleep(Math.min(waitTime, 100L));
+        }
+    }
+
+    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties 
consumerConfig,
+                                                                String topic,
+                                                                int 
expectedNumRecords) throws InterruptedException {
+
+        return waitUntilMinValuesRecordsReceived(consumerConfig, topic, 
expectedNumRecords, DEFAULT_TIMEOUT);
+    }
+
+    /**
+     * Wait until enough data (value records) has been consumed.
+     * @param consumerConfig Kafka Consumer configuration
+     * @param topic          Topic to consume from
+     * @param expectedNumRecords Minimum number of expected records
+     * @param waitTime       Upper bound in waiting time in milliseconds
+     * @return All the values consumed (never null); a timeout raises AssertionError instead
+     * @throws InterruptedException
+     * @throws AssertionError if the given wait time elapses
+     */
+    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties 
consumerConfig,
+                                                                String topic,
+                                                                int 
expectedNumRecords,
+                                                                long waitTime) 
throws InterruptedException {
+        List<V> accumData = new ArrayList<>();
+        long startTime = System.currentTimeMillis();
+        while (true) {
+            List<V> readData = readValues(topic, consumerConfig, 
expectedNumRecords);
+            accumData.addAll(readData);
+            if (accumData.size() >= expectedNumRecords)
+                return accumData;
+            if (System.currentTimeMillis() > startTime + waitTime)
+                throw new AssertionError("Expected " +  expectedNumRecords +
+                    " but received only " + accumData.size() +
+                    " records before timeout " + waitTime + " ms");
+            Thread.sleep(Math.min(waitTime, 100L));
+        }
+    }
 }
\ No newline at end of file

Reply via email to