Thanks for the proposal, Kyle. Supporting this kind of multi-way table join
(i.e. N source tables with N aggregate functions) with a single store and
N+1 serdes is quite a common use case; I have seen lots of people using the
low-level Processor API (PAPI) to achieve this.
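
To make the "single store, N aggregators" shape concrete, here is a minimal
sketch in plain Java. It is not the Kafka Streams API and not the KIP's
implementation: CogroupSketch, Cogroup, AdStats and demo are hypothetical
names, a HashMap stands in for the state store, and serdes are left out
entirely. The record values follow the ad-tech example from the quoted
thread and are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Plain-Java model of a cogroup: N typed inputs, N aggregators, one store. */
final class CogroupSketch {

    /** Aggregator callback shape: (key, value, aggregate) -> new aggregate. */
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    /** The single aggregate kept per key (hypothetical ad-tech example). */
    static final class AdStats {
        String campaign;  // set by the auction input
        int impressions;  // updated by the impression input
        int clicks;       // updated by the click input
    }

    /** Hypothetical stand-in for the one shared store plus its initializer. */
    static final class Cogroup<K, A> {
        private final Supplier<A> initializer;
        private final Map<K, A> store = new HashMap<>();

        Cogroup(Supplier<A> initializer) {
            this.initializer = initializer;
        }

        /** Registers one input; each input may have its own value type V. */
        <V> BiConsumer<K, V> cogroup(Aggregator<K, V, A> aggregator) {
            return (key, value) -> {
                A current = store.get(key);
                if (current == null) {
                    current = initializer.get(); // first record for this key
                }
                store.put(key, aggregator.apply(key, value, current));
            };
        }

        Map<K, A> store() {
            return store;
        }
    }

    /** Feeds a few records through three differently typed inputs. */
    static Map<String, AdStats> demo() {
        Cogroup<String, AdStats> cg = new Cogroup<>(AdStats::new);
        BiConsumer<String, String> auctions =
            cg.cogroup((key, campaign, agg) -> { agg.campaign = campaign; return agg; });
        BiConsumer<String, Integer> impressions =
            cg.cogroup((key, n, agg) -> { agg.impressions += n; return agg; });
        BiConsumer<String, Boolean> clicks =
            cg.cogroup((key, valid, agg) -> { if (valid) { agg.clicks++; } return agg; });

        auctions.accept("a1", "spring-sale");
        impressions.accept("a1", 2);
        clicks.accept("a1", true);
        clicks.accept("a2", false); // a key seen only on the click input
        return cg.store();
    }

    public static void main(String[] args) {
        demo().forEach((key, s) -> System.out.println(
            key + " -> campaign=" + s.campaign
                + ", impressions=" + s.impressions + ", clicks=" + s.clicks));
    }
}
```

Returning a typed handle per input is only one way to keep each input's
value type checked against its aggregator; it is meant to illustrate the
type-checking concern raised in the thread, not to suggest an API.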


On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.k...@gmail.com>
wrote:

> I like your point about not handling other cases such as count and reduce.
>
> I think that reduce may not make sense because reduce assumes that the
> input values are the same as the output values. With cogroup there may be
> multiple different input types and then your output type can't be multiple
> different things. In the case where you have all matching value types you
> can do KStreamBuilder#merge followed by the reduce.
>
> As for count, I think it is possible to call count on all the individual
> grouped streams and then do joins. Otherwise we could maybe add a special
> call in KGroupedStream for this case, because here we don't need to do
> type checking on the values. It could be similar to the current count
> methods but also accept varargs of additional grouped streams and make
> sure they have a key type of K.
>
> The way I have put the kip together is to ensure that we do type checking.
> I don't see a way we could group them all first and then make a call to
> count, reduce, or aggregate because with aggregate they would need to pass
> a list of aggregators and we would have no way of type checking that they
> match the grouped streams.
>
> Thanks,
> Kyle
>
> On May 19, 2017 11:42 AM, "Xavier Léauté" <xav...@confluent.io> wrote:
>
> > Sorry to jump on this thread so late. I agree this is a very useful
> > addition and wanted to provide an additional use-case and some more
> > comments.
> >
> > This is actually a very common analytics use-case in the ad-tech
> > industry. The typical setup will have an auction stream, an impression
> > stream, and a click stream. Those three streams need to be combined to
> > compute aggregate statistics (e.g. impression statistics and
> > click-through rates), since most of the attributes of interest are only
> > present in the auction stream.
> >
> > A simple way to do this is to co-group all the streams by the auction
> > key, and process updates to the co-group as events for each stream come
> > in, keeping only one value from each stream before sending downstream
> > for further processing / aggregation.
> >
> > One could view the result of that co-group operation as a "KTable" with
> > multiple values per key. The key being the grouping key, and the values
> > consisting of one value per stream.
> >
> > What I like about Kyle's approach is that it allows elegant co-grouping
> > of multiple streams without having to worry about the number of streams,
> > and avoids dealing with Tuple types or other generic interfaces that
> > could get messy if we wanted to preserve all the value types in the
> > resulting co-grouped stream.
> >
> > My only concern is that we only allow the cogroup + aggregate combined
> > operation. This forces the user to build their own tuple serialization
> > format if they want to preserve the individual input stream values as a
> > group. It also deviates quite a bit from our approach in KGroupedStream
> > which offers other operations, such as count and reduce, which should
> > also be applicable to a co-grouped stream.
> >
> > Overall I still think this is a really useful addition, but I feel we
> > haven't spent much time trying to explore alternative DSLs that could
> > maybe generalize better or match our existing syntax more closely.
> >
> > On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <winkelman.k...@gmail.com>
> > wrote:
> >
> > > Eno, is there anyone else that is an expert in the Kafka Streams realm
> > > that I should reach out to for input?
> > >
> > > I believe Damian Guy is still planning on reviewing this more in depth
> > > so I will wait for his input before continuing.
> > >
> > > On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> > >
> > > > Thanks Kyle, good arguments.
> > > >
> > > > Eno
> > > >
> > > > > On May 7, 2017, at 5:06 PM, Kyle Winkelman <
> winkelman.k...@gmail.com
> > >
> > > > wrote:
> > > > >
> > > > > *- minor: could you add an exact example (similar to what Jay’s
> > > > > example is, or like your Spark/Pig pointers had) to make this super
> > > > > concrete?*
> > > > > I have added a more concrete example to the KIP.
> > > > >
> > > > > *- my main concern is that we’re exposing this optimization to the
> > > > > DSL. In an ideal world, an optimizer would take the existing DSL and
> > > > > do the right thing under the covers (create just one state store,
> > > > > arrange the nodes etc). The original DSL had a bunch of small,
> > > > > composable pieces (group, aggregate, join) that this proposal groups
> > > > > together. I’d like to hear your thoughts on whether it’s possible to
> > > > > do this optimization with the current DSL, at the topology builder
> > > > > level.*
> > > > > You would have to make a lot of checks to understand if it is even
> > > > > possible to make this optimization:
> > > > > 1. Make sure they are all KTableKTableOuterJoins.
> > > > > 2. None of the intermediate KTables are used for anything else.
> > > > > 3. None of the intermediate stores are used. (This may be impossible
> > > > > especially if they use KafkaStreams#store after the topology has
> > > > > already been built.)
> > > > > You would then need to make decisions during the optimization:
> > > > > 1. Your new initializer would be the composite of all the individual
> > > > > initializers and the valueJoiners.
> > > > > 2. I am having a hard time thinking about how you would turn the
> > > > > aggregators and valueJoiners into an aggregator that would work on
> > > > > the final object, but this may be possible.
> > > > > 3. Which state store would you use? The ones declared would be for
> > > > > the aggregate values. None of the declared ones would be guaranteed
> > > > > to hold the final object. This would mean you must create a new
> > > > > state store and not create any of the declared ones.
> > > > >
> > > > > The main argument I have against it is that even if it could be done,
> > > > > I don't know that we would want to have this be an optimization in
> > > > > the background, because the user would still be required to think
> > > > > about all of the intermediate values that they shouldn't need to
> > > > > worry about if they only care about the final object.
> > > > >
> > > > > In my opinion cogroup is a common enough case that it should be part
> > > > > of the composable pieces (group, aggregate, join) because we want to
> > > > > allow people to join more than 2 streams in an easy way. Right now I
> > > > > don't think we give them ways of handling this use case easily.
> > > > >
> > > > > *-I think there will be scope for several such optimizations in the
> > > > > future and perhaps at some point we need to think about decoupling
> > > > > the 1:1 mapping from the DSL into the physical topology.*
> > > > > I would argue that cogroup is not just an optimization; it is a new
> > > > > way for users to approach a problem that requires multiple streams.
> > > > > I may sound like a broken record but I don't think users should have
> > > > > to build the N-1 intermediate tables and deal with their
> > > > > initializers, serdes and stores if all they care about is the final
> > > > > object.
> > > > > Now if for example someone uses cogroup but doesn't supply
> > > > > additional streams and aggregators, this case is equivalent to a
> > > > > single grouped stream making an aggregate call. This case is what I
> > > > > would view as an optimization: we could remove the KStreamCogroup
> > > > > and act as if there was just a call to KGroupedStream#aggregate
> > > > > instead of calling KGroupedStream#cogroup. (I would prefer to just
> > > > > write a warning saying that this is not how cogroup is to be used.)
> > > > >
> > > > > Thanks,
> > > > > Kyle
> > > > >
> > > > > On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <
> eno.there...@gmail.com
> > >
> > > > wrote:
> > > > >
> > > > >> Hi Kyle,
> > > > >>
> > > > >> Thanks for the KIP again. A couple of comments:
> > > > >>
> > > > > >> - minor: could you add an exact example (similar to what Jay’s
> > > > > >> example is, or like your Spark/Pig pointers had) to make this super
> > > > > >> concrete?
> > > > > >>
> > > > > >> - my main concern is that we’re exposing this optimization to the
> > > > > >> DSL. In an ideal world, an optimizer would take the existing DSL and
> > > > > >> do the right thing under the covers (create just one state store,
> > > > > >> arrange the nodes etc). The original DSL had a bunch of small,
> > > > > >> composable pieces (group, aggregate, join) that this proposal groups
> > > > > >> together. I’d like to hear your thoughts on whether it’s possible to
> > > > > >> do this optimization with the current DSL, at the topology builder
> > > > > >> level.
> > > > > >>
> > > > > >> I think there will be scope for several such optimizations in the
> > > > > >> future and perhaps at some point we need to think about decoupling
> > > > > >> the 1:1 mapping from the DSL into the physical topology.
> > > > >>
> > > > >> Thanks
> > > > >> Eno
> > > > >>
> > > > >>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
> > > > >>>
> > > > > >>> I haven't digested the proposal but the use case is pretty common.
> > > > > >>> An example would be the "customer 360" or "unified customer profile"
> > > > > >>> use case we often use. In that use case you have a dozen systems
> > > > > >>> each of which has some information about your customer (account
> > > > > >>> details, settings, billing info, customer service contacts, purchase
> > > > > >>> history, etc). Your goal is to join/munge these into a single
> > > > > >>> profile record for each customer that has all the relevant info in a
> > > > > >>> usable form and is up-to-date with all the source systems. If you
> > > > > >>> implement that with kstreams as a sequence of joins I think today
> > > > > >>> we'd fully materialize N-1 intermediate tables. But clearly you only
> > > > > >>> need a single stage to group all these things that are already
> > > > > >>> co-partitioned. A distributed database would do this under the
> > > > > >>> covers which is arguably better (at least when it does the right
> > > > > >>> thing) and perhaps we could do the same thing but I'm not sure we
> > > > > >>> know the partitioning so we may need an explicit cogroup command
> > > > > >>> that implies they are already co-partitioned.
> > > > >>>
> > > > >>> -Jay
> > > > >>>
> > > > >>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <
> > > > winkelman.k...@gmail.com
> > > > >>>
> > > > >>> wrote:
> > > > >>>
> > > > > >>>> Yeah, that's a good way to look at it.
> > > > > >>>> I have seen this type of functionality in a couple of other
> > > > > >>>> platforms like Spark and Pig.
> > > > > >>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
> > > > > >>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
> > > > >>>>
> > > > >>>>
> > > > >>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com>
> > wrote:
> > > > >>>>
> > > > >>>>> Hi Kyle,
> > > > >>>>>
> > > > > >>>>> If I'm reading this correctly it is like an N way outer join? So
> > > > > >>>>> an input on any stream will always produce a new aggregated value -
> > > > > >>>>> is that correct?
> > > > > >>>>> Effectively, each Aggregator just looks up the current value,
> > > > > >>>>> aggregates and forwards the result.
> > > > > >>>>> I need to look into it and think about it a bit more, but it seems
> > > > > >>>>> like it could be a useful optimization.
> > > > >>>>>
> > > > >>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <
> > > winkelman.k...@gmail.com
> > > > >
> > > > >>>>> wrote:
> > > > >>>>>
> > > > > >>>>>> I sure can. I have added the following description to my KIP. If
> > > > > >>>>>> this doesn't help let me know and I will take some more time to
> > > > > >>>>>> build a diagram and make more of a step by step description:
> > > > >>>>>>
> > > > >>>>>> Example with Current API:
> > > > >>>>>>
> > > > > >>>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
> > > > > >>>>>>       .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
> > > > > >>>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
> > > > > >>>>>>       .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
> > > > > >>>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
> > > > > >>>>>>       .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
> > > > > >>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo)
> > > > > >>>>>>       .outerJoin(table3, joinerOneTwoAndThree);
> > > > >>>>>>
> > > > > >>>>>> As you can see this creates 3 StateStores, requires 3 initializers,
> > > > > >>>>>> and 3 aggValueSerdes. This also puts pressure on the user to define
> > > > > >>>>>> what the intermediate values are going to be (V1, V2, V3). They are
> > > > > >>>>>> left with a couple of choices: first, to make V1, V2, and V3 all the
> > > > > >>>>>> same as CG so that the two joiners are more like mergers; or second,
> > > > > >>>>>> to make them intermediate states such as Topic1Map, Topic2Map, and
> > > > > >>>>>> Topic3Map and have the joiners use those to build the final aggregate
> > > > > >>>>>> CG value. This is something the user could avoid thinking about with
> > > > > >>>>>> this KIP.
> > > > >>>>>>
> > > > > >>>>>> When a new input arrives, let's say at "topic1", it will first go
> > > > > >>>>>> through a KStreamAggregate, grabbing the current aggregate from
> > > > > >>>>>> storeName1. It will produce this in the form of the first
> > > > > >>>>>> intermediate value and get sent through a KTableKTableOuterJoin
> > > > > >>>>>> where it will look up the current value of the key in storeName2. It
> > > > > >>>>>> will use the first joiner to calculate the second intermediate value,
> > > > > >>>>>> which will go through an additional KTableKTableOuterJoin. Here it
> > > > > >>>>>> will look up the current value of the key in storeName3 and use the
> > > > > >>>>>> second joiner to build the final aggregate value.
> > > > >>>>>>
> > > > > >>>>>> If you think through all possibilities for incoming topics you will
> > > > > >>>>>> see that no matter which topic it comes in through, all three stores
> > > > > >>>>>> are queried and all of the joiners must get used.
> > > > > >>>>>>
> > > > > >>>>>> Topology-wise, for N incoming streams this creates N
> > > > > >>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
> > > > > >>>>>> KTableKTableJoinMergers.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> Example with Proposed API:
> > > > >>>>>>
> > > > > >>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> > > > > >>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> > > > > >>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> > > > > >>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1,
> > > > > >>>>>>       aggValueSerde1, storeName1)
> > > > > >>>>>>       .cogroup(grouped2, aggregator2)
> > > > > >>>>>>       .cogroup(grouped3, aggregator3)
> > > > > >>>>>>       .aggregate();
> > > > >>>>>>
> > > > > >>>>>> As you can see this creates 1 StateStore, requires 1 initializer,
> > > > > >>>>>> and 1 aggValueSerde. The user no longer has to worry about the
> > > > > >>>>>> intermediate values and the joiners. All they have to think about is
> > > > > >>>>>> how each stream impacts the creation of the final CG object.
> > > > >>>>>>
> > > > > >>>>>> When a new input arrives, let's say at "topic1", it will first go
> > > > > >>>>>> through a KStreamAggregate and grab the current aggregate from
> > > > > >>>>>> storeName1. It will add its incoming object to the aggregate, update
> > > > > >>>>>> the store and pass the new aggregate on. This new aggregate goes
> > > > > >>>>>> through the KStreamCogroup, which is pretty much just a pass-through
> > > > > >>>>>> processor, and you are done.
> > > > > >>>>>>
> > > > > >>>>>> Topology-wise, for N incoming streams the new API will only ever
> > > > > >>>>>> create N KStreamAggregates and 1 KStreamCogroup.
> > > > >>>>>>
> > > > >>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <
> > > > >> matth...@confluent.io
> > > > >>>>>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Kyle,
> > > > >>>>>>>
> > > > > >>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could
> > > > > >>>>>>> not follow completely. Could you maybe add a more concrete example,
> > > > > >>>>>>> like 3 streams with 3 records each (plus expected result), and show
> > > > > >>>>>>> the difference between the current way to implement it and the
> > > > > >>>>>>> proposed API? This could also cover the internal processing to see
> > > > > >>>>>>> what store calls would be required for both approaches etc.
> > > > > >>>>>>>
> > > > > >>>>>>> I think it's pretty advanced stuff you propose, and it would help
> > > > > >>>>>>> to understand it better.
> > > > >>>>>>>
> > > > >>>>>>> Thanks a lot!
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> -Matthias
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> > > > >>>>>>>> I have made a pull request. It can be found here.
> > > > >>>>>>>>
> > > > >>>>>>>> https://github.com/apache/kafka/pull/2975
> > > > >>>>>>>>
> > > > > >>>>>>>> I plan to write some more unit tests for my classes and get around
> > > > > >>>>>>>> to writing documentation for the public API additions.
> > > > > >>>>>>>>
> > > > > >>>>>>>> One thing I was curious about is that in the
> > > > > >>>>>>>> KCogroupedStreamImpl#aggregate method I pass null to the
> > > > > >>>>>>>> KGroupedStream#repartitionIfRequired method. I can't supply the
> > > > > >>>>>>>> store name because if more than one grouped stream repartitions an
> > > > > >>>>>>>> error is thrown. Is there some name that someone can recommend or
> > > > > >>>>>>>> should I leave the null and allow it to fall back to the
> > > > > >>>>>>>> KGroupedStream.name?
> > > > > >>>>>>>>
> > > > > >>>>>>>> Should this be expanded to handle grouped tables? This would be
> > > > > >>>>>>>> pretty easy for a normal aggregate but one allowing session stores
> > > > > >>>>>>>> and windowed stores would require KTableSessionWindowAggregate and
> > > > > >>>>>>>> KTableWindowAggregate implementations.
> > > > >>>>>>>>
> > > > >>>>>>>> Thanks,
> > > > >>>>>>>> Kyle
> > > > >>>>>>>>
> > > > >>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <
> > eno.there...@gmail.com>
> > > > >>>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> I’ll look as well asap, sorry, been swamped.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Eno
> > > > >>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <
> > damian....@gmail.com>
> > > > >>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Hi Kyle,
> > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance to
> > > > > >>>>>>>>>> look at the KIP yet, but will schedule some time to look into it
> > > > > >>>>>>>>>> tomorrow. For the implementation, can you raise a PR against kafka
> > > > > >>>>>>>>>> trunk and mark it as WIP? It will be easier to review what you
> > > > > >>>>>>>>>> have done.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Thanks,
> > > > >>>>>>>>>> Damian
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <
> > > > >>>>> winkelman.k...@gmail.com
> > > > >>>>>>>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> I am replying to this in hopes it will draw some attention to my
> > > > > >>>>>>>>>>> KIP as I haven't heard from anyone in a couple days. This is my
> > > > > >>>>>>>>>>> first KIP and my first large contribution to the project so I'm
> > > > > >>>>>>>>>>> sure I did something wrong. ;)
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <
> > > > >>>>> winkelman.k...@gmail.com>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Hello all,
> > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding
> > > > > >>>>>>>>>>>> cogroup to the streams DSL.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Please find the KIP here:
> > > > > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Please find my initial implementation here:
> > > > >>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>> Kyle Winkelman
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>
> > > > >>
> > > >
> > > >
> > >
> >
>



-- 
-- Guozhang
