This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new ff12de5 MINOR: Updated StreamTableJoinIntegrationTest to use TTD
(#7722)
ff12de5 is described below
commit ff12de576c64596b29320fe4998e1444e4b8d48b
Author: Bill Bejeck <[email protected]>
AuthorDate: Fri Nov 22 16:25:57 2019 -0500
MINOR: Updated StreamTableJoinIntegrationTest to use TTD (#7722)
Convert StreamTableJoinIntegrationTest to use the TopologyTestDriver to
eliminate flakiness and speed up the build.
Reviewers: John Roesler <[email protected]>
---
.../integration/AbstractJoinIntegrationTest.java | 45 ++++++++++++++++++++++
.../StreamTableJoinIntegrationTest.java | 30 ++++++++-------
2 files changed, 61 insertions(+), 14 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 6660a20..e7b87a2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -30,6 +30,9 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -37,6 +40,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -52,15 +56,18 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
/**
* Tests all available joins of Kafka Streams DSL.
@@ -186,6 +193,44 @@ public abstract class AbstractJoinIntegrationTest {
}
+ void runTestWithDriver(final List<List<TestRecord<Long, String>>>
expectedResult) {
+ runTestWithDriver(expectedResult, null);
+ }
+
+ void runTestWithDriver(final List<List<TestRecord<Long, String>>>
expectedResult, final String storeName) {
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(STREAMS_CONFIG), STREAMS_CONFIG)) {
+ final TestInputTopic<Long, String> right =
driver.createInputTopic(INPUT_TOPIC_RIGHT, new LongSerializer(), new
StringSerializer());
+ final TestInputTopic<Long, String> left =
driver.createInputTopic(INPUT_TOPIC_LEFT, new LongSerializer(), new
StringSerializer());
+ final TestOutputTopic<Long, String> outputTopic =
driver.createOutputTopic(OUTPUT_TOPIC, new LongDeserializer(), new
StringDeserializer());
+ final Map<String, TestInputTopic<Long, String>> testInputTopicMap
= new HashMap<>();
+
+ testInputTopicMap.put(INPUT_TOPIC_RIGHT, right);
+ testInputTopicMap.put(INPUT_TOPIC_LEFT, left);
+
+ TestRecord<Long, String> expectedFinalResult = null;
+
+ final long firstTimestamp = System.currentTimeMillis();
+ long ts = firstTimestamp;
+ final Iterator<List<TestRecord<Long, String>>> resultIterator =
expectedResult.iterator();
+ for (final Input<String> singleInputRecord : input) {
+
testInputTopicMap.get(singleInputRecord.topic).pipeInput(singleInputRecord.record.key,
singleInputRecord.record.value, ++ts);
+
+ final List<TestRecord<Long, String>> expected =
resultIterator.next();
+ if (expected != null) {
+ final List<TestRecord<Long, String>> updatedExpected = new
LinkedList<>();
+ for (final TestRecord<Long, String> record : expected) {
+ updatedExpected.add(new TestRecord<>(record.key(),
record.value(), null, firstTimestamp + record.timestamp()));
+ }
+
+ final List<TestRecord<Long, String>> output =
outputTopic.readRecordsToList();
+ assertEquals(output, updatedExpected);
+ expectedFinalResult = updatedExpected.get(expected.size()
- 1);
+ }
+ }
+ }
+ }
+
+
/*
* Runs the actual test. Checks the result after each input record to
ensure fixed processing order.
* If an input tuple does not trigger any result, "expectedResult" should
contain a "null" entry
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 4ddab57..66f0a04 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -17,12 +17,12 @@
package org.apache.kafka.streams.integration;
import org.apache.kafka.streams.KafkaStreamsWrapper;
-import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
@@ -64,6 +64,7 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
@Test
public void testShouldAutoShutdownOnIncompleteMetadata() throws
InterruptedException {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-incomplete");
+ STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
final KStream<Long, String> notExistStream =
builder.stream(INPUT_TOPIC_LEFT + "-not-existed");
@@ -86,15 +87,16 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@Test
- public void testInner() throws Exception {
+ public void testInner() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-inner");
+ STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"topology_driver:0000");
- final List<List<KeyValueTimestamp<Long, String>>> expectedResult =
Arrays.asList(
+ final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
null,
null,
null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY,
"B-a", 5L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a",
null, 5L)),
null,
null,
null,
@@ -104,38 +106,38 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
null,
null,
null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY,
"D-d", 15L))
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d",
null, 15L))
);
leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
- runTest(expectedResult);
+ runTestWithDriver(expectedResult);
}
@Test
- public void testLeft() throws Exception {
+ public void testLeft() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-left");
+ STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"topology_driver:0000");
- final List<List<KeyValueTimestamp<Long, String>>> expectedResult =
Arrays.asList(
+ final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY,
"A-null", 3L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"A-null", null, 3L)),
null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY,
"B-a", 5L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a",
null, 5L)),
null,
null,
null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY,
"C-null", 9L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"C-null", null, 9L)),
null,
null,
null,
null,
null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY,
"D-d", 15L))
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d",
null, 15L))
);
leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
- runTest(expectedResult);
+ runTestWithDriver(expectedResult);
}
}