No problem, I just wanted to make sure people still had more to say. I will
wait another week.

Thanks,
Kyle

On May 16, 2017 4:25 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:

> Hi Kyle,
>
> Sorry for the delay in reviews; tomorrow is the feature freeze deadline (
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0),
> so people are busier than usual. Stay tuned.
>
> Eno
> > On 15 May 2017, at 13:25, Kyle Winkelman <winkelman.k...@gmail.com>
> wrote:
> >
> > Damian Guy, could you let me know if you plan to review this further?
> There
> > is no rush, but if you don't have any additional comments I could start
> the
> > voting and finish my WIP PR.
> >
> > Thanks,
> > Kyle
> >
> > On May 9, 2017 11:07 AM, "Kyle Winkelman" <winkelman.k...@gmail.com>
> wrote:
> >
> >> Eno, is there anyone else that is an expert in the kafka streams realm
> >> that I should reach out to for input?
> >>
> >> I believe Damian Guy is still planning on reviewing this more in depth
> so
> >> I will wait for his inputs before continuing.
> >>
> >> On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> >>
> >>> Thanks Kyle, good arguments.
> >>>
> >>> Eno
> >>>
> >>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com>
> >>> wrote:
> >>>>
> >>>> *- minor: could you add an exact example (similar to what Jay’s
> example
> >>> is,
> >>>> or like your Spark/Pig pointers had) to make this super concrete?*
> >>>> I have added a more concrete example to the KIP.
> >>>>
> >>>> *- my main concern is that we’re exposing this optimization to the
> DSL.
> >>> In
> >>>> an ideal world, an optimizer would take the existing DSL and do the
> >>> right
> >>>> thing under the covers (create just one state store, arrange the nodes
> >>>> etc). The original DSL had a bunch of small, composable pieces (group,
> >>>> aggregate, join) that this proposal groups together. I’d like to hear
> >>> your
> >>>> thoughts on whether it’s possible to do this optimization with the
> >>> current
> >>>> DSL, at the topology builder level.*
> >>>> You would have to make a lot of checks to understand if it is even
> >>> possible
> >>>> to make this optimization:
> >>>> 1. Make sure they are all KTableKTableOuterJoins
> >>>> 2. None of the intermediate KTables are used for anything else.
> >>>> 3. None of the intermediate stores are used. (This may be impossible
> >>>> especially if they use KafkaStreams#store after the topology has
> already
> >>>> been built.)
> >>>> You would then need to make decisions during the optimization:
> >>>> 1. Your new initializer would be the composite of all the individual
> >>>> initializers and the valueJoiners.
> >>>> 2. I am having a hard time thinking about how you would turn the
> >>>> aggregators and valueJoiners into an aggregator that would work on the
> >>>> final object, but this may be possible.
> >>>> 3. Which state store would you use? The ones declared would be for the
> >>>> aggregate values. None of the declared ones would be guaranteed to
> hold
> >>> the
> >>>> final object. This would mean you must create a new state store and
> not
> >>>> create any of the declared ones.
> >>>>
> >>>> The main argument I have against it is even if it could be done I
> don't
> >>>> know that we would want to have this be an optimization in the
> >>> background
> >>>> because the user would still be required to think about all of the
> >>>> intermediate values that they shouldn't need to worry about if they
> only
> >>>> care about the final object.
> >>>>
> >>>> In my opinion cogroup is a common enough case that it should be part
> of
> >>> the
> >>>> composable pieces (group, aggregate, join) because we want to allow
> >>> people
> >>>> to join 2 or more streams in an easy way. Right now I don't
> >>> think
> >>>> we give them ways of handling this use case easily.
> >>>>
> >>>> *-I think there will be scope for several such optimizations in the
> >>> future
> >>>> and perhaps at some point we need to think about decoupling the 1:1
> >>> mapping
> >>>> from the DSL into the physical topology.*
> >>>> I would argue that cogroup is not just an optimization; it is a new way
> >>> for
> >>>> the users to look at solving a problem that requires multiple
> >>>> streams. I may sound like a broken record but I don't think users
> should
> >>>> have to build the N-1 intermediate tables and deal with their
> >>> initializers,
> >>>> serdes and stores if all they care about is the final object.
> >>>> Now if for example someone uses cogroup but doesn't supply additional
> >>>> streams and aggregators this case is equivalent to a single grouped
> >>> stream
> >>>> making an aggregate call. This case is what I would view as an optimization:
> >>> we
> >>>> could remove the KStreamCogroup and act as if there was just a call to
> >>>> KGroupedStream#aggregate instead of calling KGroupedStream#cogroup. (I
> >>>> would prefer to just write a warning saying that this is not how
> >>> cogroup is
> >>>> to be used.)
> >>>>
> >>>> Thanks,
> >>>> Kyle
> >>>>
> >>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hi Kyle,
> >>>>>
> >>>>> Thanks for the KIP again. A couple of comments:
> >>>>>
> >>>>> - minor: could you add an exact example (similar to what Jay’s
> example
> >>> is,
> >>>>> or like your Spark/Pig pointers had) to make this super concrete?
> >>>>>
> >>>>> - my main concern is that we’re exposing this optimization to the
> DSL.
> >>> In
> >>>>> an ideal world, an optimizer would take the existing DSL and do the
> >>> right
> >>>>> thing under the covers (create just one state store, arrange the
> nodes
> >>>>> etc). The original DSL had a bunch of small, composable pieces
> (group,
> >>>>> aggregate, join) that this proposal groups together. I’d like to hear
> >>> your
> >>>>> thoughts on whether it’s possible to do this optimization with the
> >>> current
> >>>>> DSL, at the topology builder level.
> >>>>>
> >>>>> I think there will be scope for several such optimizations in the
> >>> future
> >>>>> and perhaps at some point we need to think about decoupling the 1:1
> >>> mapping
> >>>>> from the DSL into the physical topology.
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>
> >>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
> >>>>>>
> >>>>>> I haven't digested the proposal but the use case is pretty common.
> An
> >>>>>> example would be the "customer 360" or "unified customer profile"
> use
> >>>>> case
> >>>>>> we often use. In that use case you have a dozen systems each of
> which
> >>> has
> >>>>>> some information about your customer (account details, settings,
> >>> billing
> >>>>>> info, customer service contacts, purchase history, etc). Your goal
> is
> >>> to
> >>>>>> join/munge these into a single profile record for each customer that
> >>> has
> >>>>>> all the relevant info in a usable form and is up-to-date with all
> the
> >>>>>> source systems. If you implement that with kstreams as a sequence of
> >>>>> joins
> >>>>>> I think today we'd fully materialize N-1 intermediate tables. But
> >>> clearly
> >>>>>> you only need a single stage to group all these things that are
> >>> already
> >>>>>> co-partitioned. A distributed database would do this under the
> covers
> >>>>> which
> >>>>>> is arguably better (at least when it does the right thing) and
> >>> perhaps we
> >>>>>> could do the same thing but I'm not sure we know the partitioning so
> >>> we
> >>>>> may
> >>>>>> need an explicit cogroup command that implies they are already
> >>>>>> co-partitioned.
> >>>>>>
> >>>>>> -Jay
> >>>>>>
> >>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <
> >>> winkelman.k...@gmail.com
> >>>>>>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Yeah, that's a good way to look at it.
> >>>>>>> I have seen this type of functionality in a couple other platforms
> >>> like
> >>>>>>> spark and pig.
> >>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/
> >>>>> PairRDDFunctions.html
> >>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_
> >>>>> cogroup_operator.htm
> >>>>>>>
> >>>>>>>
> >>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Kyle,
> >>>>>>>>
> >>>>>>>> If I'm reading this correctly, it is like an N-way outer join? So
> an
> >>>>> input
> >>>>>>>> on any stream will always produce a new aggregated value - is that
> >>>>>>> correct?
> >>>>>>>> Effectively, each Aggregator just looks up the current value,
> >>>>> aggregates
> >>>>>>>> and forwards the result.
> >>>>>>>> I need to look into it and think about it a bit more, but it seems
> >>> like
> >>>>>>> it
> >>>>>>>> could be a useful optimization.
> >>>>>>>>
> >>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <
> >>> winkelman.k...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> I sure can. I have added the following description to my KIP. If
> >>> this
> >>>>>>>>> doesn't help let me know and I will take some more time to build
> a
> >>>>>>>> diagram
> >>>>>>>>> and make more of a step by step description:
> >>>>>>>>>
> >>>>>>>>> Example with Current API:
> >>>>>>>>>
> >>>>>>>>> KTable<K, V1> table1 =
> >>>>>>>>> builder.stream("topic1").groupByKey().aggregate(initializer1,
> >>>>>>>> aggregator1,
> >>>>>>>>> aggValueSerde1, storeName1);
> >>>>>>>>> KTable<K, V2> table2 =
> >>>>>>>>> builder.stream("topic2").groupByKey().aggregate(initializer2,
> >>>>>>>> aggregator2,
> >>>>>>>>> aggValueSerde2, storeName2);
> >>>>>>>>> KTable<K, V3> table3 =
> >>>>>>>>> builder.stream("topic3").groupByKey().aggregate(initializer3,
> >>>>>>>> aggregator3,
> >>>>>>>>> aggValueSerde3, storeName3);
> >>>>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2,
> >>>>>>>>> joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
> >>>>>>>>>
> >>>>>>>>> As you can see this creates 3 StateStores, requires 3
> initializers,
> >>>>>>> and 3
> >>>>>>>>> aggValueSerdes. This also puts pressure on the user to define
> what
> >>> the
> >>>>>>>>> intermediate values are going to be (V1, V2, V3). They are left
> >>> with a
> >>>>>>>>> couple choices, first to make V1, V2, and V3 all the same as CG
> and
> >>>>> the
> >>>>>>>> two
> >>>>>>>>> joiners are more like mergers, or second make them intermediate
> >>> states
> >>>>>>>> such
> >>>>>>>>> as Topic1Map, Topic2Map, and Topic3Map and the joiners use those
> to
> >>>>>>> build
> >>>>>>>>> the final aggregate CG value. This is something the user could
> >>> avoid
> >>>>>>>>> thinking about with this KIP.
> >>>>>>>>>
> >>>>>>>>> When a new input arrives, let's say at "topic1", it will first go
> >>> through
> >>>>>>> a
> >>>>>>>>> KStreamAggregate grabbing the current aggregate from storeName1.
> It
> >>>>>>> will
> >>>>>>>>> produce this in the form of the first intermediate value and get
> >>> sent
> >>>>>>>>> through a KTableKTableOuterJoin where it will look up the current
> >>>>> value
> >>>>>>>> of
> >>>>>>>>> the key in storeName2. It will use the first joiner to calculate
> >>> the
> >>>>>>>> second
> >>>>>>>>> intermediate value, which will go through an additional
> >>>>>>>>> KTableKTableOuterJoin. Here it will look up the current value of
> >>> the
> >>>>>>> key
> >>>>>>>> in
> >>>>>>>>> storeName3 and use the second joiner to build the final aggregate
> >>>>>>> value.
> >>>>>>>>>
> >>>>>>>>> If you think through all possibilities for incoming topics you
> will
> >>>>> see
> >>>>>>>>> that no matter which topic it comes in through all three stores
> are
> >>>>>>>> queried
> >>>>>>>>> and all of the joiners must get used.
> >>>>>>>>>
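
The walkthrough above can be modelled in a few lines of plain Java. This is only a rough sketch with no Kafka dependency: the class name, the use of hash maps as stand-ins for storeName1..3, and the integer-sum aggregators and joiners are all assumptions made for illustration. It counts store lookups and joiner calls per incoming record.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model only: plain maps stand in for the three state stores,
// and integer sums stand in for the user's aggregators and joiners.
class ChainedOuterJoinSketch {
    private final Map<String, Integer> store1 = new HashMap<>();
    private final Map<String, Integer> store2 = new HashMap<>();
    private final Map<String, Integer> store3 = new HashMap<>();
    int storeLookups = 0;
    int joinerCalls = 0;

    private Integer lookup(Map<String, Integer> store, String key) {
        storeLookups++;
        return store.get(key);
    }

    // Outer-join style joiner: a missing side is treated as 0.
    private Integer join(Integer left, Integer right) {
        joinerCalls++;
        return (left == null ? 0 : left) + (right == null ? 0 : right);
    }

    // topic is 1, 2 or 3; returns the final joined value for the key.
    Integer onRecord(int topic, String key, int value) {
        Map<String, Integer> own = topic == 1 ? store1 : topic == 2 ? store2 : store3;
        // Aggregate step: read the topic's own store and update it.
        Integer current = lookup(own, key);
        Integer updated = (current == null ? 0 : current) + value;
        own.put(key, updated);
        // Join steps: the other two stores are looked up, then both joiners run.
        Integer v1 = own == store1 ? updated : lookup(store1, key);
        Integer v2 = own == store2 ? updated : lookup(store2, key);
        Integer v3 = own == store3 ? updated : lookup(store3, key);
        return join(join(v1, v2), v3);
    }

    public static void main(String[] args) {
        ChainedOuterJoinSketch sketch = new ChainedOuterJoinSketch();
        System.out.println("joined value: " + sketch.onRecord(2, "k", 5));
        System.out.println("store lookups: " + sketch.storeLookups
                + ", joiner calls: " + sketch.joinerCalls);
    }
}
```

In this model every record costs three store lookups and two joiner calls, regardless of which topic it arrived on.
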
> >>>>>>>>> Topology wise for N incoming streams this creates N
> >>>>>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
> >>>>>>>>> KTableKTableJoinMergers.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Example with Proposed API:
> >>>>>>>>>
> >>>>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").
> >>>>>>> groupByKey();
> >>>>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").
> >>>>>>> groupByKey();
> >>>>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").
> >>>>>>> groupByKey();
> >>>>>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1,
> >>> aggregator1,
> >>>>>>>>> aggValueSerde1, storeName1)
> >>>>>>>>>      .cogroup(grouped2, aggregator2)
> >>>>>>>>>      .cogroup(grouped3, aggregator3)
> >>>>>>>>>      .aggregate();
> >>>>>>>>>
> >>>>>>>>> As you can see this creates 1 StateStore, requires 1 initializer,
> >>> and
> >>>>> 1
> >>>>>>>>> aggValueSerde. The user no longer has to worry about the
> >>> intermediate
> >>>>>>>>> values and the joiners. All they have to think about is how each
> >>>>> stream
> >>>>>>>>> impacts the creation of the final CG object.
> >>>>>>>>>
> >>>>>>>>> When a new input arrives, let's say at "topic1", it will first go
> >>> through
> >>>>>>> a
> >>>>>>>>> KStreamAggregate and grab the current aggregate from storeName1.
> It
> >>>>>>> will
> >>>>>>>>> add its incoming object to the aggregate, update the store and
> pass
> >>>>> the
> >>>>>>>> new
> >>>>>>>>> aggregate on. This new aggregate goes through the KStreamCogroup
> >>> which
> >>>>>>> is
> >>>>>>>>> pretty much just a pass through processor and you are done.
> >>>>>>>>>
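
The single-store flow described above might look roughly like this in plain Java. It is a sketch under stated assumptions, not the PR's implementation: `CogroupSketch`, `process`, and `demo` are hypothetical names, a hash map stands in for the one state store, and a `BiFunction` stands in for each stream's aggregator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Illustrative model only: one shared store, one initializer, and one
// aggregator per input stream, each folding its value type into CG.
class CogroupSketch<K, CG> {
    private final Map<K, CG> store = new HashMap<>();
    private final Supplier<CG> initializer;
    int storeLookups = 0;

    CogroupSketch(Supplier<CG> initializer) {
        this.initializer = initializer;
    }

    // Look up the current aggregate, apply this stream's aggregator,
    // write the result back, and forward it downstream (here: return it).
    <V> CG process(K key, V value, BiFunction<V, CG, CG> aggregator) {
        storeLookups++;
        CG current = store.get(key);
        if (current == null) {
            current = initializer.get();
        }
        CG updated = aggregator.apply(value, current);
        store.put(key, updated);
        return updated;
    }

    // Copy-then-append so earlier aggregates are never mutated.
    private static List<String> append(List<String> agg, String entry) {
        List<String> next = new ArrayList<>(agg);
        next.add(entry);
        return next;
    }

    // Three differently typed inputs for the same key, one final aggregate.
    static List<String> demo() {
        CogroupSketch<String, List<String>> cogroup = new CogroupSketch<>(ArrayList::new);
        cogroup.process("customer-1", "alice",
                (String v, List<String> agg) -> append(agg, "topic1:" + v));
        cogroup.process("customer-1", 42,
                (Integer v, List<String> agg) -> append(agg, "topic2:" + v));
        return cogroup.process("customer-1", true,
                (Boolean v, List<String> agg) -> append(agg, "topic3:" + v));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each record triggers exactly one store lookup here, and no intermediate value types or joiners appear anywhere in the user-facing code.
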
> >>>>>>>>> Topology wise for N incoming streams the new API will only ever
> >>>>>>> create N
> >>>>>>>>> KStreamAggregates and 1 KStreamCogroup.
> >>>>>>>>>
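
For a quick side-by-side of the two node counts quoted in this message, here is a small arithmetic sketch. The class and method names are made up for illustration; the formulas are taken directly from the counts stated above.

```java
// Processor-node counts for N incoming streams, using the figures quoted in
// the thread. Class and method names are illustrative only.
class TopologyNodeCount {
    // Current API: N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins,
    // and N-1 KTableKTableJoinMergers.
    static int currentApi(int n) {
        requirePositive(n);
        return n + 2 * (n - 1) + (n - 1);
    }

    // Proposed API: N KStreamAggregates and 1 KStreamCogroup.
    static int proposedApi(int n) {
        requirePositive(n);
        return n + 1;
    }

    private static void requirePositive(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("need at least one stream");
        }
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.printf("N=%d current=%d proposed=%d%n",
                    n, currentApi(n), proposedApi(n));
        }
    }
}
```

For the three-topic example this gives 9 nodes with the current API and 4 with the proposed one; the state-store count drops from N to 1 in the same way.
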
> >>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <
> >>>>> matth...@confluent.io
> >>>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Kyle,
> >>>>>>>>>>
> >>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could
> >>> not
> >>>>>>>>>> follow completely. Could you maybe add a more concrete example,
> >>> like
> >>>>>>> 3
> >>>>>>>>>> streams with 3 records each (plus expected result), and show the
> >>>>>>>>>> difference between the current way to implement it and the
> proposed
> >>>>>>> API?
> >>>>>>>>>> This could also cover the internal processing to see what store
> >>> calls
> >>>>>>>>>> would be required for both approaches etc.
> >>>>>>>>>>
> >>>>>>>>>> I think, it's pretty advanced stuff you propose, and it would
> >>> help to
> >>>>>>>>>> understand it better.
> >>>>>>>>>>
> >>>>>>>>>> Thanks a lot!
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> >>>>>>>>>>> I have made a pull request. It can be found here.
> >>>>>>>>>>>
> >>>>>>>>>>> https://github.com/apache/kafka/pull/2975
> >>>>>>>>>>>
> >>>>>>>>>>> I plan to write some more unit tests for my classes and get
> >>> around
> >>>>>>> to
> >>>>>>>>>>> writing documentation for the public api additions.
> >>>>>>>>>>>
> >>>>>>>>>>> One thing I was curious about is during the
> >>>>>>>>>> KCogroupedStreamImpl#aggregate
> >>>>>>>>>>> method I pass null to the KGroupedStream#repartitionIfRequired
> >>>>>>>> method.
> >>>>>>>>> I
> >>>>>>>>>>> can't supply the store name because if more than one grouped
> >>> stream
> >>>>>>>>>>> repartitions an error is thrown. Is there some name that
> someone
> >>>>>>> can
> >>>>>>>>>>> recommend or should I leave the null and allow it to fall back
> to
> >>>>>>> the
> >>>>>>>>>>> KGroupedStream.name?
> >>>>>>>>>>>
> >>>>>>>>>>> Should this be expanded to handle grouped tables? This would be
> >>>>>>>> pretty
> >>>>>>>>>> easy
> >>>>>>>>>>> for a normal aggregate but one allowing session stores and
> >>> windowed
> >>>>>>>>>> stores
> >>>>>>>>>>> would require KTableSessionWindowAggregate and
> >>>>>>> KTableWindowAggregate
> >>>>>>>>>>> implementations.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Kyle
> >>>>>>>>>>>
> >>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com
> >
> >>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I’ll look as well asap, sorry, been swamped.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Eno
> >>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com
> >
> >>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Kyle,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance
> >>> to
> >>>>>>>> look
> >>>>>>>>>> at
> >>>>>>>>>>>>> the KIP yet, but will schedule some time to look into it
> >>>>>>> tomorrow.
> >>>>>>>>> For
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> implementation, can you raise a PR against kafka trunk and
> mark
> >>>>>>> it
> >>>>>>>> as
> >>>>>>>>>>>> WIP?
> >>>>>>>>>>>>> It will be easier to review what you have done.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Damian
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <
> >>>>>>>> winkelman.k...@gmail.com
> >>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> I am replying to this in hopes it will draw some attention
> to
> >>> my
> >>>>>>>> KIP
> >>>>>>>>>> as
> >>>>>>>>>>>> I
> >>>>>>>>>>>>>> haven't heard from anyone in a couple days. This is my first
> >>> KIP
> >>>>>>>> and
> >>>>>>>>>> my
> >>>>>>>>>>>>>> first large contribution to the project so I'm sure I did
> >>>>>>>> something
> >>>>>>>>>>>> wrong.
> >>>>>>>>>>>>>> ;)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <
> >>>>>>>> winkelman.k...@gmail.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hello all,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about
> adding
> >>>>>>>>> cogroup
> >>>>>>>>>> to
> >>>>>>>>>>>>>>> the streams DSL.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Please find the KIP here:
> >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>>>>>>>>> 150+-+Kafka-Streams+Cogroup
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Please find my initial implementation here:
> >>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>> Kyle Winkelman
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
>
>
