[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16451258#comment-16451258 ]
ASF GitHub Bot commented on KAFKA-6761:
---------------------------------------

bbejeck opened a new pull request #4923: KAFKA-6761: Part 1 of 3; Graph nodes
URL: https://github.com/apache/kafka/pull/4923

This PR supersedes PR #4654, which was growing too large. All comments in that PR should be addressed here.

I will attempt to break the topology optimization effort into 3 PRs total, following this general plan:

1. This PR only adds the graph nodes and the graph. The graph nodes hold the information used to make calls to the `InternalTopologyBuilder` when using the DSL. Graph nodes are stored in the `StreamsTopologyGraph` until the final topology needs to be built; at that point the graph is traversed and optimizations are applied. This PR contains no tests of its own; it relies on the follow-up PR, which will run all current streams tests against the graph, and that should suffice.

2. PR 2 will intercept all DSL calls and build the graph. The `InternalStreamsBuilder` uses the graph to provide the required info to the `InternalTopologyBuilder` and build a topology. The condition of satisfaction for this PR is that all current unit, integration and system tests pass using the graph.

3. PR 3 adds some optimizations, mainly automatic repartitioning for operations that may modify a key and have child operations that would normally each create a separate repartition topic. This avoids potentially unnecessary repartition topics. For example, the following topology:

```
// map() may change the key, so every key-based child operation below
// would normally trigger its own repartition topic.
KStream<String, String> mappedStreamOther = inputStream.map(new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() {
    @Override
    public KeyValue<? extends String, ? extends String> apply(String key, String value) {
        return KeyValue.pair(key.substring(0, 3), value);
    }
});

mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("count-one-out");
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(10000)).count().toStream().to("count-two-out");
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(15000)).count().toStream().to("count-three-out");
```

would create 3 repartition topics, but after applying an optimization strategy, only one is created.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Reduce Kafka Streams Footprint
> ------------------------------
>
>                 Key: KAFKA-6761
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6761
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Major
>             Fix For: 1.2.0
>
>
> The persistent storage footprint of a Kafka Streams application contains the
> following aspects:
> # The internal topics created on the Kafka cluster side.
> # The materialized state stores on the Kafka Streams application instances
> side.
> There have been some questions about reducing these footprints, especially
> since many of them are not necessary. For example, there are redundant
> internal topics, as well as unnecessary state stores that take up space and
> also affect performance. As people push Streams to production with
> high traffic, this issue will become more common and severe.
> Reducing the footprint of Streams has clear benefits: it lowers the resource
> utilization of Kafka Streams applications and avoids putting pressure on
> the brokers' capacity.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
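
As a rough illustration of the repartition optimization described in the pull request text above, here is a minimal, self-contained Java sketch. It does not use the Kafka Streams API and is not the code from the PR: `Node`, `keyChanging`, `keyDependent`, `countUnoptimized`, `countOptimized` and `exampleTopology` are hypothetical names chosen for this example. The sketch assumes a simplified model in which a key-changing node (such as `map`) marks the stream as needing repartitioning, and each key-dependent child (such as a grouped, windowed count) would otherwise create its own repartition topic.

```java
import java.util.ArrayList;
import java.util.List;

public class RepartitionSketch {

    /** Hypothetical graph node; a stand-in for the graph nodes the PR describes. */
    public static final class Node {
        final String name;
        final boolean keyChanging;   // e.g. map(): may modify the key
        final boolean keyDependent;  // e.g. groupByKey().count(): needs data partitioned by key
        final List<Node> children = new ArrayList<>();

        public Node(String name, boolean keyChanging, boolean keyDependent) {
            this.name = name;
            this.keyChanging = keyChanging;
            this.keyDependent = keyDependent;
        }

        public Node addChild(Node child) {
            children.add(child);
            return this;
        }
    }

    /** Without optimization: every key-dependent node below a key change repartitions on its own. */
    public static int countUnoptimized(Node node, boolean keyChanged) {
        int topics = 0;
        if (node.keyDependent && keyChanged) {
            topics++;            // this operation creates its own repartition topic
            keyChanged = false;  // data below this point is partitioned correctly again
        }
        if (node.keyChanging) {
            keyChanged = true;
        }
        for (Node child : node.children) {
            topics += countUnoptimized(child, keyChanged);
        }
        return topics;
    }

    /** With optimization: repartition once, directly after the key-changing node. */
    public static int countOptimized(Node node, boolean keyChanged) {
        int topics = 0;
        if (node.keyDependent && keyChanged) {
            topics++;
            keyChanged = false;
        }
        if (node.keyChanging) {
            if (hasKeyDependentDescendant(node)) {
                topics++;            // one shared repartition topic for all children
                keyChanged = false;
            } else {
                keyChanged = true;   // nobody downstream depends on the key; do nothing yet
            }
        }
        for (Node child : node.children) {
            topics += countOptimized(child, keyChanged);
        }
        return topics;
    }

    /** True if a key-dependent node is reachable before the key is changed again. */
    static boolean hasKeyDependentDescendant(Node node) {
        for (Node child : node.children) {
            if (child.keyDependent) {
                return true;
            }
            if (!child.keyChanging && hasKeyDependentDescendant(child)) {
                return true;
            }
        }
        return false;
    }

    /** Mirrors the shape of the example in the PR text: one map feeding three windowed counts. */
    public static Node exampleTopology() {
        Node map = new Node("map", true, false);
        map.addChild(new Node("count-one", false, true));
        map.addChild(new Node("count-two", false, true));
        map.addChild(new Node("count-three", false, true));
        return new Node("source", false, false).addChild(map);
    }

    public static void main(String[] args) {
        Node root = exampleTopology();
        System.out.println("unoptimized repartition topics: " + countUnoptimized(root, false));
        System.out.println("optimized repartition topics:   " + countOptimized(root, false));
    }
}
```

Deferring the decision until the whole graph is available is what makes the single shared repartition possible in this model: a node only knows whether its children depend on the key once all DSL calls have been recorded.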