[ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802904#comment-15802904
 ] 

Guozhang Wang commented on KAFKA-4601:
--------------------------------------

[~bbejeck] Note that it may be more complex than it sounds: today we translate 
the DSL operators to underlying processor nodes in topology independently (i.e. 
one at a time), so when we are translating a join after an aggregate, we do not 
know what processors have been created so far. To solve this specific issue we 
can make some workaround, but a general solution would be extending the 
translation mechanism to be "global": this can be as complex as query 
optimization.

> Avoid duplicated repartitioning in KStream DSL
> ----------------------------------------------
>
>                 Key: KAFKA-4601
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4601
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Bill Bejeck
>              Labels: performance
>
> Consider the following DSL:
> {code}
> Stream<String, String> source = builder.stream(Serdes.String(), 
> Serdes.String(), "topic1").map(..);
>         KTable<String, Long> counts = source
>                 .groupByKey()
>                 .count("Counts");
>         KStream<String, String> sink = source.leftJoin(counts, ..);
> {code}
> The resulted topology looks like this:
> {code}
> ProcessorTopology:
>                               KSTREAM-SOURCE-0000000000:
>                                       topics:         [topic1]
>                                       children:       [KSTREAM-MAP-0000000001]
>                               KSTREAM-MAP-0000000001:
>                                       children:       
> [KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000007]
>                               KSTREAM-FILTER-0000000004:
>                                       children:       
> [KSTREAM-SINK-0000000003]
>                               KSTREAM-SINK-0000000003:
>                                       topic:          X-Counts-repartition
>                               KSTREAM-FILTER-0000000007:
>                                       children:       
> [KSTREAM-SINK-0000000006]
>                               KSTREAM-SINK-0000000006:
>                                       topic:          
> X-KSTREAM-MAP-0000000001-repartition
> ProcessorTopology:
>                               KSTREAM-SOURCE-0000000008:
>                                       topics:         
> [X-KSTREAM-MAP-0000000001-repartition]
>                                       children:       
> [KSTREAM-LEFTJOIN-0000000009]
>                               KSTREAM-LEFTJOIN-0000000009:
>                                       states:         [Counts]
>                               KSTREAM-SOURCE-0000000005:
>                                       topics:         [X-Counts-repartition]
>                                       children:       
> [KSTREAM-AGGREGATE-0000000002]
>                               KSTREAM-AGGREGATE-0000000002:
>                                       states:         [Counts]
> {code}
> I.e. there are two repartition topics, one for the aggregate and one for the 
> join, which not only introduce unnecessary overheads but also mess up the 
> processing ordering (users are expecting each record to go through 
> aggregation first then the join operator). And in order to get the following 
> simpler topology users today need to add a {{through}} operator after {{map}} 
> manually to enforce repartitioning.
> {code}
> ProcessorTopology:
>                               KSTREAM-SOURCE-0000000000:
>                                       topics:         [topic1]
>                                       children:       [KSTREAM-MAP-0000000001]
>                               KSTREAM-MAP-0000000001:
>                                       children:       
> [KSTREAM-SINK-0000000002]
>                               KSTREAM-SINK-0000000002:
>                                       topic:          topic 2
> ProcessorTopology:
>                               KSTREAM-SOURCE-0000000003:
>                                       topics:         [topic 2]
>                                       children:       
> [KSTREAM-AGGREGATE-0000000004, KSTREAM-LEFTJOIN-0000000005]
>                               KSTREAM-AGGREGATE-0000000004:
>                                       states:         [Counts]
>                               KSTREAM-LEFTJOIN-0000000005:
>                                       states:         [Counts]
> {code} 
> This kind of optimization should be automatic in Streams, which we can 
> consider doing when extending from one-operator-at-a-time translation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to