[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-4835.
--------------------------------
    Resolution: Fixed

> Allow users control over repartitioning
> ---------------------------------------
>
>                 Key: KAFKA-4835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4835
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Michal Borowiecki
>            Priority: Major
>              Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>               KTable<String, Activity> loggedInCustomers = builder
>                       .stream("customerLogins")
>                       .groupBy((key, activity) -> 
>                               activity.getCustomerRef())
>                       .reduce((first,second) -> second, loginStore());
>               
>               builder
>                       .stream("balanceUpdates")
>                       .map((key, activity) -> new KeyValue<>(
>                               activity.getCustomerRef(),
>                               activity))
>                       .join(loggedInCustomers, (activity, session) -> ...
>                       .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to