[ https://issues.apache.org/jira/browse/KAFKA-6729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16479521#comment-16479521 ]
ASF GitHub Bot commented on KAFKA-6729: --------------------------------------- guozhangwang closed pull request #5017: KAFKA-6729: Reuse source topics for source KTable's materialized store's changelog URL: https://github.com/apache/kafka/pull/5017 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index b8195a03b9a..5331a958ac0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -53,8 +53,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> { - // TODO: change to package-private after removing KStreamBuilder - public static final String SOURCE_NAME = "KSTREAM-SOURCE-"; + static final String SOURCE_NAME = "KSTREAM-SOURCE-"; static final String SINK_NAME = "KSTREAM-SINK-"; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 1c5ad4d19c9..c1f0f7ac3fe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -45,10 +45,9 @@ */ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> { - // TODO: these two fields can be package-private after KStreamBuilder is removed - public static final String SOURCE_NAME = "KTABLE-SOURCE-"; + static final String SOURCE_NAME = "KTABLE-SOURCE-"; - public static final String STATE_STORE_NAME = 
"STATE-STORE-"; + static final String STATE_STORE_NAME = "STATE-STORE-"; private static final String FILTER_NAME = "KTABLE-FILTER-"; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 70437e91592..575ac012bf5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -96,8 +96,7 @@ // are connected to these state stores private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<>(); - // map from state store names to this state store's corresponding changelog topic if possible, - // this is used in the extended KStreamBuilder. + // map from state store names to this state store's corresponding changelog topic if possible private final Map<String, String> storeToChangelogTopic = new HashMap<>(); // all global topics @@ -1013,12 +1012,16 @@ private void buildProcessorNode(final Map<String, ProcessorNode> processorMap, } } - // if the node is connected to a state, add to the state topics + // if the node is connected to a state store whose changelog topics are not predefined, add to the changelog topics for (final StateStoreFactory stateFactory : stateFactories.values()) { if (stateFactory.loggingEnabled() && stateFactory.users().contains(node)) { - final String name = ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name()); - final InternalTopicConfig internalTopicConfig = createChangelogTopicConfig(stateFactory, name); - stateChangelogTopics.put(name, internalTopicConfig); + final String topicName = storeToChangelogTopic.containsKey(stateFactory.name()) ? 
+ storeToChangelogTopic.get(stateFactory.name()) : + ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name()); + if (!stateChangelogTopics.containsKey(topicName)) { + final InternalTopicConfig internalTopicConfig = createChangelogTopicConfig(stateFactory, topicName); + stateChangelogTopics.put(topicName, internalTopicConfig); + } } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 7c2bfa6b16a..0a1e6df3622 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.state.KeyValueStore; @@ -39,13 +38,11 @@ import org.junit.Test; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; -import java.util.Set; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -58,13 +55,6 @@ private final StreamsBuilder builder = new StreamsBuilder(); private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); - @Test(expected = TopologyException.class) - public void testFrom() { - builder.stream(Arrays.asList("topic-1", "topic-2")); - - builder.build().addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3"); - } - @Test public void shouldAllowJoinUnmaterializedFilteredKTable() { final KTable<Bytes, String> filteredKTable = 
builder.<Bytes, String>table("table-topic").filter(MockPredicate.<Bytes, String>allGoodPredicate()); @@ -192,7 +182,7 @@ public void shouldProcessViaThroughTopic() { } @Test - public void testMerge() { + public void shouldMergeStreams() { final String topic1 = "topic-1"; final String topic2 = "topic-2"; @@ -281,6 +271,16 @@ public void shouldUseDefaultNodeAndStoreNames() { assertFalse(stores.hasNext()); assertFalse(subtopologies.hasNext()); } + + @Test + public void shouldReuseSourceTopicAsChangelogs() { + final String topic = "topic"; + builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")); + + final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); + + assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(), equalTo(Collections.singleton("topic"))); + } @Test(expected = TopologyException.class) public void shouldThrowExceptionWhenNoTopicPresent() { @@ -291,14 +291,4 @@ public void shouldThrowExceptionWhenNoTopicPresent() { public void shouldThrowExceptionWhenTopicNamesAreNull() { builder.stream(Arrays.<String>asList(null, null)); } - - // TODO: these two static functions are added because some non-TopologyBuilder unit tests need to access the internal topology builder, - // which is usually a bad sign of design patterns between TopologyBuilder and StreamThread. 
We need to consider getting rid of them later - public static InternalTopologyBuilder internalTopologyBuilder(final StreamsBuilder builder) { - return builder.internalTopologyBuilder; - } - - public static Collection<Set<String>> getCopartitionedGroups(final StreamsBuilder builder) { - return builder.internalTopologyBuilder.copartitionGroups(); - } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java index c37e8a954ea..7a65c4a1156 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java @@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; @@ -112,7 +112,7 @@ private void pushNullValueToGlobalTable(final int messageCount) { @Test public void shouldNotRequireCopartitioning() { - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals("KStream-GlobalKTable joins do not need to be co-partitioned", 0, copartitionGroups.size()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java index eb0775a0847..d6196c5e864 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java @@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; @@ -114,7 +114,7 @@ private void pushNullValueToGlobalTable(final int messageCount) { @Test public void shouldNotRequireCopartitioning() { - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals("KStream-GlobalKTable joins do not need to be co-partitioned", 0, copartitionGroups.size()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 463afb86d85..ebf3f36d1f9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.TopologyTestDriver; import 
org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.GlobalKTable; @@ -174,7 +173,7 @@ public Integer apply(Integer value1, Integer value2) { 1 + // to 2 + // through 1, // process - StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null).processors().size()); + TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null).processors().size()); } @Test @@ -186,7 +185,7 @@ public void shouldUseRecordMetadataTimestampExtractorWithThrough() { stream1.to("topic-5"); stream2.through("topic-6"); - ProcessorTopology processorTopology = StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null); + ProcessorTopology processorTopology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null); assertThat(processorTopology.source("topic-6").getTimestampExtractor(), instanceOf(FailOnInvalidTimestamp.class)); assertEquals(processorTopology.source("topic-4").getTimestampExtractor(), null); assertEquals(processorTopology.source("topic-3").getTimestampExtractor(), null); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index de3446c1a08..59f09530ca7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.Joined; import 
org.apache.kafka.streams.kstream.KStream; @@ -105,7 +105,7 @@ public void testJoin() { Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); joined.process(supplier); - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); @@ -207,7 +207,7 @@ public void testOuterJoin() { JoinWindows.of(100), Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); joined.process(supplier); - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); @@ -312,7 +312,7 @@ public void testWindowing() { Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); joined.process(supplier); - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); @@ -535,7 +535,7 @@ public void testAsymmetricWindowingAfter() { Serdes.String())); joined.process(supplier); - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, 
copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); @@ -644,7 +644,7 @@ public void testAsymmetricWindowingBefore() { Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); joined.process(supplier); - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 11c5c5b9852..8535a040a33 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; @@ -69,7 +69,7 @@ public void testLeftJoin() { Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); joined.process(supplier); - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, 
copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); @@ -155,7 +155,7 @@ public void testWindowing() { Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); joined.process(supplier); - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java index 0ce27ab51cb..55635fa6517 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java @@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; @@ -104,7 +104,7 @@ private void pushNullValueToTable() { @Test public void shouldRequireCopartitionedStreams() { - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new 
HashSet<>(Arrays.asList(streamTopic, tableTopic)), copartitionGroups.iterator().next()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java index eedda074a41..98fc5001789 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java @@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.test.ConsumerRecordFactory; @@ -100,7 +100,7 @@ private void pushNullValueToTable(final int messageCount) { @Test public void shouldRequireCopartitionedStreams() { - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), copartitionGroups.iterator().next()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java index 7ed8b6aeeef..2efdd8523f0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java @@ 
-22,7 +22,7 @@ import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueStore; @@ -75,7 +75,7 @@ private void doTestJoin(final StreamsBuilder builder, final int[] expectedKeys, final MockProcessorSupplier<Integer, String> supplier, final KTable<Integer, String> joined) { - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index 51fd8390241..79e5f0e2d89 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Materialized; @@ -87,7 +87,7 @@ public void testJoin() { final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>(); joined.toStream().process(supplier); - 
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java index cf3321f8f4b..8cee72ffc7b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.streams.processor.Processor; @@ -83,7 +83,7 @@ public void testJoin() { joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER); joined.toStream().process(supplier); - final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index 
e9bb2a396ea..39f848f0dc9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; @@ -96,7 +96,7 @@ public Object apply(final Object value) { Consumed.with(null, null), Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as(globalTable)); - StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId"); + TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("appId"); topic1P0 = new TopicPartition("topic-one", 0); topic1P1 = new TopicPartition("topic-one", 1); @@ -122,7 +122,7 @@ public Object apply(final Object value) { new PartitionInfo("topic-four", 0, null, null, null)); cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet()); - discovery = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), hostOne); + discovery = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), hostOne); discovery.onChange(hostToPartitions, cluster); partitioner = new StreamPartitioner<String, Object>() { @Override @@ -134,7 +134,7 @@ public Integer partition(final String key, final Object value, final int numPart @Test public void shouldNotThrowNPEWhenOnChangeNotCalled() { - new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), hostOne).getAllMetadataForStore("store"); + new 
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), hostOne).getAllMetadataForStore("store"); } @Test @@ -301,7 +301,7 @@ public void shouldGetMyMetadataForGlobalStoreWithKey() { @Test public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() { - final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST); + final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), StreamsMetadataState.UNKNOWN_HOST); streamsMetadataState.onChange(hostToPartitions, cluster); assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer())); } @@ -314,7 +314,7 @@ public void shouldGetMyMetadataForGlobalStoreWithKeyAndPartitioner() { @Test public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() { - final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST); + final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), StreamsMetadataState.UNKNOWN_HOST); streamsMetadataState.onChange(hostToPartitions, cluster); assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", partitioner)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 981215839e4..cc507d60ca5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -27,8 +27,8 @@ import org.apache.kafka.common.utils.Utils; import 
org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -727,7 +727,7 @@ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() { public void shouldGenerateTasksForAllCreatedPartitions() { final StreamsBuilder builder = new StreamsBuilder(); - final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder); + final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); internalTopologyBuilder.setApplicationId(applicationId); // KStream with 3 partitions @@ -796,7 +796,7 @@ public Object apply(final Object value1, final Object value2) { expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4); expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4); expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-MAP-0000000001-repartition", 4); - expectedCreatedInternalTopics.put(applicationId + "-topic3-STATE-STORE-0000000002-changelog", 4); + expectedCreatedInternalTopics.put("topic3", 4); // the source topic is reused as changelog topics // check if all internal topics were created as expected assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics)); @@ -906,7 +906,7 @@ public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() { public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() { final StreamsBuilder builder = new StreamsBuilder(); - final InternalTopologyBuilder internalTopologyBuilder = 
StreamsBuilderTest.internalTopologyBuilder(builder); + final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); internalTopologyBuilder.setApplicationId(applicationId); KStream<Object, Object> stream1 = builder @@ -1026,7 +1026,7 @@ public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() { public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() { final StreamsBuilder builder = new StreamsBuilder(); - final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder); + final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); internalTopologyBuilder.setApplicationId(applicationId); builder.stream("topic1").groupByKey().count(); diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 033b68d3cc0..2c3461a8ad4 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsBuilderTest; +import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -82,7 +82,7 @@ public void setUp(final StreamsBuilder builder, final Serde<?> keySerde, final Serde<?> valSerde, final long cacheSize) { - final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder); + final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); 
internalTopologyBuilder.setApplicationId("TestDriver"); topology = internalTopologyBuilder.build(null); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KTable should use user source topics if possible and not create changelog > topic > ------------------------------------------------------------------------------- > > Key: KAFKA-6729 > URL: https://issues.apache.org/jira/browse/KAFKA-6729 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.0.0 > Reporter: Matthias J. Sax > Assignee: Guozhang Wang > Priority: Blocker > Fix For: 2.0.0 > > > With KIP-182 we largely reworked the Streams API and introduced a regression into the > 1.0 code base. If a KTable is populated from a source topic, i.e., > StreamsBuilder.table() -- the KTable does create its own changelog topic. > However, in older releases (0.11 or older), we did not create a changelog topic > but used the user-specified source topic instead. > We want to reintroduce this optimization to reduce the load (storage and > write) on the broker side for this case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)