Yeah, that's a good way to look at it.
I have seen this type of functionality in a couple of other platforms, like
Spark and Pig:
https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
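
For example, Spark's cogroup does roughly the following (a small sketch
against the newer Java API rather than the 0.6.2 docs linked above, with
made-up datasets):

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class CogroupExample {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "cogroup-example");

        // Two keyed datasets we want to combine per key in a single pass.
        JavaPairRDD<String, Integer> clicks = sc.parallelizePairs(
                Arrays.asList(new Tuple2<>("userA", 1), new Tuple2<>("userB", 2)));
        JavaPairRDD<String, String> purchases = sc.parallelizePairs(
                Arrays.asList(new Tuple2<>("userA", "book")));

        // cogroup groups both RDDs by key and hands back all values from each
        // side together: (key, (Iterable<clicks>, Iterable<purchases>)).
        JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<String>>> cogrouped =
                clicks.cogroup(purchases);

        cogrouped.collect().forEach(System.out::println);
        sc.stop();
    }
}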


On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:

> Hi Kyle,
>
> If I'm reading this correctly, it is like an N-way outer join? So an input
> on any stream will always produce a new aggregated value - is that correct?
> Effectively, each Aggregator just looks up the current value, aggregates
> and forwards the result.
> I need to look into it and think about it a bit more, but it seems like it
> could be a useful optimization.
>
> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com>
> wrote:
>
> > I sure can. I have added the following description to my KIP. If this
> > doesn't help, let me know and I will take some more time to build a
> > diagram and make more of a step-by-step description:
> >
> > Example with Current API:
> >
> > KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
> >     .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
> > KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
> >     .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
> > KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
> >     .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
> > KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo)
> >     .outerJoin(table3, joinerOneTwoAndThree);
> >
> > As you can see, this creates 3 StateStores and requires 3 initializers and
> > 3 aggValueSerdes. It also puts pressure on the user to define what the
> > intermediate values are going to be (V1, V2, V3). They are left with a
> > couple of choices: first, make V1, V2, and V3 all the same as CG, so the
> > two joiners are really just mergers; or second, make them intermediate
> > states such as Topic1Map, Topic2Map, and Topic3Map and have the joiners
> > use those to build the final aggregate CG value. This is something the
> > user could avoid thinking about with this KIP.
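> >
> > For illustration, the first choice ends up looking roughly like this (CG
> > and its methods are placeholders here, and I1 stands for whatever value
> > type is on topic1):
> >
> > // V1 == CG: each aggregator folds its topic straight into a partial CG.
> > Aggregator<K, I1, CG> aggregator1 = (key, value, cg) -> cg.withTopic1(value);
> > // The "joiner" is then really just a merge of two partial CGs.
> > ValueJoiner<CG, CG, CG> joinerOneAndTwo = (cgLeft, cgRight) -> cgLeft.mergeWith(cgRight);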
> >
> > When a new input arrives, let's say at "topic1", it will first go through
> > a KStreamAggregate, grabbing the current aggregate from storeName1. It
> > will produce this in the form of the first intermediate value, which gets
> > sent through a KTableKTableOuterJoin where it will look up the current
> > value of the key in storeName2. It will use the first joiner to calculate
> > the second intermediate value, which will go through an additional
> > KTableKTableOuterJoin. There it will look up the current value of the key
> > in storeName3 and use the second joiner to build the final aggregate value.
> >
> > If you think through all the possibilities for incoming topics, you will
> > see that no matter which topic a record comes in through, all three stores
> > are queried and all of the joiners must be used.
> >
> > Topology-wise, for N incoming streams this creates N
> > KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
> > KTableKTableJoinMergers.
> >
> >
> >
> > Example with Proposed API:
> >
> > KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> > KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> > KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> > KTable<K, CG> cogrouped = grouped1
> >         .cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
> >         .cogroup(grouped2, aggregator2)
> >         .cogroup(grouped3, aggregator3)
> >         .aggregate();
> >
> > As you can see, this creates 1 StateStore and requires only 1 initializer
> > and 1 aggValueSerde. The user no longer has to worry about the
> > intermediate values or the joiners. All they have to think about is how
> > each stream impacts the creation of the final CG object.
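> >
> > To make that concrete: with the proposed API every aggregator shares the
> > same aggregate type, so (using the same placeholder types as above, with
> > hypothetical CG methods) they might look roughly like this:
> >
> > Initializer<CG> initializer1 = CG::new;
> > // Each aggregator folds its own stream's value into the one shared CG;
> > // no intermediate types or joiners are needed.
> > Aggregator<K, V1, CG> aggregator1 = (key, value, cg) -> cg.withTopic1(value);
> > Aggregator<K, V2, CG> aggregator2 = (key, value, cg) -> cg.withTopic2(value);
> > Aggregator<K, V3, CG> aggregator3 = (key, value, cg) -> cg.withTopic3(value);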
> >
> > When a new input arrives, let's say at "topic1", it will first go through
> > a KStreamAggregate and grab the current aggregate from storeName1. It will
> > add the incoming object to the aggregate, update the store, and pass the
> > new aggregate on. This new aggregate goes through the KStreamCogroup,
> > which is pretty much just a pass-through processor, and you are done.
> >
> > Topology-wise, for N incoming streams the new API will only ever create N
> > KStreamAggregates and 1 KStreamCogroup.
> >
> > On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Kyle,
> > >
> > > thanks a lot for the KIP. Maybe I am a little slow, but I could not
> > > follow completely. Could you maybe add a more concrete example, like 3
> > > streams with 3 records each (plus expected result), and show the
> > > difference between the current way to implement it and the proposed API?
> > > This could also cover the internal processing to see what store calls
> > > would be required for both approaches, etc.
> > >
> > > I think it's pretty advanced stuff you propose, and it would help to
> > > understand it better.
> > >
> > > Thanks a lot!
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> > > > I have made a pull request. It can be found here.
> > > >
> > > > https://github.com/apache/kafka/pull/2975
> > > >
> > > > I plan to write some more unit tests for my classes and get around to
> > > > writing documentation for the public API additions.
> > > >
> > > > One thing I was curious about: in the KCogroupedStreamImpl#aggregate
> > > > method I pass null to the KGroupedStream#repartitionIfRequired method.
> > > > I can't supply the store name because if more than one grouped stream
> > > > repartitions, an error is thrown. Is there some name that someone can
> > > > recommend, or should I leave the null and allow it to fall back to the
> > > > KGroupedStream.name?
> > > >
> > > > Should this be expanded to handle grouped tables? This would be pretty
> > > > easy for a normal aggregate, but one allowing session stores and
> > > > windowed stores would require KTableSessionWindowAggregate and
> > > > KTableWindowAggregate implementations.
> > > >
> > > > Thanks,
> > > > Kyle
> > > >
> > > > On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> > > >
> > > >> I’ll look as well asap, sorry, been swamped.
> > > >>
> > > >> Eno
> > > >>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:
> > > >>>
> > > >>> Hi Kyle,
> > > >>>
> > > >>> Thanks for the KIP. I apologize that I haven't had the chance to look
> > > >>> at the KIP yet, but will schedule some time to look into it tomorrow.
> > > >>> For the implementation, can you raise a PR against kafka trunk and
> > > >>> mark it as WIP? It will be easier to review what you have done.
> > > >>>
> > > >>> Thanks,
> > > >>> Damian
> > > >>>
> > > >>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>>> I am replying to this in hopes it will draw some attention to my
> > > >>>> KIP, as I haven't heard from anyone in a couple of days. This is my
> > > >>>> first KIP and my first large contribution to the project, so I'm
> > > >>>> sure I did something wrong. ;)
> > > >>>>
> > > >>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hello all,
> > > >>>>>
> > > >>>>> I have created KIP-150 to facilitate discussion about adding
> > cogroup
> > > to
> > > >>>>> the streams DSL.
> > > >>>>>
> > > >>>>> Please find the KIP here:
> > > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > > >>>>>
> > > >>>>> Please find my initial implementation here:
> > > >>>>> https://github.com/KyleWinkelman/kafka
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Kyle Winkelman
> > > >>>>>
