This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new ff12de5  MINOR: Updated StreamTableJoinIntegrationTest to use TTD 
(#7722)
ff12de5 is described below

commit ff12de576c64596b29320fe4998e1444e4b8d48b
Author: Bill Bejeck <[email protected]>
AuthorDate: Fri Nov 22 16:25:57 2019 -0500

    MINOR: Updated StreamTableJoinIntegrationTest to use TTD (#7722)
    
    Convert StreamTableJoinIntegrationTest to use the ToplogyTestDriver to 
eliminate flakiness and speed up the build.
    
    Reviewers: John Roesler <[email protected]>
---
 .../integration/AbstractJoinIntegrationTest.java   | 45 ++++++++++++++++++++++
 .../StreamTableJoinIntegrationTest.java            | 30 ++++++++-------
 2 files changed, 61 insertions(+), 14 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 6660a20..e7b87a2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -30,6 +30,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -37,6 +40,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -52,15 +56,18 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests all available joins of Kafka Streams DSL.
@@ -186,6 +193,44 @@ public abstract class AbstractJoinIntegrationTest {
     }
 
 
+    void runTestWithDriver(final List<List<TestRecord<Long, String>>> 
expectedResult) {
+        runTestWithDriver(expectedResult, null);
+    }
+
+    void runTestWithDriver(final List<List<TestRecord<Long, String>>> 
expectedResult, final String storeName) {
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(STREAMS_CONFIG), STREAMS_CONFIG)) {
+            final TestInputTopic<Long, String> right = 
driver.createInputTopic(INPUT_TOPIC_RIGHT, new LongSerializer(), new 
StringSerializer());
+            final TestInputTopic<Long, String> left = 
driver.createInputTopic(INPUT_TOPIC_LEFT, new LongSerializer(), new 
StringSerializer());
+            final TestOutputTopic<Long, String> outputTopic = 
driver.createOutputTopic(OUTPUT_TOPIC, new LongDeserializer(), new 
StringDeserializer());
+            final Map<String, TestInputTopic<Long, String>> testInputTopicMap 
= new HashMap<>();
+
+            testInputTopicMap.put(INPUT_TOPIC_RIGHT, right);
+            testInputTopicMap.put(INPUT_TOPIC_LEFT, left);
+
+            TestRecord<Long, String> expectedFinalResult = null;
+
+            final long firstTimestamp = System.currentTimeMillis();
+            long ts = firstTimestamp;
+            final Iterator<List<TestRecord<Long, String>>> resultIterator = 
expectedResult.iterator();
+            for (final Input<String> singleInputRecord : input) {
+                
testInputTopicMap.get(singleInputRecord.topic).pipeInput(singleInputRecord.record.key,
 singleInputRecord.record.value, ++ts);
+
+                final List<TestRecord<Long, String>> expected = 
resultIterator.next();
+                if (expected != null) {
+                    final List<TestRecord<Long, String>> updatedExpected = new 
LinkedList<>();
+                    for (final TestRecord<Long, String> record : expected) {
+                        updatedExpected.add(new TestRecord<>(record.key(), 
record.value(), null, firstTimestamp + record.timestamp()));
+                    }
+
+                    final List<TestRecord<Long, String>> output = 
outputTopic.readRecordsToList();
+                    assertEquals(output, updatedExpected);
+                    expectedFinalResult = updatedExpected.get(expected.size() 
- 1);
+                }
+            }
+        }
+    }
+
+
     /*
      * Runs the actual test. Checks the result after each input record to 
ensure fixed processing order.
      * If an input tuple does not trigger any result, "expectedResult" should 
contain a "null" entry
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 4ddab57..66f0a04 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -17,12 +17,12 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.streams.KafkaStreamsWrapper;
-import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -64,6 +64,7 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     @Test
     public void testShouldAutoShutdownOnIncompleteMetadata() throws 
InterruptedException {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-incomplete");
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
 
         final KStream<Long, String> notExistStream = 
builder.stream(INPUT_TOPIC_LEFT + "-not-existed");
 
@@ -86,15 +87,16 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @Test
-    public void testInner() throws Exception {
+    public void testInner() {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-inner");
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"topology_driver:0000");
 
-        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = 
Arrays.asList(
+        final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
             null,
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, 
"B-a", 5L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", 
null, 5L)),
             null,
             null,
             null,
@@ -104,38 +106,38 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
             null,
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, 
"D-d", 15L))
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", 
null,  15L))
         );
 
         leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
+        runTestWithDriver(expectedResult);
     }
 
     @Test
-    public void testLeft() throws Exception {
+    public void testLeft() {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-left");
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"topology_driver:0000");
 
-        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = 
Arrays.asList(
+        final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, 
"A-null", 3L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"A-null", null, 3L)),
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, 
"B-a", 5L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", 
null, 5L)),
             null,
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, 
"C-null", 9L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"C-null", null, 9L)),
             null,
             null,
             null,
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, 
"D-d", 15L))
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", 
null, 15L))
         );
 
         leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
 
-        runTest(expectedResult);
+        runTestWithDriver(expectedResult);
     }
 }

Reply via email to