Repository: kafka Updated Branches: refs/heads/trunk 8c8b54ee4 -> fee6f6f92
MINOR: Removed 1/2 of the hardcoded sleeps in Streams Author: Eno Thereska <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Ismael Juma <[email protected]> Closes #1422 from enothereska/minor-integration-timeout2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fee6f6f9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fee6f6f9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fee6f6f9 Branch: refs/heads/trunk Commit: fee6f6f927b36ac74bc4a8b233711234558f3b51 Parents: 8c8b54e Author: Eno Thereska <[email protected]> Authored: Wed May 25 13:08:57 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Wed May 25 13:08:57 2016 +0100 ---------------------------------------------------------------------- .../integration/FanoutIntegrationTest.java | 11 ++- .../InternalTopicIntegrationTest.java | 4 + .../integration/JoinIntegrationTest.java | 10 +-- .../integration/MapFunctionIntegrationTest.java | 8 +- .../integration/PassThroughIntegrationTest.java | 8 +- .../integration/WordCountIntegrationTest.java | 6 +- .../integration/utils/IntegrationTestUtils.java | 80 +++++++++++++++++++- 7 files changed, 98 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java index 2e11cd2..5199caa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java @@ -134,10 +134,6 @@ public class FanoutIntegrationTest { 
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); IntegrationTestUtils.produceValuesSynchronously(INPUT_TOPIC_A, inputValues, producerConfig); - // Give the stream processing application some time to do its work. - Thread.sleep(10000); - streams.close(); - // // Step 3: Verify the application's output data. // @@ -149,7 +145,8 @@ public class FanoutIntegrationTest { consumerConfigB.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerConfigB.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerConfigB.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - List<String> actualValuesForB = IntegrationTestUtils.readValues(OUTPUT_TOPIC_B, consumerConfigB, inputValues.size()); + List<String> actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB, + OUTPUT_TOPIC_B, inputValues.size()); assertThat(actualValuesForB, equalTo(expectedValuesForB)); // Verify output topic C @@ -159,7 +156,9 @@ public class FanoutIntegrationTest { consumerConfigC.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerConfigC.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerConfigC.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - List<String> actualValuesForC = IntegrationTestUtils.readValues(OUTPUT_TOPIC_C, consumerConfigC, inputValues.size()); + List<String> actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC, + OUTPUT_TOPIC_C, inputValues.size()); + streams.close(); assertThat(actualValuesForC, equalTo(expectedValuesForC)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java ---------------------------------------------------------------------- diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 66111c4..e431b57 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -149,6 +149,10 @@ public class InternalTopicIntegrationTest { KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); + // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all + // of the input data we produce below). + Thread.sleep(5000); + // // Step 2: Produce some input data to the input topic. // http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java index 93e31e2..4f318ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -219,7 +219,7 @@ public class JoinIntegrationTest { // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all // of the input data we produce below). - Thread.sleep(5000); + Thread.sleep(10000); // // Step 2: Publish user-region information. 
@@ -246,10 +246,6 @@ public class JoinIntegrationTest { userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig); - // Give the stream processing application some time to do its work. - Thread.sleep(10000); - streams.close(); - // // Step 4: Verify the application's output data. // @@ -259,7 +255,9 @@ public class JoinIntegrationTest { consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.readKeyValues(OUTPUT_TOPIC, consumerConfig); + List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, + OUTPUT_TOPIC, expectedClicksPerRegion.size()); + streams.close(); assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java index 31ac400..3c37aa1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java @@ -107,10 +107,6 @@ public class MapFunctionIntegrationTest { producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, 
inputValues, producerConfig); - // Give the stream processing application some time to do its work. - Thread.sleep(10000); - streams.close(); - // // Step 3: Verify the application's output data. // @@ -120,7 +116,9 @@ public class MapFunctionIntegrationTest { consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - List<String> actualValues = IntegrationTestUtils.readValues(DEFAULT_OUTPUT_TOPIC, consumerConfig, inputValues.size()); + List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, + DEFAULT_OUTPUT_TOPIC, inputValues.size()); + streams.close(); assertThat(actualValues, equalTo(expectedValues)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java index e126ed8..e81d21c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java @@ -94,10 +94,6 @@ public class PassThroughIntegrationTest { producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig); - // Give the stream processing application some time to do its work. - Thread.sleep(10000); - streams.close(); - // // Step 3: Verify the application's output data. 
// @@ -107,7 +103,9 @@ public class PassThroughIntegrationTest { consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - List<String> actualValues = IntegrationTestUtils.readValues(DEFAULT_OUTPUT_TOPIC, consumerConfig, inputValues.size()); + List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, + DEFAULT_OUTPUT_TOPIC, inputValues.size()); + streams.close(); assertThat(actualValues, equalTo(inputValues)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java index c8583d1..c86409a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java @@ -134,15 +134,15 @@ public class WordCountIntegrationTest { // // Step 3: Verify the application's output data. 
// - Thread.sleep(10000); - streams.close(); Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-integration-test-standard-consumer"); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - List<KeyValue<String, Long>> actualWordCounts = IntegrationTestUtils.readKeyValues(DEFAULT_OUTPUT_TOPIC, consumerConfig); + List<KeyValue<String, Long>> actualWordCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, + DEFAULT_OUTPUT_TOPIC, expectedWordCounts.size()); + streams.close(); assertThat(actualWordCounts, equalTo(expectedWordCounts)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/fee6f6f9/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 89fe0c4..c3f9089 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -44,7 +44,8 @@ import java.util.concurrent.Future; */ public class IntegrationTestUtils { - private static final int UNLIMITED_MESSAGES = -1; + public static final int UNLIMITED_MESSAGES = -1; + public static final long DEFAULT_TIMEOUT = 30 * 1000L; /** * Returns up to `maxMessages` message-values from the topic. 
@@ -54,10 +55,10 @@ public class IntegrationTestUtils { * @param maxMessages Maximum number of messages to read via the consumer. * @return The values retrieved via the consumer. */ - public static <K, V> List<V> readValues(String topic, Properties consumerConfig, int maxMessages) { + public static <V> List<V> readValues(String topic, Properties consumerConfig, int maxMessages) { List<V> returnList = new ArrayList<>(); - List<KeyValue<K, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages); - for (KeyValue<K, V> kv : kvs) { + List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages); + for (KeyValue<?, V> kv : kvs) { returnList.add(kv.value); } return returnList; @@ -154,4 +155,75 @@ public class IntegrationTestUtils { produceKeyValuesSynchronously(topic, keyedRecords, producerConfig); } + public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig, + String topic, + int expectedNumRecords) throws InterruptedException { + + return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); + } + + /** + * Wait until enough data (key-value records) has been consumed. 
+ * @param consumerConfig Kafka Consumer configuration + * @param topic Topic to consume from + * @param expectedNumRecords Minimum number of expected records + * @param waitTime Upper bound on waiting time in milliseconds + * @return All the records consumed, containing at least expectedNumRecords records (never null) + * @throws InterruptedException + * @throws AssertionError if the given wait time elapses + */ + public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig, + String topic, + int expectedNumRecords, + long waitTime) throws InterruptedException { + List<KeyValue<K, V>> accumData = new ArrayList<>(); + long startTime = System.currentTimeMillis(); + while (true) { + List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig); + accumData.addAll(readData); + if (accumData.size() >= expectedNumRecords) + return accumData; + if (System.currentTimeMillis() > startTime + waitTime) + throw new AssertionError("Expected " + expectedNumRecords + + " but received only " + accumData.size() + + " records before timeout " + waitTime + " ms"); + Thread.sleep(Math.min(waitTime, 100L)); + } + } + + public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig, + String topic, + int expectedNumRecords) throws InterruptedException { + + return waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); + } + + /** + * Wait until enough data (value records) has been consumed.
+ * @param consumerConfig Kafka Consumer configuration + * @param topic Topic to consume from + * @param expectedNumRecords Minimum number of expected records + * @param waitTime Upper bound on waiting time in milliseconds + * @return All the records consumed, containing at least expectedNumRecords records (never null) + * @throws InterruptedException + * @throws AssertionError if the given wait time elapses + */ + public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig, + String topic, + int expectedNumRecords, + long waitTime) throws InterruptedException { + List<V> accumData = new ArrayList<>(); + long startTime = System.currentTimeMillis(); + while (true) { + List<V> readData = readValues(topic, consumerConfig, expectedNumRecords); + accumData.addAll(readData); + if (accumData.size() >= expectedNumRecords) + return accumData; + if (System.currentTimeMillis() > startTime + waitTime) + throw new AssertionError("Expected " + expectedNumRecords + + " but received only " + accumData.size() + + " records before timeout " + waitTime + " ms"); + Thread.sleep(Math.min(waitTime, 100L)); + } + } } \ No newline at end of file
