This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 153db48  KAFKA-9273: Extract 
testShouldAutoShutdownOnIncompleteMetadata from S… (#9108)
153db48 is described below

commit 153db488660575e8c033c3c4424211a6f567bb57
Author: albert02lowis <[email protected]>
AuthorDate: Sun Aug 16 00:27:36 2020 +0800

    KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S… 
(#9108)
    
    The main goal is to remove usage of embedded broker (EmbeddedKafkaCluster) 
in AbstractJoinIntegrationTest and its subclasses.
    This is because the tests under this class are no longer using the embedded 
broker, except for two.
    testShouldAutoShutdownOnIncompleteMetadata is one of those two tests.
    Furthermore, this test does not actually perform a stream-table join; it is 
testing an edge case of joining with a non-existent topic, so it should be in a 
separate test.
    
    Testing strategy: run the existing unit and integration tests
    
    Reviewers: Boyang Chen <[email protected]>, Bill Bejeck 
<[email protected]>
---
 .../integration/AbstractJoinIntegrationTest.java   |  14 +--
 .../integration/JoinStoreIntegrationTest.java      | 114 ++++++++++++++++++++
 ...JoinWithIncompleteMetadataIntegrationTest.java} | 120 ++++++++-------------
 .../StreamStreamJoinIntegrationTest.java           |  38 -------
 .../StreamTableJoinIntegrationTest.java            |  32 ------
 5 files changed, 158 insertions(+), 160 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 639dda2..d7e19c7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -37,9 +36,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
@@ -66,9 +63,6 @@ import static org.hamcrest.core.IsEqual.equalTo;
 @Category({IntegrationTest.class})
 @RunWith(value = Parameterized.class)
 public abstract class AbstractJoinIntegrationTest {
-    @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
-
     @Rule
     public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
 
@@ -122,26 +116,20 @@ public abstract class AbstractJoinIntegrationTest {
     @BeforeClass
     public static void setupConfigsAndUtils() {
 
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"topology_driver:0000");
         STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
         STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
COMMIT_INTERVAL);
     }
 
     void prepareEnvironment() throws InterruptedException {
-        CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, 
OUTPUT_TOPIC);
-
         if (!cacheEnabled) {
             STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
0);
         }
 
         STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
     }
-    @After
-    public void cleanup() throws InterruptedException {
-        CLUSTER.deleteAllTopicsAndWait(120000);
-    }
 
     void runTestWithDriver(final List<List<TestRecord<Long, String>>> 
expectedResult) {
         runTestWithDriver(expectedResult, null);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
new file mode 100644
index 0000000..c519117
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import static java.time.Duration.ofMillis;
+import static org.junit.Assert.assertThrows;
+
+@Category({IntegrationTest.class})
+public class JoinStoreIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+    @Rule
+    public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+    private static final String APP_ID = "join-store-integration-test";
+    private static final Long COMMIT_INTERVAL = 100L;
+    static final Properties STREAMS_CONFIG = new Properties();
+    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
+    static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
+    static final String OUTPUT_TOPIC = "outputTopic";
+
+    StreamsBuilder builder;
+
+    @BeforeClass
+    public static void setupConfigsAndUtils() {
+        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
COMMIT_INTERVAL);
+    }
+
+    @Before
+    public void prepareTopology() throws InterruptedException {
+        CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, 
OUTPUT_TOPIC);
+        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+
+        builder = new StreamsBuilder();
+    }
+
+    @After
+    public void cleanup() throws InterruptedException {
+        CLUSTER.deleteAllTopicsAndWait(120000);
+    }
+
+    @Test
+    public void shouldNotAccessJoinStoresWhenGivingName() throws 
InterruptedException {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-no-store-access");
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, Integer> left = builder.stream(INPUT_TOPIC_LEFT, 
Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = 
builder.stream(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.String(), 
Serdes.Integer()));
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            JoinWindows.of(ofMillis(100)),
+            StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("join-store"));
+
+        try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), STREAMS_CONFIG)) {
+            kafkaStreams.setStateListener((newState, oldState) -> {
+                if (newState == KafkaStreams.State.RUNNING) {
+                    latch.countDown();
+                }
+            });
+
+            kafkaStreams.start();
+            latch.await();
+            assertThrows(InvalidStateStoreException.class, () -> 
kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", 
QueryableStoreTypes.keyValueStore())));
+        }
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
similarity index 52%
copy from 
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
copy to 
streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
index 66f0a04..177fa43 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
@@ -16,57 +16,78 @@
  */
 package org.apache.kafka.streams.integration;
 
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreamsWrapper;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
+import org.junit.rules.TemporaryFolder;
 
 import static org.junit.Assert.assertTrue;
 
-/**
- * Tests all available joins of Kafka Streams DSL.
- */
 @Category({IntegrationTest.class})
-@RunWith(value = Parameterized.class)
-public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
-    private KStream<Long, String> leftStream;
+public class JoinWithIncompleteMetadataIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+    @Rule
+    public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+    private static final String APP_ID = 
"join-incomplete-metadata-integration-test";
+    private static final Long COMMIT_INTERVAL = 100L;
+    static final Properties STREAMS_CONFIG = new Properties();
+    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
+    static final String NON_EXISTENT_INPUT_TOPIC_LEFT = 
"inputTopicLeft-not-exist";
+    static final String OUTPUT_TOPIC = "outputTopic";
+
+    StreamsBuilder builder;
+    final ValueJoiner<String, String, String> valueJoiner = (value1, value2) 
-> value1 + "-" + value2;
     private KTable<Long, String> rightTable;
 
-    public StreamTableJoinIntegrationTest(final boolean cacheEnabled) {
-        super(cacheEnabled);
+    @BeforeClass
+    public static void setupConfigsAndUtils() {
+        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
COMMIT_INTERVAL);
     }
 
     @Before
     public void prepareTopology() throws InterruptedException {
-        super.prepareEnvironment();
-
-        appID = "stream-table-join-integration-test";
+        CLUSTER.createTopics(INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
 
         builder = new StreamsBuilder();
         rightTable = builder.table(INPUT_TOPIC_RIGHT);
-        leftStream = builder.stream(INPUT_TOPIC_LEFT);
+    }
+
+    @After
+    public void cleanup() throws InterruptedException {
+        CLUSTER.deleteAllTopicsAndWait(120000);
     }
 
     @Test
-    public void testShouldAutoShutdownOnIncompleteMetadata() throws 
InterruptedException {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-incomplete");
+    public void testShouldAutoShutdownOnJoinWithIncompleteMetadata() throws 
InterruptedException {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
 
-        final KStream<Long, String> notExistStream = 
builder.stream(INPUT_TOPIC_LEFT + "-not-existed");
+        final KStream<Long, String> notExistStream = 
builder.stream(NON_EXISTENT_INPUT_TOPIC_LEFT);
 
         final KTable<Long, String> aggregatedTable = 
notExistStream.leftJoin(rightTable, valueJoiner)
                 .groupBy((key, value) -> key)
@@ -85,59 +106,4 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         streams.close();
         assertTrue(listener.transitToPendingShutdownSeen());
     }
-
-    @Test
-    public void testInner() {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-inner");
-        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"topology_driver:0000");
-
-        final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
-            null,
-            null,
-            null,
-            null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", 
null, 5L)),
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", 
null,  15L))
-        );
-
-        leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-        runTestWithDriver(expectedResult);
-    }
-
-    @Test
-    public void testLeft() {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-left");
-        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"topology_driver:0000");
-
-        final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
-            null,
-            null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"A-null", null, 3L)),
-            null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", 
null, 5L)),
-            null,
-            null,
-            null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"C-null", null, 9L)),
-            null,
-            null,
-            null,
-            null,
-            null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", 
null, 15L))
-        );
-
-        leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTestWithDriver(expectedResult);
-    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index c503a98..67db9bd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -16,17 +16,10 @@
  */
 package org.apache.kafka.streams.integration;
 
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
@@ -40,11 +33,8 @@ import org.junit.runners.Parameterized;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 
-import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofSeconds;
-import static org.junit.Assert.assertThrows;
 
 /**
  * Tests all available joins of Kafka Streams DSL.
@@ -71,34 +61,6 @@ public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @Test
-    public void shouldNotAccessJoinStoresWhenGivingName() throws 
InterruptedException {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-no-store-access");
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KStream<String, Integer> left = builder.stream(INPUT_TOPIC_LEFT, 
Consumed.with(Serdes.String(), Serdes.Integer()));
-        final KStream<String, Integer> right = 
builder.stream(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.String(), 
Serdes.Integer()));
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        left.join(
-            right,
-            (value1, value2) -> value1 + value2,
-            JoinWindows.of(ofMillis(100)),
-            StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("join-store"));
-
-        try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), STREAMS_CONFIG)) {
-            kafkaStreams.setStateListener((newState, oldState) -> {
-                if (newState == KafkaStreams.State.RUNNING) {
-                    latch.countDown();
-                }
-            });
-
-            kafkaStreams.start();
-            latch.await();
-            assertThrows(InvalidStateStoreException.class, () -> 
kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", 
QueryableStoreTypes.keyValueStore())));
-        }
-    }
-
-    @Test
     public void testInner() {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-inner");
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 66f0a04..0f7e8aa 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -16,15 +16,12 @@
  */
 package org.apache.kafka.streams.integration;
 
-import org.apache.kafka.streams.KafkaStreamsWrapper;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -35,8 +32,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static org.junit.Assert.assertTrue;
-
 /**
  * Tests all available joins of Kafka Streams DSL.
  */
@@ -62,34 +57,8 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     }
 
     @Test
-    public void testShouldAutoShutdownOnIncompleteMetadata() throws 
InterruptedException {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-incomplete");
-        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-
-        final KStream<Long, String> notExistStream = 
builder.stream(INPUT_TOPIC_LEFT + "-not-existed");
-
-        final KTable<Long, String> aggregatedTable = 
notExistStream.leftJoin(rightTable, valueJoiner)
-                .groupBy((key, value) -> key)
-                .reduce((value1, value2) -> value1 + value2);
-
-        // Write the (continuously updating) results to the output topic.
-        aggregatedTable.toStream().to(OUTPUT_TOPIC);
-
-        final KafkaStreamsWrapper streams = new 
KafkaStreamsWrapper(builder.build(), STREAMS_CONFIG);
-        final IntegrationTestUtils.StateListenerStub listener = new 
IntegrationTestUtils.StateListenerStub();
-        streams.setStreamThreadStateListener(listener);
-        streams.start();
-
-        TestUtils.waitForCondition(listener::transitToPendingShutdownSeen, 
"Did not seen thread state transited to PENDING_SHUTDOWN");
-
-        streams.close();
-        assertTrue(listener.transitToPendingShutdownSeen());
-    }
-
-    @Test
     public void testInner() {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-inner");
-        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"topology_driver:0000");
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
@@ -116,7 +85,6 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     @Test
     public void testLeft() {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-left");
-        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"topology_driver:0000");
 
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,

Reply via email to