Hi James,

The proposal we are discussing is
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable

I'm not sure if it's been updated to reflect current thinking.

-John

On Fri, Sep 7, 2018 at 8:49 AM James Kwan <jwkwan2...@gmail.com> wrote:

> I am new to this group and I found this subject interesting. Sounds like
> you guys want to implement a join table of two streams? Is there somewhere
> I can see the original requirement or proposal?
>
> > On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> >
> > On 05.09.2018 22:17, Adam Bellemare wrote:
> >> I'm currently testing using a Windowed Store to store the highwater mark.
> >> By all indications this should work fine, with the caveat being that it can
> >> only resolve out-of-order arrival for up to the size of the window (ie:
> >> 24h, 72h, etc). This would remove the possibility of it being unbounded in
> >> size.
> >>
> >> With regards to Jan's suggestion, I believe this is where we will have to
> >> remain in disagreement. While I do not disagree with your statement about
> >> there likely to be additional joins done in a real-world workflow, I do not
> >> see how you can conclusively deal with out-of-order arrival of foreign-key
> >> changes and subsequent joins. I have attempted what I think you have
> >> proposed (without a high-water, using groupBy and reduce) and found that if
> >> the foreign key changes too quickly, or the load on a stream thread is too
> >> high, the joined messages will arrive out-of-order and be incorrectly
> >> propagated, such that an intermediate event is represented as the final
> >> event.
> >
> > Can you shed some light on your groupBy implementation. There must be
> > some sort of flaw in it. I have a suspicion where it is, I would just
> > like to confirm. The idea is bullet proof and it must be an
> > implementation mess up. I would like to clarify before we draw a
> > conclusion.
> >
> >> Repartitioning the scattered events back to their original
> >> partitions is the only way I know how to conclusively deal with
> >> out-of-order events in a given time frame, and to ensure that the data is
> >> eventually consistent with the input events.
> >>
> >> If you have some code to share that illustrates your approach, I would be
> >> very grateful as it would remove any misunderstandings that I may have.
> >
> > Ah okay, you were looking for my code. I don't have something easily
> > readable here as it's bloated with OO-patterns.
> >
> > It's trivial anyhow:
> >
> > @Override
> > public T apply(K aggKey, V value, T aggregate)
> > {
> >     Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary
> >     U toModifyKey = mapper.apply(value);
> >     // This is the place where people are actually going to have issues,
> >     // and why you probably couldn't do it. We would need to find a
> >     // solution here. I didn't realize that yet.
> >     // We propagate the field in the joiner, so that we can pick it up
> >     // in an aggregate. Probably you have not thought of this in your
> >     // approach, right?
> >     // I am very open to finding a generic solution here. In my honest
> >     // opinion this is broken in KTableImpl.GroupBy: it loses the keys
> >     // and only maintains the aggregate key.
> >     // I abstracted it away back then, way before I was thinking of a
> >     // oneToMany join. That is why I didn't realize its significance here.
> >     // Opinions?
> >
> >     for (V m : current)
> >     {
> >         currentStateAsMap.put(mapper.apply(m), m);
> >     }
> >     if (isAdder)
> >     {
> >         currentStateAsMap.put(toModifyKey, value);
> >     }
> >     else
> >     {
> >         currentStateAsMap.remove(toModifyKey);
> >         if (currentStateAsMap.isEmpty()) {
> >             return null;
> >         }
> >     }
> >     return asAggregateType(currentStateAsMap);
> > }
> >
> >>
> >> Thanks,
> >>
> >> Adam
> >>
> >> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com>
> >> wrote:
> >>
> >>> Thanks Adam for bringing Matthias up to speed!
> >>>
> >>> About the differences: I think re-keying back should be optional at best.
> >>> I would say we return a KScatteredTable with reshuffle() returning
> >>> KTable<originalKey,Joined> to make the backwards repartitioning optional.
> >>> I am also very much in favour of doing the out-of-order processing using
> >>> group by instead of high-water mark tracking, just because unbounded
> >>> growth is scary, and it saves us the header stuff.
> >>>
> >>> I think the abstraction of always repartitioning back is just not so
> >>> strong. The work has been done before we partition back, and grouping
> >>> by something else afterwards is really common.
> >>>
> >>> On 05.09.2018 13:49, Adam Bellemare wrote:
> >>>
> >>>> Hi Matthias
> >>>>
> >>>> Thank you for your feedback, I do appreciate it!
> >>>>
> >>>>> While name spacing would be possible, it would require deserializing
> >>>>> user headers, which implies a runtime overhead. I would suggest not to
> >>>>> namespace for now to avoid the overhead. If this becomes a problem in
> >>>>> the future, we can still add name spacing later on.
> >>>>>
> >>>> Agreed. I will go with using a reserved string and document it.
> >>>>
> >>>>> My main concern about the design is the type of the result KTable: If I
> >>>>> understood the proposal correctly,
> >>>>>
> >>>> In your example, you have table1 and table2 swapped. Here is how it works
> >>>> currently:
> >>>>
> >>>> 1) table1 has the records that contain the foreign key within their value.
> >>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
> >>>> table2 input stream: <A,X>, <B,Y>
> >>>>
> >>>> 2) A Value mapper is required to extract the foreign key.
> >>>> table1 foreign key mapper: ( value => value.fk )
> >>>>
> >>>> The mapper is applied to each element in table1, and a new combined key is
> >>>> made:
> >>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
> >>>>
> >>>> 3) The rekeyed events are copartitioned with table2:
> >>>>
> >>>> a) Stream Thread with Partition 0:
> >>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
> >>>> Table2: <A,X>
> >>>>
> >>>> b) Stream Thread with Partition 1:
> >>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
> >>>> Table2: <B,Y>
> >>>>
> >>>> 4) From here, they can be joined together locally by applying the joiner
> >>>> function.
> >>>>
> >>>> At this point, Jan's design and my design deviate. My design goes on to
> >>>> repartition the data post-join and resolve out-of-order arrival of
> >>>> records, finally returning the data keyed by just the original key. I do
> >>>> not expose the CombinedKey or any of the internals outside of the
> >>>> joinOnForeignKey function. This does make for a larger footprint, but it
> >>>> removes all agency for resolving out-of-order arrivals and handling
> >>>> CombinedKeys from the user. I believe that this makes the function much
> >>>> easier to use.
> >>>>
> >>>> Let me know if this helps resolve your questions, and please feel free to
> >>>> add anything else on your mind.
> >>>>
> >>>> Thanks again,
> >>>> Adam
> >>>>
> >>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Hi,
> >>>>> I am just catching up on this thread. I did not read everything so far,
> >>>>> but want to share a couple of initial thoughts:
> >>>>>
> >>>>> Headers: I think there is a fundamental difference between header usage
> >>>>> in this KIP and KIP-258. For 258, we add headers to changelog topics that
> >>>>> are owned by Kafka Streams and nobody else is supposed to write into them.
> >>>>> In fact, no user headers are written into the changelog topic and
> >>>>> thus, there are no conflicts.
> >>>>>
> >>>>> Nevertheless, I don't see a big issue with using headers within Streams.
> >>>>> As long as we document it, we can have some "reserved" header keys that
> >>>>> users are not allowed to use when processing data with Kafka Streams.
> >>>>> IMHO, this should be ok.
> >>>>>
> >>>>>> I think there is a safe way to avoid conflicts, since these headers are
> >>>>>> only needed in internal topics (I think):
> >>>>>> For internal and changelog topics, we can namespace all headers:
> >>>>>> * user-defined headers are namespaced as "external." + headerKey
> >>>>>> * internal headers are namespaced as "internal." + headerKey
> >>>>>>
> >>>>> While name spacing would be possible, it would require deserializing
> >>>>> user headers, which implies a runtime overhead. I would suggest not to
> >>>>> namespace for now to avoid the overhead. If this becomes a problem in
> >>>>> the future, we can still add name spacing later on.
> >>>>>
> >>>>> My main concern about the design is the type of the result KTable: If I
> >>>>> understood the proposal correctly,
> >>>>>
> >>>>> KTable<K1,V1> table1 = ...
> >>>>> KTable<K2,V2> table2 = ...
> >>>>>
> >>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
> >>>>>
> >>>>> implies that the `joinedTable` has the same key as the left input table.
> >>>>> IMHO, this does not work because if table2 contains multiple rows that
> >>>>> join with a record in table1 (which is the main purpose of a foreign key
> >>>>> join), the result table would only contain a single join result, but not
> >>>>> multiple.
> >>>>>
> >>>>> Example:
> >>>>>
> >>>>> table1 input stream: <A,X>
> >>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
> >>>>>
> >>>>> We use the table2 value as a foreign key to the table1 key (ie, "A" joins).
> >>>>> If the result key is the same key as the key of table1, this implies
> >>>>> that the result can either be <A, join(X,1)> or <A, join(X,2)> but not
> >>>>> both. Because they share the same key, whatever result record we emit
> >>>>> later overwrites the previous result.
> >>>>>
> >>>>> This is the reason why Jan originally proposed to use a combination of
> >>>>> both primary keys of the input tables as key of the output table. This
> >>>>> makes the keys of the output table unique and we can store both in the
> >>>>> output table:
> >>>>>
> >>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
> >>>>>
> >>>>> Thoughts?
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
> >>>>>
> >>>>>> Just one remark here.
> >>>>>> The high-watermark could be disregarded. The decision about the forward
> >>>>>> depends on the size of the aggregated map.
> >>>>>> Only 1 element long maps would be unpacked and forwarded. 0 element maps
> >>>>>> would be published as delete. Any other count of map entries is in
> >>>>>> "waiting for correct deletes to arrive"-state.
> >>>>>>
> >>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
> >>>>>>
> >>>>>>> It does look like I could replace the second repartition store and
> >>>>>>> highwater store with a groupBy and reduce. However, it looks like I
> >>>>>>> would still need to store the highwater value within the materialized
> >>>>>>> store, to compare the arrival of out-of-order records (assuming my
> >>>>>>> understanding of THIS is correct...). This in effect is the same as
> >>>>>>> the design I have now, just with the two tables merged together.
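
For readers following the thread, the forwarding rule in Jan's remark above (decide by the size of the aggregated map rather than by a high-water mark) can be sketched roughly as follows. This is only an illustration under assumptions, not code from the thread or from Kafka Streams: the class `MapSizeForwardingSketch`, the enum `Action` and the method `decide` are hypothetical names, and the aggregated map stands in for the per-key state built by the groupBy/aggregate step.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "decide by aggregated map size" rule.
// None of these names exist in Kafka Streams.
class MapSizeForwardingSketch {

    enum Action {
        FORWARD, // exactly one entry: unpack it and forward the joined value
        DELETE,  // no entries: publish a delete for the original key
        WAIT     // more than one entry: still waiting for stale entries to be deleted
    }

    // Decide what to emit downstream based only on the size of the aggregated map.
    static Action decide(Map<?, ?> aggregated) {
        switch (aggregated.size()) {
            case 0:
                return Action.DELETE;
            case 1:
                return Action.FORWARD;
            default:
                return Action.WAIT;
        }
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        System.out.println(decide(state)); // empty map

        state.put("A", "join(X,1)");
        System.out.println(decide(state)); // one entry

        state.put("B", "join(Y,1)");       // old and new foreign key both present
        System.out.println(decide(state));
    }
}
```

The third case corresponds to a foreign-key change where the add for the new key has arrived before the delete for the old one, so nothing is forwarded until the map shrinks back to a single entry.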