[ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881030#comment-15881030
 ] 

Michal Borowiecki commented on KAFKA-4601:
------------------------------------------

Don't know if this belongs in this ticket or warrants a separate one, but I'd 
suggest, instead of trying to rely on kstreams doing more automatic 
optimization, it would be good to provide users more control over the 
repartitioning. 
My use case is as follows (unrelated bits omitted for brevity):
{code}
                KTable<String, Activity> loggedInCustomers = builder
                        .stream("customerLogins")
                        .groupBy((key, activity) -> 
                                activity.getCustomerRef())
                        .reduce((first,second) -> second, loginStore());
                
                builder
                        .stream("balanceUpdates")
                        .map((key, activity) -> new KeyValue<>(
                                activity.getCustomerRef(),
                                activity))
                        .join(loggedInCustomers, (activity, session) -> ...
                        .to("sessions");
{code}
Both "groupBy" and "map" in the underlying implementation set the 
repartitionRequired flag (since the key changes), and the aggregation/join that 
follows will create the repartitioned topic.
However, in our case I know that both input streams are already partitioned by 
the customerRef value, which I'm mapping into the key (because it's required by 
the join operation).
So there are 2 unnecessary intermediate topics created with their associated 
overhead, while the ultimate goal is simply to do a join on a value that we 
already use to partition the original streams anyway.
(Note, we don't have the option to re-implement the original input streams to 
make customerRef the message key.)

I think it would be better to allow the user to decide (from their knowledge of 
the incoming streams) whether a repartition is mandatory on aggregation and 
join operations (overloaded version of the methods with the repartitionRequired 
flag exposed maybe?)
An alternative would be to allow users to perform a join on a value other than 
the key (a keyValueMapper parameter to join, like the one used for joins with 
global tables), but I expect that to be more involved and error-prone to use 
for people who don't understand the partitioning requirements well (whereas 
it's safe for global tables).


> Avoid duplicated repartitioning in KStream DSL
> ----------------------------------------------
>
>                 Key: KAFKA-4601
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4601
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: performance
>
> Consider the following DSL:
> {code}
> Stream<String, String> source = builder.stream(Serdes.String(), 
> Serdes.String(), "topic1").map(..);
>         KTable<String, Long> counts = source
>                 .groupByKey()
>                 .count("Counts");
>         KStream<String, String> sink = source.leftJoin(counts, ..);
> {code}
> The resulted topology looks like this:
> {code}
> ProcessorTopology:
>                               KSTREAM-SOURCE-0000000000:
>                                       topics:         [topic1]
>                                       children:       [KSTREAM-MAP-0000000001]
>                               KSTREAM-MAP-0000000001:
>                                       children:       
> [KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000007]
>                               KSTREAM-FILTER-0000000004:
>                                       children:       
> [KSTREAM-SINK-0000000003]
>                               KSTREAM-SINK-0000000003:
>                                       topic:          X-Counts-repartition
>                               KSTREAM-FILTER-0000000007:
>                                       children:       
> [KSTREAM-SINK-0000000006]
>                               KSTREAM-SINK-0000000006:
>                                       topic:          
> X-KSTREAM-MAP-0000000001-repartition
> ProcessorTopology:
>                               KSTREAM-SOURCE-0000000008:
>                                       topics:         
> [X-KSTREAM-MAP-0000000001-repartition]
>                                       children:       
> [KSTREAM-LEFTJOIN-0000000009]
>                               KSTREAM-LEFTJOIN-0000000009:
>                                       states:         [Counts]
>                               KSTREAM-SOURCE-0000000005:
>                                       topics:         [X-Counts-repartition]
>                                       children:       
> [KSTREAM-AGGREGATE-0000000002]
>                               KSTREAM-AGGREGATE-0000000002:
>                                       states:         [Counts]
> {code}
> I.e. there are two repartition topics, one for the aggregate and one for the 
> join, which not only introduce unnecessary overheads but also mess up the 
> processing ordering (users are expecting each record to go through 
> aggregation first then the join operator). And in order to get the following 
> simpler topology users today need to add a {{through}} operator after {{map}} 
> manually to enforce repartitioning.
> {code}
> ProcessorTopology:
>                               KSTREAM-SOURCE-0000000000:
>                                       topics:         [topic1]
>                                       children:       [KSTREAM-MAP-0000000001]
>                               KSTREAM-MAP-0000000001:
>                                       children:       
> [KSTREAM-SINK-0000000002]
>                               KSTREAM-SINK-0000000002:
>                                       topic:          topic 2
> ProcessorTopology:
>                               KSTREAM-SOURCE-0000000003:
>                                       topics:         [topic 2]
>                                       children:       
> [KSTREAM-AGGREGATE-0000000004, KSTREAM-LEFTJOIN-0000000005]
>                               KSTREAM-AGGREGATE-0000000004:
>                                       states:         [Counts]
>                               KSTREAM-LEFTJOIN-0000000005:
>                                       states:         [Counts]
> {code} 
> This kind of optimization should be automatic in Streams, which we can 
> consider doing when extending from one-operator-at-a-time translation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to