Hi James,

The proposal we are discussing is
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable

I'm not sure if it's been updated to reflect current thinking.

-John

On Fri, Sep 7, 2018 at 8:49 AM James Kwan <jwkwan2...@gmail.com> wrote:

> I am new to this group and I found this subject interesting.  Sounds like
> you guys want to implement a join table of two streams? Is there somewhere
> I can see the original requirement or proposal?
>
> > On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com>
> wrote:
> >
> >
> > On 05.09.2018 22:17, Adam Bellemare wrote:
> >> I'm currently testing using a Windowed Store to store the highwater
> mark.
> >> By all indications this should work fine, with the caveat being that it
> can
> >> only resolve out-of-order arrival for up to the size of the window (ie:
> >> 24h, 72h, etc). This would remove the possibility of it being unbounded
> in
> >> size.
> >>
> >> With regards to Jan's suggestion, I believe this is where we will have
> to
> >> remain in disagreement. While I do not disagree with your statement
> about
> >> there likely to be additional joins done in a real-world workflow, I do
> not
> >> see how you can conclusively deal with out-of-order arrival of
> foreign-key
> >> changes and subsequent joins. I have attempted what I think you have
> >> proposed (without a high-water, using groupBy and reduce) and found
> that if
> >> the foreign key changes too quickly, or the load on a stream thread is
> too
> >> high, the joined messages will arrive out-of-order and be incorrectly
> >> propagated, such that an intermediate event is represented as the final
> >> event.
> > Can you shed some light on your groupBy implementation. There must be
> some sort of flaw in it.
> > I have a suspicion where it is, I would just like to confirm. The idea
> is bullet proof and it must be
> > an implementation mess up. I would like to clarify before we draw a
> conclusion.
> >
> >>  Repartitioning the scattered events back to their original
> >> partitions is the only way I know how to conclusively deal with
> >> out-of-order events in a given time frame, and to ensure that the data
> is
> >> eventually consistent with the input events.
> >>
> >> If you have some code to share that illustrates your approach, I would
> be
> >> very grateful as it would remove any misunderstandings that I may have.
> >
> > ah okay you were looking for my code. I don't have something easily
> readable here as its bloated with OO-patterns.
> >
> > its anyhow trivial:
> >
> > @Override
> >    public T apply(K aggKey, V value, T aggregate)
> >    {
> >        Map<U, V> currentStateAsMap = asMap(aggregate); << imaginary
> >        U toModifyKey = mapper.apply(value);
> >            << this is the place where people actually gonna have issues
> and why you probably couldn't do it. we would need to find a solution here.
> I didn't realize that yet.
> >            << we propagate the field in the joiner, so that we can pick
> it up in an aggregate. Probably you have not thought of this in your
> approach right?
> >            << I am very open to find a generic solution here. In my
> honest opinion this is broken in KTableImpl.GroupBy that it looses the keys
> and only maintains the aggregate key.
> >            << I abstracted it away back then way before i was thinking
> of oneToMany join. That is why I didn't realize its significance here.
> >            << Opinions?
> >
> >        for (V m : current)
> >        {
> >            currentStateAsMap.put(mapper.apply(m), m);
> >        }
> >        if (isAdder)
> >        {
> >            currentStateAsMap.put(toModifyKey, value);
> >        }
> >        else
> >        {
> >            currentStateAsMap.remove(toModifyKey);
> >            if(currentStateAsMap.isEmpty()){
> >                return null;
> >            }
> >        }
> >        retrun asAggregateType(currentStateAsMap)
> >    }
> >
> >
> >
> >
> >
> >>
> >> Thanks,
> >>
> >> Adam
> >>
> >>
> >>
> >>
> >>
> >> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com>
> >> wrote:
> >>
> >>> Thanks Adam for bringing Matthias to speed!
> >>>
> >>> about the differences. I think re-keying back should be optional at
> best.
> >>> I would say we return a KScatteredTable with reshuffle() returning
> >>> KTable<originalKey,Joined> to make the backwards repartitioning
> optional.
> >>> I am also in a big favour of doing the out of order processing using
> group
> >>> by instead high water mark tracking.
> >>> Just because unbounded growth is just scary + It saves us the header
> stuff.
> >>>
> >>> I think the abstraction of always repartitioning back is just not so
> >>> strong. Like the work has been done before we partition back and
> grouping
> >>> by something else afterwards is really common.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On 05.09.2018 13:49, Adam Bellemare wrote:
> >>>
> >>>> Hi Matthias
> >>>>
> >>>> Thank you for your feedback, I do appreciate it!
> >>>>
> >>>> While name spacing would be possible, it would require to deserialize
> >>>>> user headers what implies a runtime overhead. I would suggest to no
> >>>>> namespace for now to avoid the overhead. If this becomes a problem in
> >>>>> the future, we can still add name spacing later on.
> >>>>>
> >>>> Agreed. I will go with using a reserved string and document it.
> >>>>
> >>>>
> >>>>
> >>>> My main concern about the design it the type of the result KTable: If
> I
> >>>> understood the proposal correctly,
> >>>>
> >>>>
> >>>> In your example, you have table1 and table2 swapped. Here is how it
> works
> >>>> currently:
> >>>>
> >>>> 1) table1 has the records that contain the foreign key within their
> value.
> >>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>,
> <c,(fk=B,bar=3)>
> >>>> table2 input stream: <A,X>, <B,Y>
> >>>>
> >>>> 2) A Value mapper is required to extract the foreign key.
> >>>> table1 foreign key mapper: ( value => value.fk )
> >>>>
> >>>> The mapper is applied to each element in table1, and a new combined
> key is
> >>>> made:
> >>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c,
> >>>> (fk=B,bar=3)>
> >>>>
> >>>> 3) The rekeyed events are copartitioned with table2:
> >>>>
> >>>> a) Stream Thread with Partition 0:
> >>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
> >>>> Table2: <A,X>
> >>>>
> >>>> b) Stream Thread with Partition 1:
> >>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
> >>>> Table2: <B,Y>
> >>>>
> >>>> 4) From here, they can be joined together locally by applying the
> joiner
> >>>> function.
> >>>>
> >>>>
> >>>>
> >>>> At this point, Jan's design and my design deviate. My design goes on
> to
> >>>> repartition the data post-join and resolve out-of-order arrival of
> >>>> records,
> >>>> finally returning the data keyed just the original key. I do not
> expose
> >>>> the
> >>>> CombinedKey or any of the internals outside of the joinOnForeignKey
> >>>> function. This does make for larger footprint, but it removes all
> agency
> >>>> for resolving out-of-order arrivals and handling CombinedKeys from the
> >>>> user. I believe that this makes the function much easier to use.
> >>>>
> >>>> Let me know if this helps resolve your questions, and please feel
> free to
> >>>> add anything else on your mind.
> >>>>
> >>>> Thanks again,
> >>>> Adam
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <
> matth...@confluent.io>
> >>>> wrote:
> >>>>
> >>>> Hi,
> >>>>> I am just catching up on this thread. I did not read everything so
> far,
> >>>>> but want to share couple of initial thoughts:
> >>>>>
> >>>>>
> >>>>>
> >>>>> Headers: I think there is a fundamental difference between header
> usage
> >>>>> in this KIP and KP-258. For 258, we add headers to changelog topic
> that
> >>>>> are owned by Kafka Streams and nobody else is supposed to write into
> >>>>> them. In fact, no user header are written into the changelog topic
> and
> >>>>> thus, there are not conflicts.
> >>>>>
> >>>>> Nevertheless, I don't see a big issue with using headers within
> Streams.
> >>>>> As long as we document it, we can have some "reserved" header keys
> and
> >>>>> users are not allowed to use when processing data with Kafka Streams.
> >>>>> IMHO, this should be ok.
> >>>>>
> >>>>> I think there is a safe way to avoid conflicts, since these headers
> are
> >>>>>> only needed in internal topics (I think):
> >>>>>> For internal and changelog topics, we can namespace all headers:
> >>>>>> * user-defined headers are namespaced as "external." + headerKey
> >>>>>> * internal headers are namespaced as "internal." + headerKey
> >>>>>>
> >>>>> While name spacing would be possible, it would require to deserialize
> >>>>> user headers what implies a runtime overhead. I would suggest to no
> >>>>> namespace for now to avoid the overhead. If this becomes a problem in
> >>>>> the future, we can still add name spacing later on.
> >>>>>
> >>>>>
> >>>>>
> >>>>> My main concern about the design it the type of the result KTable:
> If I
> >>>>> understood the proposal correctly,
> >>>>>
> >>>>> KTable<K1,V1> table1 = ...
> >>>>> KTable<K2,V2> table2 = ...
> >>>>>
> >>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
> >>>>>
> >>>>> implies that the `joinedTable` has the same key as the left input
> table.
> >>>>> IMHO, this does not work because if table2 contains multiple rows
> that
> >>>>> join with a record in table1 (what is the main purpose of a foreign
> key
> >>>>> join), the result table would only contain a single join result, but
> not
> >>>>> multiple.
> >>>>>
> >>>>> Example:
> >>>>>
> >>>>> table1 input stream: <A,X>
> >>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
> >>>>>
> >>>>> We use table2 value a foreign key to table1 key (ie, "A" joins). If
> the
> >>>>> result key is the same key as key of table1, this implies that the
> >>>>> result can either be <A, join(X,1)> or <A, join(X,2)> but not both.
> >>>>> Because the share the same key, whatever result record we emit later,
> >>>>> overwrite the previous result.
> >>>>>
> >>>>> This is the reason why Jan originally proposed to use a combination
> of
> >>>>> both primary keys of the input tables as key of the output table.
> This
> >>>>> makes the keys of the output table unique and we can store both in
> the
> >>>>> output table:
> >>>>>
> >>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
> >>>>>
> >>>>>
> >>>>> Thoughts?
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
> >>>>>
> >>>>>> Just on remark here.
> >>>>>> The high-watermark could be disregarded. The decision about the
> forward
> >>>>>> depends on the size of the aggregated map.
> >>>>>> Only 1 element long maps would be unpacked and forwarded. 0 element
> maps
> >>>>>> would be published as delete. Any other count
> >>>>>> of map entries is in "waiting for correct deletes to arrive"-state.
> >>>>>>
> >>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
> >>>>>>
> >>>>>>> It does look like I could replace the second repartition store and
> >>>>>>> highwater store with a groupBy and reduce.  However, it looks like
> I
> >>>>>>> would
> >>>>>>> still need to store the highwater value within the materialized
> store,
> >>>>>>>
> >>>>>> to
> >>>>>> compare the arrival of out-of-order records (assuming my
> understanding
> >>>>>> of
> >>>>>> THIS is correct...). This in effect is the same as the design I have
> >>>>>> now,
> >>>>>> just with the two tables merged together.
> >
>
>

Reply via email to