This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a0e0028 MINOR: add test for repartition/source-topic/changelog optimization (#9668)
a0e0028 is described below
commit a0e0028b16ae1b7b4a1dca1715b5b130187b334a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Dec 23 11:56:55 2020 -0800
MINOR: add test for repartition/source-topic/changelog optimization (#9668)
If topology optimization is enabled, KafkaStreams does not create store
changelog topics but re-uses source input topics if possible. However, this
optimization should not be applied to internal repartition topics, because
those are actively purged.
Reviewers: A. Sophie Blee-Goldman <[email protected]>
---
.../apache/kafka/streams/StreamsBuilderTest.java | 31 +++++++++++++++++++++-
1 file changed, 30 insertions(+), 1 deletion(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 8a9c165..d687e15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams;
-import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -62,6 +61,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -463,6 +463,35 @@ public class StreamsBuilderTest {
}
@Test
+ public void shouldNotReuseRepartitionTopicAsChangelogs() {
+ final String topic = "topic";
+ builder.<Long, String>stream(topic).repartition().toTable(Materialized.as("store"));
+ final Properties props = StreamsTestUtils.getStreamsConfig("appId");
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+ final Topology topology = builder.build(props);
+
+ final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+ internalTopologyBuilder.rewriteTopology(new StreamsConfig(props));
+
+ assertThat(
+ internalTopologyBuilder.buildTopology().storeToChangelogTopic(),
+ equalTo(Collections.singletonMap("store", "appId-store-changelog"))
+ );
+ assertThat(
+ internalTopologyBuilder.stateStores().keySet(),
+ equalTo(Collections.singleton("store"))
+ );
+ assertThat(
+ internalTopologyBuilder.stateStores().get("store").loggingEnabled(),
+ equalTo(true)
+ );
+ assertThat(
+ internalTopologyBuilder.topicGroups().get(1).stateChangelogTopics.keySet(),
+ equalTo(Collections.singleton("appId-store-changelog"))
+ );
+ }
+
+ @Test
public void shouldNotReuseSourceTopicAsChangelogsByDefault() {
final String topic = "topic";
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));