[ 
https://issues.apache.org/jira/browse/KAFKA-6729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16479521#comment-16479521
 ] 

ASF GitHub Bot commented on KAFKA-6729:
---------------------------------------

guozhangwang closed pull request #5017: KAFKA-6729: Reuse source topics for 
source KTable's materialized store's changelog
URL: https://github.com/apache/kafka/pull/5017
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b8195a03b9a..5331a958ac0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -53,8 +53,7 @@
 
 public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, 
V> {
 
-    // TODO: change to package-private after removing KStreamBuilder
-    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
+    static final String SOURCE_NAME = "KSTREAM-SOURCE-";
 
     static final String SINK_NAME = "KSTREAM-SINK-";
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 1c5ad4d19c9..c1f0f7ac3fe 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -45,10 +45,9 @@
  */
 public class KTableImpl<K, S, V> extends AbstractStream<K> implements 
KTable<K, V> {
 
-    // TODO: these two fields can be package-private after KStreamBuilder is 
removed
-    public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+    static final String SOURCE_NAME = "KTABLE-SOURCE-";
 
-    public static final String STATE_STORE_NAME = "STATE-STORE-";
+    static final String STATE_STORE_NAME = "STATE-STORE-";
 
     private static final String FILTER_NAME = "KTABLE-FILTER-";
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 70437e91592..575ac012bf5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -96,8 +96,7 @@
     // are connected to these state stores
     private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new 
HashMap<>();
 
-    // map from state store names to this state store's corresponding 
changelog topic if possible,
-    // this is used in the extended KStreamBuilder.
+    // map from state store names to this state store's corresponding 
changelog topic if possible
     private final Map<String, String> storeToChangelogTopic = new HashMap<>();
 
     // all global topics
@@ -1013,12 +1012,16 @@ private void buildProcessorNode(final Map<String, 
ProcessorNode> processorMap,
                     }
                 }
 
-                // if the node is connected to a state, add to the state topics
+                // if the node is connected to a state store whose changelog 
topics are not predefined, add to the changelog topics
                 for (final StateStoreFactory stateFactory : 
stateFactories.values()) {
                     if (stateFactory.loggingEnabled() && 
stateFactory.users().contains(node)) {
-                        final String name = 
ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
-                        final InternalTopicConfig internalTopicConfig = 
createChangelogTopicConfig(stateFactory, name);
-                        stateChangelogTopics.put(name, internalTopicConfig);
+                        final String topicName = 
storeToChangelogTopic.containsKey(stateFactory.name()) ?
+                                storeToChangelogTopic.get(stateFactory.name()) 
:
+                                
ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
+                        if (!stateChangelogTopics.containsKey(topicName)) {
+                            final InternalTopicConfig internalTopicConfig = 
createChangelogTopicConfig(stateFactory, topicName);
+                            stateChangelogTopics.put(topicName, 
internalTopicConfig);
+                        }
                     }
                 }
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 7c2bfa6b16a..0a1e6df3622 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -26,7 +26,6 @@
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -39,13 +38,11 @@
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -58,13 +55,6 @@
     private final StreamsBuilder builder = new StreamsBuilder();
     private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
-    @Test(expected = TopologyException.class)
-    public void testFrom() {
-        builder.stream(Arrays.asList("topic-1", "topic-2"));
-
-        builder.build().addSource(KStreamImpl.SOURCE_NAME + "0000000000", 
"topic-3");
-    }
-
     @Test
     public void shouldAllowJoinUnmaterializedFilteredKTable() {
         final KTable<Bytes, String> filteredKTable = builder.<Bytes, 
String>table("table-topic").filter(MockPredicate.<Bytes, 
String>allGoodPredicate());
@@ -192,7 +182,7 @@ public void shouldProcessViaThroughTopic() {
     }
     
     @Test
-    public void testMerge() {
+    public void shouldMergeStreams() {
         final String topic1 = "topic-1";
         final String topic2 = "topic-2";
 
@@ -281,6 +271,16 @@ public void shouldUseDefaultNodeAndStoreNames() {
         assertFalse(stores.hasNext());
         assertFalse(subtopologies.hasNext());
     }
+
+    @Test
+    public void shouldReuseSourceTopicAsChangelogs() {
+        final String topic = "topic";
+        builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("store"));
+
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
+
+        
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
 equalTo(Collections.singleton("topic")));
+    }
     
     @Test(expected = TopologyException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() {
@@ -291,14 +291,4 @@ public void shouldThrowExceptionWhenNoTopicPresent() {
     public void shouldThrowExceptionWhenTopicNamesAreNull() {
         builder.stream(Arrays.<String>asList(null, null));
     }
-
-    // TODO: these two static functions are added because some 
non-TopologyBuilder unit tests need to access the internal topology builder,
-    //       which is usually a bad sign of design patterns between 
TopologyBuilder and StreamThread. We need to consider getting rid of them later
-    public static InternalTopologyBuilder internalTopologyBuilder(final 
StreamsBuilder builder) {
-        return builder.internalTopologyBuilder;
-    }
-
-    public static Collection<Set<String>> getCopartitionedGroups(final 
StreamsBuilder builder) {
-        return builder.internalTopologyBuilder.copartitionGroups();
-    }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index c37e8a954ea..7a65c4a1156 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -21,8 +21,8 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -112,7 +112,7 @@ private void pushNullValueToGlobalTable(final int 
messageCount) {
 
     @Test
     public void shouldNotRequireCopartitioning() {
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals("KStream-GlobalKTable joins do not need to be 
co-partitioned", 0, copartitionGroups.size());
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index eb0775a0847..d6196c5e864 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -21,8 +21,8 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -114,7 +114,7 @@ private void pushNullValueToGlobalTable(final int 
messageCount) {
 
     @Test
     public void shouldNotRequireCopartitioning() {
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals("KStream-GlobalKTable joins do not need to be 
co-partitioned", 0, copartitionGroups.size());
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 463afb86d85..ebf3f36d1f9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -22,7 +22,6 @@
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -174,7 +173,7 @@ public Integer apply(Integer value1, Integer value2) {
             1 + // to
             2 + // through
             1, // process
-            
StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null).processors().size());
+            
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null).processors().size());
     }
 
     @Test
@@ -186,7 +185,7 @@ public void 
shouldUseRecordMetadataTimestampExtractorWithThrough() {
         stream1.to("topic-5");
         stream2.through("topic-6");
 
-        ProcessorTopology processorTopology = 
StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null);
+        ProcessorTopology processorTopology = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null);
         
assertThat(processorTopology.source("topic-6").getTimestampExtractor(), 
instanceOf(FailOnInvalidTimestamp.class));
         
assertEquals(processorTopology.source("topic-4").getTimestampExtractor(), null);
         
assertEquals(processorTopology.source("topic-3").getTimestampExtractor(), null);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index de3446c1a08..59f09530ca7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -21,8 +21,8 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
@@ -105,7 +105,7 @@ public void testJoin() {
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
@@ -207,7 +207,7 @@ public void testOuterJoin() {
             JoinWindows.of(100),
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
@@ -312,7 +312,7 @@ public void testWindowing() {
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
@@ -535,7 +535,7 @@ public void testAsymmetricWindowingAfter() {
                 Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
@@ -644,7 +644,7 @@ public void testAsymmetricWindowingBefore() {
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 11c5c5b9852..8535a040a33 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -21,8 +21,8 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
@@ -69,7 +69,7 @@ public void testLeftJoin() {
                                   Joined.with(Serdes.Integer(), 
Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
@@ -155,7 +155,7 @@ public void testWindowing() {
                                   Joined.with(Serdes.Integer(), 
Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 0ce27ab51cb..55635fa6517 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -21,8 +21,8 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -104,7 +104,7 @@ private void pushNullValueToTable() {
     @Test
     public void shouldRequireCopartitionedStreams() {
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index eedda074a41..98fc5001789 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -21,8 +21,8 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -100,7 +100,7 @@ private void pushNullValueToTable(final int messageCount) {
     @Test
     public void shouldRequireCopartitionedStreams() {
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 7ed8b6aeeef..2efdd8523f0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -22,7 +22,7 @@
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -75,7 +75,7 @@ private void doTestJoin(final StreamsBuilder builder,
                             final int[] expectedKeys,
                             final MockProcessorSupplier<Integer, String> 
supplier,
                             final KTable<Integer, String> joined) {
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 51fd8390241..79e5f0e2d89 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -22,7 +22,7 @@
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -87,7 +87,7 @@ public void testJoin() {
         final MockProcessorSupplier<Integer, String> supplier = new 
MockProcessorSupplier<>();
         joined.toStream().process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index cf3321f8f4b..8cee72ffc7b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -21,7 +21,7 @@
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
@@ -83,7 +83,7 @@ public void testJoin() {
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
         joined.toStream().process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index e9bb2a396ea..39f848f0dc9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -26,7 +26,7 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -96,7 +96,7 @@ public Object apply(final Object value) {
                             Consumed.with(null, null),
                             Materialized.<Object, Object, KeyValueStore<Bytes, 
byte[]>>as(globalTable));
 
-        
StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId");
+        
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("appId");
 
         topic1P0 = new TopicPartition("topic-one", 0);
         topic1P1 = new TopicPartition("topic-one", 1);
@@ -122,7 +122,7 @@ public Object apply(final Object value) {
                 new PartitionInfo("topic-four", 0, null, null, null));
 
         cluster = new Cluster(null, Collections.<Node>emptyList(), 
partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
-        discovery = new 
StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), 
hostOne);
+        discovery = new 
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
 hostOne);
         discovery.onChange(hostToPartitions, cluster);
         partitioner = new StreamPartitioner<String, Object>() {
             @Override
@@ -134,7 +134,7 @@ public Integer partition(final String key, final Object 
value, final int numPart
 
     @Test
     public void shouldNotThrowNPEWhenOnChangeNotCalled() {
-        new 
StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), 
hostOne).getAllMetadataForStore("store");
+        new 
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
 hostOne).getAllMetadataForStore("store");
     }
 
     @Test
@@ -301,7 +301,7 @@ public void shouldGetMyMetadataForGlobalStoreWithKey() {
 
     @Test
     public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() {
-        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), 
StreamsMetadataState.UNKNOWN_HOST);
+        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
 StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, 
"key", Serdes.String().serializer()));
     }
@@ -314,7 +314,7 @@ public void 
shouldGetMyMetadataForGlobalStoreWithKeyAndPartitioner() {
 
     @Test
     public void 
shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() {
-        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), 
StreamsMetadataState.UNKNOWN_HOST);
+        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
 StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, 
"key", partitioner));
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 981215839e4..cc507d60ca5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -27,8 +27,8 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -727,7 +727,7 @@ public void 
testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
     public void shouldGenerateTasksForAllCreatedPartitions() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = 
StreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
         internalTopologyBuilder.setApplicationId(applicationId);
 
         // KStream with 3 partitions
@@ -796,7 +796,7 @@ public Object apply(final Object value1, final Object 
value2) {
         expectedCreatedInternalTopics.put(applicationId + 
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
         expectedCreatedInternalTopics.put(applicationId + 
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
         expectedCreatedInternalTopics.put(applicationId + 
"-KSTREAM-MAP-0000000001-repartition", 4);
-        expectedCreatedInternalTopics.put(applicationId + 
"-topic3-STATE-STORE-0000000002-changelog", 4);
+        expectedCreatedInternalTopics.put("topic3", 4);     // the source 
topic is reused as changelog topics
 
         // check if all internal topics were created as expected
         assertThat(mockInternalTopicManager.readyTopics, 
equalTo(expectedCreatedInternalTopics));
@@ -906,7 +906,7 @@ public void 
shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
     public void 
shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = 
StreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
         internalTopologyBuilder.setApplicationId(applicationId);
 
         KStream<Object, Object> stream1 = builder
@@ -1026,7 +1026,7 @@ public void 
shouldUpdateClusterMetadataAndHostInfoOnAssignment() {
     public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = 
StreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
         internalTopologyBuilder.setApplicationId(applicationId);
 
         builder.stream("topic1").groupByKey().count();
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 033b68d3cc0..2c3461a8ad4 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -22,7 +22,7 @@
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -82,7 +82,7 @@ public void setUp(final StreamsBuilder builder,
                       final Serde<?> keySerde,
                       final Serde<?> valSerde,
                       final long cacheSize) {
-        final InternalTopologyBuilder internalTopologyBuilder = 
StreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
 
         internalTopologyBuilder.setApplicationId("TestDriver");
         topology = internalTopologyBuilder.build(null);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KTable should use user source topics if possible and not create changelog 
> topic
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-6729
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6729
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> With KIP-182 we reworked Streams API largely and introduced a regression into 
> 1.0 code base. If a KTable is populated from a source topic, ie, 
> StreamsBuilder.table() -- the KTable does create its own changelog topic. 
> However, in older releases (0.11 or older), we don't create a changelog topic 
> but use the user specified source topic instead.
> We want to reintroduce this optimization to reduce the load (storage and 
> write) on the broker side for this case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to