This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 7c4e672 MINOR: Update test to wait for final value to reduce flakiness updated test method for multiple keys (#5517) 7c4e672 is described below commit 7c4e6724699bb6fc65112b5513848c733a03019e Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Fri Aug 17 15:14:29 2018 -0400 MINOR: Update test to wait for final value to reduce flakiness updated test method for multiple keys (#5517) Updated two integration tests to use IntegrationTestUtils#waitUntilFinalKeyValueRecordsReceived to eliminate flaky test results. Also, I updated IntegrationTestUtils#waitUntilFinalKeyValueRecordsReceived method to support having results with the same key present with different values. For testing, I ran the current suite of streams tests. Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../RepartitionOptimizingIntegrationTest.java | 13 ++----- ...artitionWithMergeOptimizingIntegrationTest.java | 9 +---- .../integration/utils/IntegrationTestUtils.java | 43 +++++++++++----------- 3 files changed, 27 insertions(+), 38 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java index e192c70..5eebf04 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java @@ -195,25 +195,20 @@ public class RepartitionOptimizingIntegrationTest { streams.start(); final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 3L), KeyValue.pair("C", 3L)); - final List<KeyValue<String, Long>> receivedCountKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues.size()); + IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues); final List<KeyValue<String, Integer>> expectedAggKeyValues = Arrays.asList(KeyValue.pair("A", 9), KeyValue.pair("B", 9), KeyValue.pair("C", 9)); - final List<KeyValue<String, Integer>> receivedAggKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig2, AGGREGATION_TOPIC, expectedAggKeyValues.size()); + IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, AGGREGATION_TOPIC, expectedAggKeyValues); final List<KeyValue<String, String>> expectedReduceKeyValues = Arrays.asList(KeyValue.pair("A", "foo:bar:baz"), KeyValue.pair("B", "foo:bar:baz"), KeyValue.pair("C", "foo:bar:baz")); - final List<KeyValue<String, Integer>> receivedReduceKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig3, REDUCE_TOPIC, expectedAggKeyValues.size()); + IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig3, REDUCE_TOPIC, expectedReduceKeyValues); final List<KeyValue<String, String>> expectedJoinKeyValues = Arrays.asList(KeyValue.pair("A", "foo:3"), KeyValue.pair("A", "bar:3"), KeyValue.pair("A", "baz:3")); - final List<KeyValue<String, Integer>> receivedJoinKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig3, JOINED_TOPIC, expectedJoinKeyValues.size()); + IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig3, JOINED_TOPIC, expectedJoinKeyValues); final List<String> expectedCollectedProcessorValues = Arrays.asList("FOO", "BAR", "BAZ"); - assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues)); - assertThat(receivedAggKeyValues, equalTo(expectedAggKeyValues)); - assertThat(receivedReduceKeyValues, equalTo(expectedReduceKeyValues)); - assertThat(receivedJoinKeyValues, equalTo(expectedJoinKeyValues)); - assertThat(3, equalTo(processorValueCollector.size())); assertThat(processorValueCollector, equalTo(expectedCollectedProcessorValues)); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java index 58d903a..af1f5f1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java @@ -51,9 +51,7 @@ import java.util.regex.Pattern; import kafka.utils.MockTime; -import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; @Category({IntegrationTest.class}) public class RepartitionWithMergeOptimizingIntegrationTest { @@ -165,13 +163,10 @@ public class RepartitionWithMergeOptimizingIntegrationTest { streams.start(); final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), KeyValue.pair("C", 6L)); - final List<KeyValue<String, Long>> receivedCountKeyValues = IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues); + IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues); final List<KeyValue<String, String>> expectedStringCountKeyValues = Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), KeyValue.pair("C", "6")); - final List<KeyValue<String, String>> receivedCountStringKeyValues = IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC, expectedStringCountKeyValues); - - assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues)); - assertThat(receivedCountStringKeyValues, equalTo(expectedStringCountKeyValues)); + IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC, expectedStringCountKeyValues); streams.close(5, TimeUnit.SECONDS); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 1a78ed3..d9602f3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -16,9 +16,6 @@ */ package org.apache.kafka.streams.integration.utils; -import kafka.api.Request; -import kafka.server.KafkaServer; -import kafka.server.MetadataCache; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -41,7 +38,6 @@ import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; -import scala.Option; import java.io.File; import java.io.IOException; @@ -50,12 +46,16 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.stream.Collectors; + +import kafka.api.Request; +import kafka.server.KafkaServer; +import kafka.server.MetadataCache; +import scala.Option; /** * Utility functions to make integration testing more convenient. @@ -364,26 +364,25 @@ public class IntegrationTestUtils { final long waitTime) throws InterruptedException { final List<KeyValue<K, V>> accumData = new ArrayList<>(); try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) { - final TestCondition valuesRead = new TestCondition() { - @Override - public boolean conditionMet() { - final List<KeyValue<K, V>> readData = - readKeyValues(topic, consumer, waitTime, expectedRecords.size()); - accumData.addAll(readData); + final TestCondition valuesRead = () -> { + final List<KeyValue<K, V>> readData = + readKeyValues(topic, consumer, waitTime, expectedRecords.size()); + accumData.addAll(readData); - final Map<K, V> finalData = new HashMap<>(); + final int accumLastIndex = accumData.size() - 1; + final int expectedLastIndex = expectedRecords.size() - 1; - for (final KeyValue<K, V> keyValue : accumData) { - finalData.put(keyValue.key, keyValue.value); - } + // filter out all intermediate records we don't want + final List<KeyValue<K, V>> accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList()); - for (final KeyValue<K, V> keyValue : expectedRecords) { - if (!keyValue.value.equals(finalData.get(keyValue.key))) - return false; - } + // need this check as filtering above could have removed the last record from accumData, but it did not + // equal the last expected record + final boolean lastRecordsMatch = accumData.get(accumLastIndex).equals(expectedRecords.get(expectedLastIndex)); + + // returns true only if the remaining records in both lists are the same and in the same order + // and the last record received matches the last expected record + return accumulatedActual.equals(expectedRecords) && lastRecordsMatch; - return true; - } }; final String conditionDetails = "Did not receive all " + expectedRecords + " records from topic " + topic; TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);