This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a0e0028  MINOR: add test for repartition/source-topic/changelog 
optimization (#9668)
a0e0028 is described below

commit a0e0028b16ae1b7b4a1dca1715b5b130187b334a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Dec 23 11:56:55 2020 -0800

    MINOR: add test for repartition/source-topic/changelog optimization (#9668)
    
    If topology optimization is enabled, KafkaStreams does not create store 
changelog topics but re-uses source input topics if possible. However, this 
optimization should not be applied to internal repartition topics, because 
those are actively purged.
    
    Reviewers: A. Sophie Blee-Goldman <[email protected]>
---
 .../apache/kafka/streams/StreamsBuilderTest.java   | 31 +++++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 8a9c165..d687e15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams;
 
-import java.util.regex.Pattern;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -62,6 +61,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -463,6 +463,35 @@ public class StreamsBuilderTest {
     }
 
     @Test
+    public void shouldNotReuseRepartitionTopicAsChangelogs() {
+        final String topic = "topic";
+        builder.<Long, 
String>stream(topic).repartition().toTable(Materialized.as("store"));
+        final Properties props = StreamsTestUtils.getStreamsConfig("appId");
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
+        final Topology topology = builder.build(props);
+
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+        internalTopologyBuilder.rewriteTopology(new StreamsConfig(props));
+
+        assertThat(
+            internalTopologyBuilder.buildTopology().storeToChangelogTopic(),
+            equalTo(Collections.singletonMap("store", "appId-store-changelog"))
+        );
+        assertThat(
+            internalTopologyBuilder.stateStores().keySet(),
+            equalTo(Collections.singleton("store"))
+        );
+        assertThat(
+            
internalTopologyBuilder.stateStores().get("store").loggingEnabled(),
+            equalTo(true)
+        );
+        assertThat(
+            
internalTopologyBuilder.topicGroups().get(1).stateChangelogTopics.keySet(),
+            equalTo(Collections.singleton("appId-store-changelog"))
+        );
+    }
+
+    @Test
     public void shouldNotReuseSourceTopicAsChangelogsByDefault() {
         final String topic = "topic";
         builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("store"));

Reply via email to