[ 
https://issues.apache.org/jira/browse/KAFKA-6729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481999#comment-16481999
 ] 

ASF GitHub Bot commented on KAFKA-6729:
---------------------------------------

guozhangwang closed pull request #5038: KAFKA-6729: Follow up; disable logging 
for source KTable.
URL: https://github.com/apache/kafka/pull/5038
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 480794c5955..0a19b4eb0c0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -72,6 +72,9 @@ public InternalStreamsBuilder(final InternalTopologyBuilder 
internalTopologyBuil
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
                                      final MaterializedInternal<K, V, 
KeyValueStore<Bytes, byte[]>> materialized) {
+        // explicitly disable logging for source table materialized stores
+        materialized.withLoggingDisabled();
+
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new 
KeyValueStoreMaterializer<>(materialized)
                 .materialize();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 575ac012bf5..1651bbd90b3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -120,7 +120,7 @@
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
-    interface StateStoreFactory {
+    public interface StateStoreFactory {
         Set<String> users();
         boolean loggingEnabled();
         StateStore build();
@@ -1799,4 +1799,8 @@ public void updateSubscribedTopics(final Set<String> 
topics, final String logPre
     public synchronized Set<String> getSourceTopicNames() {
         return sourceTopicNames;
     }
+
+    public synchronized Map<String, StateStoreFactory> getStateStores() {
+        return stateFactories;
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 0a1e6df3622..37101de344a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -279,7 +279,11 @@ public void shouldReuseSourceTopicAsChangelogs() {
 
         final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
 
-        
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
 equalTo(Collections.singleton("topic")));
+        assertThat(internalTopologyBuilder.getStateStores().keySet(), 
equalTo(Collections.singleton("store")));
+
+        
assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
 equalTo(false));
+
+        
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(),
 equalTo(true));
     }
     
     @Test(expected = TopologyException.class)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index cc507d60ca5..37b03fa3418 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -796,7 +796,6 @@ public Object apply(final Object value1, final Object 
value2) {
         expectedCreatedInternalTopics.put(applicationId + 
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
         expectedCreatedInternalTopics.put(applicationId + 
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
         expectedCreatedInternalTopics.put(applicationId + 
"-KSTREAM-MAP-0000000001-repartition", 4);
-        expectedCreatedInternalTopics.put("topic3", 4);     // the source 
topic is reused as changelog topics
 
         // check if all internal topics were created as expected
         assertThat(mockInternalTopicManager.readyTopics, 
equalTo(expectedCreatedInternalTopics));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KTable should use user source topics if possible and not create changelog 
> topic
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-6729
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6729
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> With KIP-182 we reworked Streams API largely and introduced a regression into 
> 1.0 code base. If a KTable is populated from a source topic, ie, 
> StreamsBuilder.table() -- the KTable does create its own changelog topic. 
> However, in older releases (0.11 or older), we don't create a changelog topic 
> but use the user specified source topic instead.
> We want to reintroduce this optimization to reduce the load (storage and 
> write) on the broker side for this case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to