This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7c4e672  MINOR: Update test to wait for final value to reduce 
flakiness updated test method for multiple keys (#5517)
7c4e672 is described below

commit 7c4e6724699bb6fc65112b5513848c733a03019e
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Fri Aug 17 15:14:29 2018 -0400

    MINOR: Update test to wait for final value to reduce flakiness updated test 
method for multiple keys (#5517)
    
    Updated two integration tests to use 
IntegrationTestUtils#waitUntilFinalKeyValueRecordsReceived to eliminate flaky 
test results.
    
    Also, I updated IntegrationTestUtils#waitUntilFinalKeyValueRecordsReceived 
method to support having results with the same key present with different 
values.
    
    For testing, I ran the current suite of streams tests.
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang 
<wangg...@gmail.com>
---
 .../RepartitionOptimizingIntegrationTest.java      | 13 ++-----
 ...artitionWithMergeOptimizingIntegrationTest.java |  9 +----
 .../integration/utils/IntegrationTestUtils.java    | 43 +++++++++++-----------
 3 files changed, 27 insertions(+), 38 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
index e192c70..5eebf04 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
@@ -195,25 +195,20 @@ public class RepartitionOptimizingIntegrationTest {
         streams.start();
 
         final List<KeyValue<String, Long>> expectedCountKeyValues = 
Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 3L), 
KeyValue.pair("C", 3L));
-        final List<KeyValue<String, Long>> receivedCountKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig1, 
COUNT_TOPIC, expectedCountKeyValues.size());
+        
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, 
COUNT_TOPIC, expectedCountKeyValues);
 
         final List<KeyValue<String, Integer>> expectedAggKeyValues = 
Arrays.asList(KeyValue.pair("A", 9), KeyValue.pair("B", 9), KeyValue.pair("C", 
9));
-        final List<KeyValue<String, Integer>> receivedAggKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig2, 
AGGREGATION_TOPIC, expectedAggKeyValues.size());
+        
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, 
AGGREGATION_TOPIC, expectedAggKeyValues);
 
         final List<KeyValue<String, String>> expectedReduceKeyValues = 
Arrays.asList(KeyValue.pair("A", "foo:bar:baz"), KeyValue.pair("B", 
"foo:bar:baz"), KeyValue.pair("C", "foo:bar:baz"));
-        final List<KeyValue<String, Integer>> receivedReduceKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig3, 
REDUCE_TOPIC, expectedAggKeyValues.size());
+        
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig3, 
REDUCE_TOPIC, expectedReduceKeyValues);
 
         final List<KeyValue<String, String>> expectedJoinKeyValues = 
Arrays.asList(KeyValue.pair("A", "foo:3"), KeyValue.pair("A", "bar:3"), 
KeyValue.pair("A", "baz:3"));
-        final List<KeyValue<String, Integer>> receivedJoinKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig3, 
JOINED_TOPIC, expectedJoinKeyValues.size());
+        
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig3, 
JOINED_TOPIC, expectedJoinKeyValues);
 
 
         final List<String> expectedCollectedProcessorValues = 
Arrays.asList("FOO", "BAR", "BAZ");
 
-        assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues));
-        assertThat(receivedAggKeyValues, equalTo(expectedAggKeyValues));
-        assertThat(receivedReduceKeyValues, equalTo(expectedReduceKeyValues));
-        assertThat(receivedJoinKeyValues, equalTo(expectedJoinKeyValues));
-
         assertThat(3, equalTo(processorValueCollector.size()));
         assertThat(processorValueCollector, 
equalTo(expectedCollectedProcessorValues));
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
index 58d903a..af1f5f1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
@@ -51,9 +51,7 @@ import java.util.regex.Pattern;
 
 import kafka.utils.MockTime;
 
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 
 @Category({IntegrationTest.class})
 public class RepartitionWithMergeOptimizingIntegrationTest {
@@ -165,13 +163,10 @@ public class 
RepartitionWithMergeOptimizingIntegrationTest {
         streams.start();
 
         final List<KeyValue<String, Long>> expectedCountKeyValues = 
Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), 
KeyValue.pair("C", 6L));
-        final List<KeyValue<String, Long>> receivedCountKeyValues = 
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, 
COUNT_TOPIC, expectedCountKeyValues);
+        
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, 
COUNT_TOPIC, expectedCountKeyValues);
 
         final List<KeyValue<String, String>> expectedStringCountKeyValues = 
Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), 
KeyValue.pair("C", "6"));
-        final List<KeyValue<String, String>> receivedCountStringKeyValues = 
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, 
COUNT_STRING_TOPIC, expectedStringCountKeyValues);
-
-        assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues));
-        assertThat(receivedCountStringKeyValues, 
equalTo(expectedStringCountKeyValues));
+        
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, 
COUNT_STRING_TOPIC, expectedStringCountKeyValues);
 
         streams.close(5, TimeUnit.SECONDS);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 1a78ed3..d9602f3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.streams.integration.utils;
 
-import kafka.api.Request;
-import kafka.server.KafkaServer;
-import kafka.server.MetadataCache;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -41,7 +38,6 @@ import 
org.apache.kafka.streams.processor.internals.StreamThread;
 import 
org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-import scala.Option;
 
 import java.io.File;
 import java.io.IOException;
@@ -50,12 +46,16 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import kafka.api.Request;
+import kafka.server.KafkaServer;
+import kafka.server.MetadataCache;
+import scala.Option;
 
 /**
  * Utility functions to make integration testing more convenient.
@@ -364,26 +364,25 @@ public class IntegrationTestUtils {
                                                                                
     final long waitTime) throws InterruptedException {
         final List<KeyValue<K, V>> accumData = new ArrayList<>();
         try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
-            final TestCondition valuesRead = new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    final List<KeyValue<K, V>> readData =
-                            readKeyValues(topic, consumer, waitTime, 
expectedRecords.size());
-                    accumData.addAll(readData);
+            final TestCondition valuesRead = () -> {
+                final List<KeyValue<K, V>> readData =
+                        readKeyValues(topic, consumer, waitTime, 
expectedRecords.size());
+                accumData.addAll(readData);
 
-                    final Map<K, V> finalData = new HashMap<>();
+                final int accumLastIndex = accumData.size() - 1;
+                final int expectedLastIndex = expectedRecords.size() - 1;
 
-                    for (final KeyValue<K, V> keyValue : accumData) {
-                        finalData.put(keyValue.key, keyValue.value);
-                    }
+                // filter out all intermediate records we don't want
+                final List<KeyValue<K, V>> accumulatedActual = 
accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList());
 
-                    for (final KeyValue<K, V> keyValue : expectedRecords) {
-                        if 
(!keyValue.value.equals(finalData.get(keyValue.key)))
-                            return false;
-                    }
+                // need this check as filtering above could have removed the 
last record from accumData, but it did not
+                // equal the last expected record
+                final boolean lastRecordsMatch = 
accumData.get(accumLastIndex).equals(expectedRecords.get(expectedLastIndex));
+
+                // returns true only if the remaining records in both lists 
are the same and in the same order
+                // and the last record received matches the last expected 
record
+                return accumulatedActual.equals(expectedRecords) && 
lastRecordsMatch;
 
-                    return true;
-                }
             };
             final String conditionDetails = "Did not receive all " + 
expectedRecords + " records from topic " + topic;
             TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);

Reply via email to