[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359835#comment-15359835 ]
Guozhang Wang commented on KAFKA-3705:
--------------------------------------

Yes, this finally clarifies your scenario, thanks! I think the change<> pair can still help in your case. For aggregations, for example, it gives you the explicit information to "subtract the old value, and add the new value" instead of depending on whether the returned value is null. For example, the Streams DSL defines the aggregation operator as follows (your customized implementation does not need to follow exactly the same pattern; this is just to illustrate the idea):

{code}
<T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<K, V, T> adder, Aggregator<K, V, T> subtractor, ...);
{code}

Let me try again with your example code and the aggregation pattern of the Streams DSL above. If you do not agree, let's have a short Skype chat :)

1. Suppose your current value in Table B is B.PK => B.V.old, which contains A.PK.old, C.PK, etc. When you join tables A and B, you repartition the stream B by A.PK while still keeping the message format B.PK => B.V, and the join result has the format B.PK => join<B.V, A.V>.

2. Now suppose you have an update on Table B, B.PK => B.V.new, which contains A.PK.new, C.PK (same value), etc. Suppose it is represented as a change pair {old, new}, i.e. B.PK => {B.V.old, B.V.new}, or more specifically:
B.PK => {<A.PK.old, C.PK, ...>, <A.PK.new, C.PK, ...>}

3. When you repartition it based on the A.PK value, this results in two pairs being sent to potentially two different partitions:
B.PK => {B.V.old, null} (sent to partition1)
B.PK => {null, B.V.new} (sent to partition2)

4. These two records are joined independently by two processors, each fetching one of the repartitioned topic partitions. The results are:
B.PK => {joined(B.V.old, A.V.old), null} (here A.V.old is the value for key A.PK.old in Table A)
B.PK => {null, joined(B.V.new, A.V.new)} (here A.V.new is the value for key A.PK.new in Table A)
They are then sent to the second repartition topic, which is partitioned on C.PK. Since their C.PK values are the same, they are sent to the same partition, but in arbitrary order.

5. The aggregation consumes from the second repartition topic (partitioned on C.PK). It aggregates by 1) calling the subtract function on the old value of the pair, and then 2) calling the add function on the new value of the pair. If a value is null, the corresponding call is skipped. More specifically, the subtract and add functions look like:

{code}
import java.util.List;
import org.apache.kafka.streams.kstream.Aggregator;

// A, B, C and Joined<A, B> are the placeholder types of this example. The aggregate for one
// C.PK is the list of joined records, each identified by the joined value itself.
Aggregator<C, Joined<A, B>, List<Joined<A, B>>> subtractor = (key, value, current) -> {
    current.remove(value); // drop the old joined record
    return current;
};
Aggregator<C, Joined<A, B>, List<Joined<A, B>>> adder = (key, value, current) -> {
    current.add(value);    // append the new joined record
    return current;
};
{code}

Depending on the order in which these two records are received, we either call {{subtractor.apply(C.PK, joined(B.V.old, A.V.old), current)}} first and then {{adder.apply(C.PK, joined(B.V.new, A.V.new), current)}}, or vice versa. Either way the result is correct, since {{B.V.old}} and {{B.V.new}} are different keys.

> Support non-key joining in KTable
> ---------------------------------
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Liquan Pei
> Labels: api
> Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys.
> If users want to join a KTable A by key {{a}} with another KTable B by key {{b}} but with a "foreign key" {{a}}, and assuming they are read from two topics which are partitioned on {{a}} and {{b}} respectively, they need to use the following pattern:
> {code}
> tableBByA = tableB.groupBy(/* select on field "a" */).aggregate(...); // now tableBByA is partitioned on "a"
> tableA.join(tableBByA, joiner);
> {code}
> Even if these two tables are read from two topics which are already partitioned on {{a}}, users still need to do the pre-aggregation to put the two joining streams on the same key. This is a drawback in programmability, and we should fix it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
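The change-pair handling in steps 3 to 5 of the comment can be sketched in plain Java without Kafka. This is a minimal, hypothetical model and not the Kafka Streams implementation. The `Change` class and the `split`, `apply` and `aggregate` helpers are illustrative names, and joined records are modeled as plain strings. The model assumes that the aggregate for one C.PK is a list whose entries are identified by the joined value itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the {old, new} change pair and the subtract-then-add
// aggregation for a single C.PK; not the Kafka Streams implementation.
public class ChangePairAggregation {

    // A change pair; either side may be null (hypothetical class modeled on the comment).
    static final class Change<V> {
        final V oldValue;
        final V newValue;

        Change(V oldValue, V newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    // Step 3: repartitioning on A.PK turns one {old, new} pair into two records,
    // {old, null} and {null, new}, which may land on different partitions.
    static <V> List<Change<V>> split(Change<V> change) {
        List<Change<V>> parts = new ArrayList<>();
        if (change.oldValue != null) {
            parts.add(new Change<V>(change.oldValue, null));
        }
        if (change.newValue != null) {
            parts.add(new Change<V>(null, change.newValue));
        }
        return parts;
    }

    // Step 5: subtract the old value (if non-null), then add the new value (if non-null).
    static <V> List<V> apply(Change<V> change, List<V> current) {
        if (change.oldValue != null) {
            current.remove(change.oldValue); // subtractor: removes one equal element
        }
        if (change.newValue != null) {
            current.add(change.newValue);    // adder
        }
        return current;
    }

    // Feeds the records into a copy of the initial aggregate, in the given order.
    static List<String> aggregate(List<Change<String>> records, List<String> initial) {
        List<String> current = new ArrayList<>(initial);
        for (Change<String> record : records) {
            current = apply(record, current);
        }
        return current;
    }

    public static void main(String[] args) {
        String oldJoined = "joined(B.V.old, A.V.old)";
        String newJoined = "joined(B.V.new, A.V.new)";
        List<String> initial = Arrays.asList(oldJoined);

        List<Change<String>> parts = split(new Change<String>(oldJoined, newJoined));
        List<Change<String>> reversed = new ArrayList<>(parts);
        Collections.reverse(reversed);

        System.out.println(aggregate(parts, initial));    // [joined(B.V.new, A.V.new)]
        System.out.println(aggregate(reversed, initial)); // [joined(B.V.new, A.V.new)]
    }
}
```

In this model, both arrival orders leave the aggregate holding only the new joined value, which is the order independence argued in step 5. The subtractor removes the old joined value itself. If the aggregate were instead keyed by B.PK, which both records share, the add-then-subtract order would drop the new entry.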