[ https://issues.apache.org/jira/browse/KAFKA-6729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481999#comment-16481999 ]
ASF GitHub Bot commented on KAFKA-6729: --------------------------------------- guozhangwang closed pull request #5038: KAFKA-6729: Follow up; disable logging for source KTable. URL: https://github.com/apache/kafka/pull/5038 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 480794c5955..0a19b4eb0c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -72,6 +72,9 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil public <K, V> KTable<K, V> table(final String topic, final ConsumedInternal<K, V> consumed, final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) { + // explicitly disable logging for source table materialized stores + materialized.withLoggingDisabled(); + final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized) .materialize(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 575ac012bf5..1651bbd90b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -120,7 +120,7 @@ private Map<Integer, Set<String>> nodeGroups = null; - interface StateStoreFactory { + public interface 
StateStoreFactory { Set<String> users(); boolean loggingEnabled(); StateStore build(); @@ -1799,4 +1799,8 @@ public void updateSubscribedTopics(final Set<String> topics, final String logPre public synchronized Set<String> getSourceTopicNames() { return sourceTopicNames; } + + public synchronized Map<String, StateStoreFactory> getStateStores() { + return stateFactories; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 0a1e6df3622..37101de344a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -279,7 +279,11 @@ public void shouldReuseSourceTopicAsChangelogs() { final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); - assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(), equalTo(Collections.singleton("topic"))); + assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store"))); + + assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), equalTo(false)); + + assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(), equalTo(true)); } @Test(expected = TopologyException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index cc507d60ca5..37b03fa3418 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -796,7 +796,6 @@ public Object apply(final Object value1, final Object value2) { 
expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4); expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4); expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-MAP-0000000001-repartition", 4); - expectedCreatedInternalTopics.put("topic3", 4); // the source topic is reused as changelog topics // check if all internal topics were created as expected assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics)); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KTable should use user source topics if possible and not create changelog > topic > ------------------------------------------------------------------------------- > > Key: KAFKA-6729 > URL: https://issues.apache.org/jira/browse/KAFKA-6729 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.0.0 > Reporter: Matthias J. Sax > Assignee: Guozhang Wang > Priority: Blocker > Fix For: 2.0.0 > > > With KIP-182 we largely reworked the Streams API and introduced a regression into > the 1.0 code base. If a KTable is populated from a source topic, i.e., via > StreamsBuilder.table(), the KTable creates its own changelog topic. > However, in older releases (0.11 or older), we did not create a changelog topic > but used the user-specified source topic instead. > We want to reintroduce this optimization to reduce the load (storage and > write) on the broker side for this case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)