This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1a324d7  KAFKA-6729: Reuse source topics for source KTable's 
materialized store's changelog (#5017)
1a324d7 is described below

commit 1a324d784cfc53288730b7c1b5c1bde0685e4686
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Thu May 17 11:28:45 2018 -0700

    KAFKA-6729: Reuse source topics for source KTable's materialized store's 
changelog (#5017)
    
    1. In InternalTopologyBuilder#topicGroups, which is used in 
StreamsPartitionAssignor, look for book-kept storeToChangelogTopic map before 
creating a new internal changelog topics. In this way if the source KTable is 
created, its source topic stored in storeToChangelogTopic will be used.
    
    2. Added unit test (confirmed that without 1) it will fail).
    
    3. MINOR: removed TODOs that are related to removed KStreamBuilder.
    
    4. MINOR: removed TODOs in StreamsBuilderTest util functions and replaced 
with TopologyWrapper.
    
    5. MINOR: removed StreamsBuilderTest#testFrom as it is already covered by 
TopologyTest#shouldNotAllowToAddSourcesWithSameName, plus it requires 
KStreamImpl.SOURCE_NAME which should be a package private field of the 
KStreamImpl.
    
    Reviewers: John Roesler <j...@confluent.io>, Bill Bejeck 
<b...@confluent.io>, Matthias
     J. Sax <matth...@confluent.io>
---
 .../streams/kstream/internals/KStreamImpl.java     |  3 +-
 .../streams/kstream/internals/KTableImpl.java      |  5 ++--
 .../internals/InternalTopologyBuilder.java         | 15 ++++++----
 .../apache/kafka/streams/StreamsBuilderTest.java   | 34 ++++++++--------------
 .../internals/KStreamGlobalKTableJoinTest.java     |  4 +--
 .../internals/KStreamGlobalKTableLeftJoinTest.java |  4 +--
 .../streams/kstream/internals/KStreamImplTest.java |  5 ++--
 .../kstream/internals/KStreamKStreamJoinTest.java  | 12 ++++----
 .../internals/KStreamKStreamLeftJoinTest.java      |  6 ++--
 .../kstream/internals/KStreamKTableJoinTest.java   |  4 +--
 .../internals/KStreamKTableLeftJoinTest.java       |  4 +--
 .../internals/KTableKTableInnerJoinTest.java       |  4 +--
 .../internals/KTableKTableLeftJoinTest.java        |  4 +--
 .../internals/KTableKTableOuterJoinTest.java       |  4 +--
 .../internals/StreamsMetadataStateTest.java        | 12 ++++----
 .../internals/StreamsPartitionAssignorTest.java    | 10 +++----
 .../org/apache/kafka/test/KStreamTestDriver.java   |  4 +--
 17 files changed, 62 insertions(+), 72 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b8195a0..5331a95 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -53,8 +53,7 @@ import java.util.Set;
 
 public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, 
V> {
 
-    // TODO: change to package-private after removing KStreamBuilder
-    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
+    static final String SOURCE_NAME = "KSTREAM-SOURCE-";
 
     static final String SINK_NAME = "KSTREAM-SINK-";
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 1c5ad4d..c1f0f7a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -45,10 +45,9 @@ import java.util.Set;
  */
 public class KTableImpl<K, S, V> extends AbstractStream<K> implements 
KTable<K, V> {
 
-    // TODO: these two fields can be package-private after KStreamBuilder is 
removed
-    public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+    static final String SOURCE_NAME = "KTABLE-SOURCE-";
 
-    public static final String STATE_STORE_NAME = "STATE-STORE-";
+    static final String STATE_STORE_NAME = "STATE-STORE-";
 
     private static final String FILTER_NAME = "KTABLE-FILTER-";
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 70437e9..575ac01 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -96,8 +96,7 @@ public class InternalTopologyBuilder {
     // are connected to these state stores
     private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new 
HashMap<>();
 
-    // map from state store names to this state store's corresponding 
changelog topic if possible,
-    // this is used in the extended KStreamBuilder.
+    // map from state store names to this state store's corresponding 
changelog topic if possible
     private final Map<String, String> storeToChangelogTopic = new HashMap<>();
 
     // all global topics
@@ -1013,12 +1012,16 @@ public class InternalTopologyBuilder {
                     }
                 }
 
-                // if the node is connected to a state, add to the state topics
+                // if the node is connected to a state store whose changelog 
topics are not predefined, add to the changelog topics
                 for (final StateStoreFactory stateFactory : 
stateFactories.values()) {
                     if (stateFactory.loggingEnabled() && 
stateFactory.users().contains(node)) {
-                        final String name = 
ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
-                        final InternalTopicConfig internalTopicConfig = 
createChangelogTopicConfig(stateFactory, name);
-                        stateChangelogTopics.put(name, internalTopicConfig);
+                        final String topicName = 
storeToChangelogTopic.containsKey(stateFactory.name()) ?
+                                storeToChangelogTopic.get(stateFactory.name()) 
:
+                                
ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
+                        if (!stateChangelogTopics.containsKey(topicName)) {
+                            final InternalTopicConfig internalTopicConfig = 
createChangelogTopicConfig(stateFactory, topicName);
+                            stateChangelogTopics.put(topicName, 
internalTopicConfig);
+                        }
                     }
                 }
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 7c2bfa6..0a1e6df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -39,13 +38,11 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -58,13 +55,6 @@ public class StreamsBuilderTest {
     private final StreamsBuilder builder = new StreamsBuilder();
     private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
-    @Test(expected = TopologyException.class)
-    public void testFrom() {
-        builder.stream(Arrays.asList("topic-1", "topic-2"));
-
-        builder.build().addSource(KStreamImpl.SOURCE_NAME + "0000000000", 
"topic-3");
-    }
-
     @Test
     public void shouldAllowJoinUnmaterializedFilteredKTable() {
         final KTable<Bytes, String> filteredKTable = builder.<Bytes, 
String>table("table-topic").filter(MockPredicate.<Bytes, 
String>allGoodPredicate());
@@ -192,7 +182,7 @@ public class StreamsBuilderTest {
     }
     
     @Test
-    public void testMerge() {
+    public void shouldMergeStreams() {
         final String topic1 = "topic-1";
         final String topic2 = "topic-2";
 
@@ -281,6 +271,16 @@ public class StreamsBuilderTest {
         assertFalse(stores.hasNext());
         assertFalse(subtopologies.hasNext());
     }
+
+    @Test
+    public void shouldReuseSourceTopicAsChangelogs() {
+        final String topic = "topic";
+        builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("store"));
+
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
+
+        
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
 equalTo(Collections.singleton("topic")));
+    }
     
     @Test(expected = TopologyException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() {
@@ -291,14 +291,4 @@ public class StreamsBuilderTest {
     public void shouldThrowExceptionWhenTopicNamesAreNull() {
         builder.stream(Arrays.<String>asList(null, null));
     }
-
-    // TODO: these two static functions are added because some 
non-TopologyBuilder unit tests need to access the internal topology builder,
-    //       which is usually a bad sign of design patterns between 
TopologyBuilder and StreamThread. We need to consider getting rid of them later
-    public static InternalTopologyBuilder internalTopologyBuilder(final 
StreamsBuilder builder) {
-        return builder.internalTopologyBuilder;
-    }
-
-    public static Collection<Set<String>> getCopartitionedGroups(final 
StreamsBuilder builder) {
-        return builder.internalTopologyBuilder.copartitionGroups();
-    }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index c37e8a9..7a65c4a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -112,7 +112,7 @@ public class KStreamGlobalKTableJoinTest {
 
     @Test
     public void shouldNotRequireCopartitioning() {
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals("KStream-GlobalKTable joins do not need to be 
co-partitioned", 0, copartitionGroups.size());
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index eb0775a..d6196c5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -114,7 +114,7 @@ public class KStreamGlobalKTableLeftJoinTest {
 
     @Test
     public void shouldNotRequireCopartitioning() {
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals("KStream-GlobalKTable joins do not need to be 
co-partitioned", 0, copartitionGroups.size());
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 463afb8..ebf3f36 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -174,7 +173,7 @@ public class KStreamImplTest {
             1 + // to
             2 + // through
             1, // process
-            
StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null).processors().size());
+            
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null).processors().size());
     }
 
     @Test
@@ -186,7 +185,7 @@ public class KStreamImplTest {
         stream1.to("topic-5");
         stream2.through("topic-6");
 
-        ProcessorTopology processorTopology = 
StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null);
+        ProcessorTopology processorTopology = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null);
         
assertThat(processorTopology.source("topic-6").getTimestampExtractor(), 
instanceOf(FailOnInvalidTimestamp.class));
         
assertEquals(processorTopology.source("topic-4").getTimestampExtractor(), null);
         
assertEquals(processorTopology.source("topic-3").getTimestampExtractor(), null);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index de3446c..59f0953 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
@@ -105,7 +105,7 @@ public class KStreamKStreamJoinTest {
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
@@ -207,7 +207,7 @@ public class KStreamKStreamJoinTest {
             JoinWindows.of(100),
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
@@ -312,7 +312,7 @@ public class KStreamKStreamJoinTest {
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
@@ -535,7 +535,7 @@ public class KStreamKStreamJoinTest {
                 Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
@@ -644,7 +644,7 @@ public class KStreamKStreamJoinTest {
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 11c5c5b..8535a04 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
@@ -69,7 +69,7 @@ public class KStreamKStreamLeftJoinTest {
                                   Joined.with(Serdes.Integer(), 
Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
@@ -155,7 +155,7 @@ public class KStreamKStreamLeftJoinTest {
                                   Joined.with(Serdes.Integer(), 
Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 0ce27ab..55635fa 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -104,7 +104,7 @@ public class KStreamKTableJoinTest {
     @Test
     public void shouldRequireCopartitionedStreams() {
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index eedda07..98fc500 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -100,7 +100,7 @@ public class KStreamKTableLeftJoinTest {
     @Test
     public void shouldRequireCopartitionedStreams() {
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 7ed8b6a..2efdd85 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -75,7 +75,7 @@ public class KTableKTableInnerJoinTest {
                             final int[] expectedKeys,
                             final MockProcessorSupplier<Integer, String> 
supplier,
                             final KTable<Integer, String> joined) {
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 51fd839..79e5f0e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -87,7 +87,7 @@ public class KTableKTableLeftJoinTest {
         final MockProcessorSupplier<Integer, String> supplier = new 
MockProcessorSupplier<>();
         joined.toStream().process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index cf3321f..8cee72f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
@@ -83,7 +83,7 @@ public class KTableKTableOuterJoinTest {
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
         joined.toStream().process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index e9bb2a3..39f848f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -96,7 +96,7 @@ public class StreamsMetadataStateTest {
                             Consumed.with(null, null),
                             Materialized.<Object, Object, KeyValueStore<Bytes, 
byte[]>>as(globalTable));
 
-        
StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId");
+        
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("appId");
 
         topic1P0 = new TopicPartition("topic-one", 0);
         topic1P1 = new TopicPartition("topic-one", 1);
@@ -122,7 +122,7 @@ public class StreamsMetadataStateTest {
                 new PartitionInfo("topic-four", 0, null, null, null));
 
         cluster = new Cluster(null, Collections.<Node>emptyList(), 
partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
-        discovery = new 
StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), 
hostOne);
+        discovery = new 
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
 hostOne);
         discovery.onChange(hostToPartitions, cluster);
         partitioner = new StreamPartitioner<String, Object>() {
             @Override
@@ -134,7 +134,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldNotThrowNPEWhenOnChangeNotCalled() {
-        new 
StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), 
hostOne).getAllMetadataForStore("store");
+        new 
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
 hostOne).getAllMetadataForStore("store");
     }
 
     @Test
@@ -301,7 +301,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() {
-        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), 
StreamsMetadataState.UNKNOWN_HOST);
+        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
 StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, 
"key", Serdes.String().serializer()));
     }
@@ -314,7 +314,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void 
shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() {
-        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), 
StreamsMetadataState.UNKNOWN_HOST);
+        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
 StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, 
"key", partitioner));
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 9812158..cc507d6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -27,8 +27,8 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -727,7 +727,7 @@ public class StreamsPartitionAssignorTest {
     public void shouldGenerateTasksForAllCreatedPartitions() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = 
StreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
         internalTopologyBuilder.setApplicationId(applicationId);
 
         // KStream with 3 partitions
@@ -796,7 +796,7 @@ public class StreamsPartitionAssignorTest {
         expectedCreatedInternalTopics.put(applicationId + 
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
         expectedCreatedInternalTopics.put(applicationId + 
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
         expectedCreatedInternalTopics.put(applicationId + 
"-KSTREAM-MAP-0000000001-repartition", 4);
-        expectedCreatedInternalTopics.put(applicationId + 
"-topic3-STATE-STORE-0000000002-changelog", 4);
+        expectedCreatedInternalTopics.put("topic3", 4);     // the source 
topic is reused as changelog topics
 
         // check if all internal topics were created as expected
         assertThat(mockInternalTopicManager.readyTopics, 
equalTo(expectedCreatedInternalTopics));
@@ -906,7 +906,7 @@ public class StreamsPartitionAssignorTest {
     public void 
shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = 
StreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
         internalTopologyBuilder.setApplicationId(applicationId);
 
         KStream<Object, Object> stream1 = builder
@@ -1026,7 +1026,7 @@ public class StreamsPartitionAssignorTest {
     public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = 
StreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
         internalTopologyBuilder.setApplicationId(applicationId);
 
         builder.stream("topic1").groupByKey().count();
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 033b68d..2c3461a 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -82,7 +82,7 @@ public class KStreamTestDriver extends ExternalResource {
                       final Serde<?> keySerde,
                       final Serde<?> valSerde,
                       final long cacheSize) {
-        final InternalTopologyBuilder internalTopologyBuilder = 
StreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
 
         internalTopologyBuilder.setApplicationId("TestDriver");
         topology = internalTopologyBuilder.build(null);

-- 
To stop receiving notification emails like this one, please contact
guozh...@apache.org.

Reply via email to