This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 153db48 KAFKA-9273: Extract
testShouldAutoShutdownOnIncompleteMetadata from S… (#9108)
153db48 is described below
commit 153db488660575e8c033c3c4424211a6f567bb57
Author: albert02lowis <[email protected]>
AuthorDate: Sun Aug 16 00:27:36 2020 +0800
KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…
(#9108)
The main goal is to remove usage of embedded broker (EmbeddedKafkaCluster)
in AbstractJoinIntegrationTest and its subclasses.
This is because the tests under this class are no longer using the embedded
broker, except for two.
testShouldAutoShutdownOnIncompleteMetadata is one such test.
Furthermore, this test does not actually perform a stream-table join; it is
testing an edge case of joining with a non-existent topic, so it should be in a
separate test.
Testing strategy: run existing unit and integration test
Reviewers: Boyang Chen <[email protected]>, Bill Bejeck
<[email protected]>
---
.../integration/AbstractJoinIntegrationTest.java | 14 +--
.../integration/JoinStoreIntegrationTest.java | 114 ++++++++++++++++++++
...JoinWithIncompleteMetadataIntegrationTest.java} | 120 ++++++++-------------
.../StreamStreamJoinIntegrationTest.java | 38 -------
.../StreamTableJoinIntegrationTest.java | 32 ------
5 files changed, 158 insertions(+), 160 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 639dda2..d7e19c7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -37,9 +36,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
import org.junit.BeforeClass;
-import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
@@ -66,9 +63,6 @@ import static org.hamcrest.core.IsEqual.equalTo;
@Category({IntegrationTest.class})
@RunWith(value = Parameterized.class)
public abstract class AbstractJoinIntegrationTest {
- @ClassRule
- public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
-
@Rule
public final TemporaryFolder testFolder = new
TemporaryFolder(TestUtils.tempDirectory());
@@ -122,26 +116,20 @@ public abstract class AbstractJoinIntegrationTest {
@BeforeClass
public static void setupConfigsAndUtils() {
+ STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"topology_driver:0000");
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
- STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
COMMIT_INTERVAL);
}
void prepareEnvironment() throws InterruptedException {
- CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT,
OUTPUT_TOPIC);
-
if (!cacheEnabled) {
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
0);
}
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG,
testFolder.getRoot().getPath());
}
- @After
- public void cleanup() throws InterruptedException {
- CLUSTER.deleteAllTopicsAndWait(120000);
- }
void runTestWithDriver(final List<List<TestRecord<Long, String>>>
expectedResult) {
runTestWithDriver(expectedResult, null);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
new file mode 100644
index 0000000..c519117
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import static java.time.Duration.ofMillis;
+import static org.junit.Assert.assertThrows;
+
+@Category({IntegrationTest.class})
+public class JoinStoreIntegrationTest {
+ @ClassRule
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
+
+ @Rule
+ public final TemporaryFolder testFolder = new
TemporaryFolder(TestUtils.tempDirectory());
+
+ private static final String APP_ID = "join-store-integration-test";
+ private static final Long COMMIT_INTERVAL = 100L;
+ static final Properties STREAMS_CONFIG = new Properties();
+ static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
+ static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
+ static final String OUTPUT_TOPIC = "outputTopic";
+
+ StreamsBuilder builder;
+
+ @BeforeClass
+ public static void setupConfigsAndUtils() {
+ STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+ STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Long().getClass());
+ STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
COMMIT_INTERVAL);
+ }
+
+ @Before
+ public void prepareTopology() throws InterruptedException {
+ CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT,
OUTPUT_TOPIC);
+ STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG,
testFolder.getRoot().getPath());
+
+ builder = new StreamsBuilder();
+ }
+
+ @After
+ public void cleanup() throws InterruptedException {
+ CLUSTER.deleteAllTopicsAndWait(120000);
+ }
+
+ @Test
+ public void shouldNotAccessJoinStoresWhenGivingName() throws
InterruptedException {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID +
"-no-store-access");
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KStream<String, Integer> left = builder.stream(INPUT_TOPIC_LEFT,
Consumed.with(Serdes.String(), Serdes.Integer()));
+ final KStream<String, Integer> right =
builder.stream(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.String(),
Serdes.Integer()));
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ left.join(
+ right,
+ (value1, value2) -> value1 + value2,
+ JoinWindows.of(ofMillis(100)),
+ StreamJoined.with(Serdes.String(), Serdes.Integer(),
Serdes.Integer()).withStoreName("join-store"));
+
+ try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), STREAMS_CONFIG)) {
+ kafkaStreams.setStateListener((newState, oldState) -> {
+ if (newState == KafkaStreams.State.RUNNING) {
+ latch.countDown();
+ }
+ });
+
+ kafkaStreams.start();
+ latch.await();
+ assertThrows(InvalidStateStoreException.class, () ->
kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store",
QueryableStoreTypes.keyValueStore())));
+ }
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
similarity index 52%
copy from
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
copy to
streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
index 66f0a04..177fa43 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
@@ -16,57 +16,78 @@
*/
package org.apache.kafka.streams.integration;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
+import org.junit.rules.TemporaryFolder;
import static org.junit.Assert.assertTrue;
-/**
- * Tests all available joins of Kafka Streams DSL.
- */
@Category({IntegrationTest.class})
-@RunWith(value = Parameterized.class)
-public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
- private KStream<Long, String> leftStream;
+public class JoinWithIncompleteMetadataIntegrationTest {
+ @ClassRule
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
+
+ @Rule
+ public final TemporaryFolder testFolder = new
TemporaryFolder(TestUtils.tempDirectory());
+
+ private static final String APP_ID =
"join-incomplete-metadata-integration-test";
+ private static final Long COMMIT_INTERVAL = 100L;
+ static final Properties STREAMS_CONFIG = new Properties();
+ static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
+ static final String NON_EXISTENT_INPUT_TOPIC_LEFT =
"inputTopicLeft-not-exist";
+ static final String OUTPUT_TOPIC = "outputTopic";
+
+ StreamsBuilder builder;
+ final ValueJoiner<String, String, String> valueJoiner = (value1, value2)
-> value1 + "-" + value2;
private KTable<Long, String> rightTable;
- public StreamTableJoinIntegrationTest(final boolean cacheEnabled) {
- super(cacheEnabled);
+ @BeforeClass
+ public static void setupConfigsAndUtils() {
+ STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+ STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Long().getClass());
+ STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
COMMIT_INTERVAL);
}
@Before
public void prepareTopology() throws InterruptedException {
- super.prepareEnvironment();
-
- appID = "stream-table-join-integration-test";
+ CLUSTER.createTopics(INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+ STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG,
testFolder.getRoot().getPath());
builder = new StreamsBuilder();
rightTable = builder.table(INPUT_TOPIC_RIGHT);
- leftStream = builder.stream(INPUT_TOPIC_LEFT);
+ }
+
+ @After
+ public void cleanup() throws InterruptedException {
+ CLUSTER.deleteAllTopicsAndWait(120000);
}
@Test
- public void testShouldAutoShutdownOnIncompleteMetadata() throws
InterruptedException {
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-incomplete");
+ public void testShouldAutoShutdownOnJoinWithIncompleteMetadata() throws
InterruptedException {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
- final KStream<Long, String> notExistStream =
builder.stream(INPUT_TOPIC_LEFT + "-not-existed");
+ final KStream<Long, String> notExistStream =
builder.stream(NON_EXISTENT_INPUT_TOPIC_LEFT);
final KTable<Long, String> aggregatedTable =
notExistStream.leftJoin(rightTable, valueJoiner)
.groupBy((key, value) -> key)
@@ -85,59 +106,4 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
streams.close();
assertTrue(listener.transitToPendingShutdownSeen());
}
-
- @Test
- public void testInner() {
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-inner");
- STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"topology_driver:0000");
-
- final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
- null,
- null,
- null,
- null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a",
null, 5L)),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d",
null, 15L))
- );
-
- leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
- runTestWithDriver(expectedResult);
- }
-
- @Test
- public void testLeft() {
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-left");
- STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"topology_driver:0000");
-
- final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
- null,
- null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"A-null", null, 3L)),
- null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a",
null, 5L)),
- null,
- null,
- null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"C-null", null, 9L)),
- null,
- null,
- null,
- null,
- null,
- Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d",
null, 15L))
- );
-
- leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
- runTestWithDriver(expectedResult);
- }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index c503a98..67db9bd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -16,17 +16,10 @@
*/
package org.apache.kafka.streams.integration;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
@@ -40,11 +33,8 @@ import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
-import static org.junit.Assert.assertThrows;
/**
* Tests all available joins of Kafka Streams DSL.
@@ -71,34 +61,6 @@ public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@Test
- public void shouldNotAccessJoinStoresWhenGivingName() throws
InterruptedException {
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-no-store-access");
- final StreamsBuilder builder = new StreamsBuilder();
-
- final KStream<String, Integer> left = builder.stream(INPUT_TOPIC_LEFT,
Consumed.with(Serdes.String(), Serdes.Integer()));
- final KStream<String, Integer> right =
builder.stream(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.String(),
Serdes.Integer()));
- final CountDownLatch latch = new CountDownLatch(1);
-
- left.join(
- right,
- (value1, value2) -> value1 + value2,
- JoinWindows.of(ofMillis(100)),
- StreamJoined.with(Serdes.String(), Serdes.Integer(),
Serdes.Integer()).withStoreName("join-store"));
-
- try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), STREAMS_CONFIG)) {
- kafkaStreams.setStateListener((newState, oldState) -> {
- if (newState == KafkaStreams.State.RUNNING) {
- latch.countDown();
- }
- });
-
- kafkaStreams.start();
- latch.await();
- assertThrows(InvalidStateStoreException.class, () ->
kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store",
QueryableStoreTypes.keyValueStore())));
- }
- }
-
- @Test
public void testInner() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-inner");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 66f0a04..0f7e8aa 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -16,15 +16,12 @@
*/
package org.apache.kafka.streams.integration;
-import org.apache.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -35,8 +32,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import static org.junit.Assert.assertTrue;
-
/**
* Tests all available joins of Kafka Streams DSL.
*/
@@ -62,34 +57,8 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
}
@Test
- public void testShouldAutoShutdownOnIncompleteMetadata() throws
InterruptedException {
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-incomplete");
- STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
-
- final KStream<Long, String> notExistStream =
builder.stream(INPUT_TOPIC_LEFT + "-not-existed");
-
- final KTable<Long, String> aggregatedTable =
notExistStream.leftJoin(rightTable, valueJoiner)
- .groupBy((key, value) -> key)
- .reduce((value1, value2) -> value1 + value2);
-
- // Write the (continuously updating) results to the output topic.
- aggregatedTable.toStream().to(OUTPUT_TOPIC);
-
- final KafkaStreamsWrapper streams = new
KafkaStreamsWrapper(builder.build(), STREAMS_CONFIG);
- final IntegrationTestUtils.StateListenerStub listener = new
IntegrationTestUtils.StateListenerStub();
- streams.setStreamThreadStateListener(listener);
- streams.start();
-
- TestUtils.waitForCondition(listener::transitToPendingShutdownSeen,
"Did not seen thread state transited to PENDING_SHUTDOWN");
-
- streams.close();
- assertTrue(listener.transitToPendingShutdownSeen());
- }
-
- @Test
public void testInner() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-inner");
- STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"topology_driver:0000");
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
@@ -116,7 +85,6 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
@Test
public void testLeft() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-left");
- STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"topology_driver:0000");
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,