[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359835#comment-15359835
 ] 

Guozhang Wang commented on KAFKA-3705:
--------------------------------------

Yes, this does finally clarify your scenario, thanks!

I think the change<> pair can still help in your case. Its benefit, for 
aggregations for example, is that it tells you explicitly to "subtract the old 
value, and add the new value" instead of making you infer that from whether 
the returned value is null. For example, the Streams DSL defines the 
aggregation operator in the following way (your customized implementation does 
not need to follow the same pattern strictly; this is just to illustrate the 
idea):

{code}
<T> KTable<K, T> aggregate(Initializer<T> initializer,
                           Aggregator<K, V, T> adder,
                           Aggregator<K, V, T> subtractor, ...);
{code}


Let me try again with your example code and the aggregation pattern in the 
above Streams DSL, and if you do not agree let's have a small skype chat :)


1. Suppose your current value in Table B is

B.PK => B.V.old, which contains A.PK.old, C.PK, etc.

And when you join tables A and B, you repartition the stream B by A.PK while 
still maintaining the message format as B.PK => B.V, and the join result is in 
the format of: 

B.PK => join<B.V, A.V>


2. Now suppose you have an update on Table B, as B.PK => B.V.new, which 
contains A.PK.new, C.PK (same value), etc. And suppose it is represented as a 
change pair of {old, new}, i.e.

B.PK => {B.V.old, B.V.new}, or more specifically:

B.PK => {<A.PK.old, C.PK, ...>, <A.PK.new, C.PK, ...>}


3. When you repartition it based on the A.PK value, this will result in two 
pairs being sent to potentially two different partitions, as:

B.PK => {B.V.old, null}   (sent to partition1)

B.PK => {null, B.V.new}    (sent to partition2)
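
As a rough sketch of this splitting step (not Kafka Streams code: the 
Change, BValue and Routed records and the split helper are hypothetical names 
made up for illustration, and the foreign-key extractor stands in for reading 
A.PK out of B.V), each non-null side of the pair becomes its own record, 
routed by the A.PK found in that side:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class ChangeSplitSketch {
    /** Hypothetical {old, new} pair; either side may be null. */
    record Change<V>(V oldValue, V newValue) {}

    /** Hypothetical stand-in for B.V: it carries the foreign keys A.PK and C.PK. */
    record BValue(String aPk, String cPk) {}

    /** Hypothetical routed record: the repartition key plus the half-pair to send. */
    record Routed<FK, V>(FK routeKey, Change<V> change) {}

    /** Splits {old, new} into {old, null} and {null, new}, each keyed for routing. */
    static <FK, V> List<Routed<FK, V>> split(Change<V> change, Function<V, FK> foreignKey) {
        List<Routed<FK, V>> out = new ArrayList<>();
        if (change.oldValue() != null) {
            out.add(new Routed<>(foreignKey.apply(change.oldValue()),
                                 new Change<>(change.oldValue(), null)));
        }
        if (change.newValue() != null) {
            out.add(new Routed<>(foreignKey.apply(change.newValue()),
                                 new Change<>(null, change.newValue())));
        }
        return out;
    }

    public static void main(String[] args) {
        // An update on table B that moves the row from A.PK "a1" to "a2"; C.PK is unchanged.
        Change<BValue> update = new Change<>(new BValue("a1", "c1"), new BValue("a2", "c1"));
        for (Routed<String, BValue> r : split(update, BValue::aPk)) {
            System.out.println("route by " + r.routeKey() + ": " + r.change());
        }
    }
}
```

Whether the two halves land on the same partition or not then depends only on 
how the partitioner maps the two A.PK values.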


4. These two records will be joined independently at two processors, each 
fetching from one partition of the re-partitioned topic, and the result is:

B.PK => {joined(B.V.old, A.V.old), null}   (here A.V.old corresponds to the 
value for key A.PK.old in Table A)

B.PK => {null, joined(B.V.new, A.V.new)}   (here A.V.new corresponds to the 
value for key A.PK.new in Table A)


and then they will be sent to the second topic that is partitioned on C.PK, and 
since their C.PK value is the same, they will be sent to the same partition, 
but in arbitrary order.
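
To illustrate the join in step 4, here is a minimal sketch. It assumes 
table A can be modelled as an in-memory map from A.PK to A.V, and that a 
missing A row produces a null joined side; both are assumptions of this 
example, and Change, BValue, joinSide and join are hypothetical names rather 
than Kafka Streams API:

```java
import java.util.Map;
import java.util.function.BiFunction;

final class HalfPairJoinSketch {
    /** Hypothetical {old, new} pair; either side may be null. */
    record Change<V>(V oldValue, V newValue) {}

    /** Hypothetical stand-in for B.V: it carries the foreign keys A.PK and C.PK. */
    record BValue(String aPk, String cPk) {}

    /** Joins one side against table A; a null side or a missing A row yields null. */
    static <R> R joinSide(BValue b, Map<String, String> tableA,
                          BiFunction<BValue, String, R> joiner) {
        if (b == null) {
            return null;
        }
        String a = tableA.get(b.aPk());
        return a == null ? null : joiner.apply(b, a);
    }

    /** Joins each side of the pair independently, preserving the {old, new} shape. */
    static <R> Change<R> join(Change<BValue> change, Map<String, String> tableA,
                              BiFunction<BValue, String, R> joiner) {
        return new Change<>(joinSide(change.oldValue(), tableA, joiner),
                            joinSide(change.newValue(), tableA, joiner));
    }

    public static void main(String[] args) {
        Map<String, String> tableA = Map.of("a1", "A.V.old", "a2", "A.V.new");
        BiFunction<BValue, String, String> joiner = (b, a) -> "joined(" + b + ", " + a + ")";
        // The two half-pairs from step 3, joined separately as they would be on two processors.
        System.out.println(join(new Change<>(new BValue("a1", "c1"), null), tableA, joiner));
        System.out.println(join(new Change<>(null, new BValue("a2", "c1")), tableA, joiner));
    }
}
```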


5. The aggregation function consumes from the second re-partition topic, which 
is partitioned on C.PK, and does the aggregation by 1) calling a subtract 
function on the old value of the pair, and then 2) calling an add function on 
the new value of the pair; if either value is null, that call is skipped. More 
specifically, the subtract / add functions look like:

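{code}
// Sketch only: assumes the aggregate kept per C.PK is a list of joined values.
Aggregator<C, Joined<A,B>, List<Joined<A,B>>> subtractor = (key, value, current) -> {
   current.remove(value);   // drop the old joined value from the aggregate
   return current;
};

Aggregator<C, Joined<A,B>, List<Joined<A,B>>> adder = (key, value, current) -> {
   current.add(value);      // append the new joined value to the aggregate
   return current;
};
{code}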

And based on the order these two records are received, we will either call 
{{subtract(C.PK, joined(B.V.old, A.V.old), current)}} first, and then 
{{add(C.PK, joined(B.V.new, A.V.new), current)}}, or vice versa, and either way 
it is correct, since {{B.V.old}} and {{B.V.new}} are different keys.
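
The order-independence can be checked with a small sketch. It assumes the 
aggregate is a plain list of joined values and that the old and new joined 
values are distinct; Change and apply are hypothetical names for this example 
only, and the strings stand in for real joined records:

```java
import java.util.ArrayList;
import java.util.List;

final class OrderIndependentAggregationSketch {
    /** Hypothetical {old, new} pair; either side may be null. */
    record Change<V>(V oldValue, V newValue) {}

    /** Subtracts the old value, then adds the new value; a null side is skipped. */
    static <V> List<V> apply(List<V> current, Change<V> change) {
        List<V> next = new ArrayList<>(current);
        if (change.oldValue() != null) {
            next.remove(change.oldValue());   // List.remove(Object): removes by value
        }
        if (change.newValue() != null) {
            next.add(change.newValue());
        }
        return next;
    }

    public static void main(String[] args) {
        List<String> start = List.of("joined(B.V.old, A.V.old)", "some other row");
        Change<String> subtractHalf = new Change<>("joined(B.V.old, A.V.old)", null);
        Change<String> addHalf = new Change<>(null, "joined(B.V.new, A.V.new)");

        List<String> subtractFirst = apply(apply(start, subtractHalf), addHalf);
        List<String> addFirst = apply(apply(start, addHalf), subtractHalf);

        System.out.println("subtract first: " + subtractFirst);
        System.out.println("add first:      " + addFirst);
        System.out.println("same result:    " + subtractFirst.equals(addFirst));
    }
}
```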

> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users 
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but 
> with a "foreign key" {{a}}, and assuming they are read from two topics which 
> are partitioned on {{a}} and {{b}} respectively, they need to do the 
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' 
> is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already 
> partitioned on {{a}}, users still need to do the pre-aggregation in order to 
> put the two joining streams on the same key. This is a drawback for 
> programmability and we should fix it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
