Hi Kyle,

Sorry for the delay in reviews. Tomorrow is the feature freeze deadline
(https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0), so
people are busier than usual. Stay tuned.

Eno
> On 15 May 2017, at 13:25, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> 
> Damian Guy, could you let me know if you plan to review this further? There
> is no rush, but if you don't have any additional comments I could start the
> voting and finish my WIP PR.
> 
> Thanks,
> Kyle
> 
> On May 9, 2017 11:07 AM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
> 
>> Eno, is there anyone else that is an expert in the kafka streams realm
>> that I should reach out to for input?
>> 
>> I believe Damian Guy is still planning on reviewing this more in depth so
>> I will wait for his inputs before continuing.
>> 
>> On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
>> 
>>> Thanks Kyle, good arguments.
>>> 
>>> Eno
>>> 
>>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com>
>>> wrote:
>>>> 
>>>> *- minor: could you add an exact example (similar to what Jay’s example
>>> is,
>>>> or like your Spark/Pig pointers had) to make this super concrete?*
>>>> I have added a more concrete example to the KIP.
>>>> 
>>>> *- my main concern is that we’re exposing this optimization to the DSL.
>>> In
>>>> an ideal world, an optimizer would take the existing DSL and do the
>>> right
>>>> thing under the covers (create just one state store, arrange the nodes
>>>> etc). The original DSL had a bunch of small, composable pieces (group,
>>>> aggregate, join) that this proposal groups together. I’d like to hear
>>> your
>>>> thoughts on whether it’s possible to do this optimization with the
>>> current
>>>> DSL, at the topology builder level.*
>>>> You would have to make a lot of checks to understand if it is even
>>> possible
>>>> to make this optimization:
>>>> 1. Make sure they are all KTableKTableOuterJoins
>>>> 2. None of the intermediate KTables are used for anything else.
>>>> 3. None of the intermediate stores are used. (This may be impossible
>>>> especially if they use KafkaStreams#store after the topology has already
>>>> been built.)
>>>> You would then need to make decisions during the optimization:
>>>> 1. Your new initializer would be the composite of all the individual
>>>> initializers and the valueJoiners.
>>>> 2. I am having a hard time thinking about how you would turn the
>>>> aggregators and valueJoiners into an aggregator that would work on the
>>>> final object, but this may be possible.
>>>> 3. Which state store would you use? The ones declared would be for the
>>>> aggregate values. None of the declared ones would be guaranteed to hold
>>> the
>>>> final object. This would mean you must create a new state store and not
>>>> create any of the declared ones.
>>>> 
>>>> The main argument I have against it is even if it could be done I don't
>>>> know that we would want to have this be an optimization in the
>>> background
>>>> because the user would still be required to think about all of the
>>>> intermediate values that they shouldn't need to worry about if they only
>>>> care about the final object.
>>>> 
>>>> In my opinion cogroup is a common enough case that it should be part of
>>> the
>>>> composable pieces (group, aggregate, join) because we want to allow
>>> people
>>>> to join more than 2 streams in an easy way. Right now I don't
>>> think
>>>> we give them ways of handling this use case easily.
>>>> 
>>>> *-I think there will be scope for several such optimizations in the
>>> future
>>>> and perhaps at some point we need to think about decoupling the 1:1
>>> mapping
>>>> from the DSL into the physical topology.*
>>>> I would argue that cogroup is not just an optimization; it is a new way
>>> for
>>>> the users to look at accomplishing a problem that requires multiple
>>>> streams. I may sound like a broken record but I don't think users should
>>>> have to build the N-1 intermediate tables and deal with their
>>> initializers,
>>>> serdes and stores if all they care about is the final object.
>>>> Now if for example someone uses cogroup but doesn't supply additional
>>>> streams and aggregators this case is equivalent to a single grouped
>>> stream
>>>> making an aggregate call. This is the case I would view as an optimization:
>>> we
>>>> could remove the KStreamCogroup and act as if there was just a call to
>>>> KGroupedStream#aggregate instead of calling KGroupedStream#cogroup. (I
>>>> would prefer to just write a warning saying that this is not how
>>> cogroup is
>>>> to be used.)
>>>> 
>>>> Thanks,
>>>> Kyle
>>>> 
>>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com>
>>> wrote:
>>>> 
>>>>> Hi Kyle,
>>>>> 
>>>>> Thanks for the KIP again. A couple of comments:
>>>>> 
>>>>> - minor: could you add an exact example (similar to what Jay’s example
>>> is,
>>>>> or like your Spark/Pig pointers had) to make this super concrete?
>>>>> 
>>>>> - my main concern is that we’re exposing this optimization to the DSL.
>>> In
>>>>> an ideal world, an optimizer would take the existing DSL and do the
>>> right
>>>>> thing under the covers (create just one state store, arrange the nodes
>>>>> etc). The original DSL had a bunch of small, composable pieces (group,
>>>>> aggregate, join) that this proposal groups together. I’d like to hear
>>> your
>>>>> thoughts on whether it’s possible to do this optimization with the
>>> current
>>>>> DSL, at the topology builder level.
>>>>> 
>>>>> I think there will be scope for several such optimizations in the
>>> future
>>>>> and perhaps at some point we need to think about decoupling the 1:1
>>> mapping
>>>>> from the DSL into the physical topology.
>>>>> 
>>>>> Thanks
>>>>> Eno
>>>>> 
>>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
>>>>>> 
>>>>>> I haven't digested the proposal but the use case is pretty common. An
>>>>>> example would be the "customer 360" or "unified customer profile" use
>>>>> case
>>>>>> we often use. In that use case you have a dozen systems each of which
>>> has
>>>>>> some information about your customer (account details, settings,
>>> billing
>>>>>> info, customer service contacts, purchase history, etc). Your goal is
>>> to
>>>>>> join/munge these into a single profile record for each customer that
>>> has
>>>>>> all the relevant info in a usable form and is up-to-date with all the
>>>>>> source systems. If you implement that with kstreams as a sequence of
>>>>> joins
>>>>>> I think today we'd fully materialize N-1 intermediate tables. But
>>> clearly
>>>>>> you only need a single stage to group all these things that are
>>> already
>>>>>> co-partitioned. A distributed database would do this under the covers
>>>>> which
>>>>>> is arguably better (at least when it does the right thing) and
>>> perhaps we
>>>>>> could do the same thing but I'm not sure we know the partitioning so
>>> we
>>>>> may
>>>>>> need an explicit cogroup command that implies they are already
>>>>>> co-partitioned.
>>>>>> 
>>>>>> -Jay
>>>>>> 
>>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <
>>> winkelman.k...@gmail.com
>>>>>> 
>>>>>> wrote:
>>>>>> 
>>>>>>> Yeah, that's a good way to look at it.
>>>>>>> I have seen this type of functionality in a couple of other platforms like
>>>>>>> Spark and Pig.
>>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
>>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
>>>>>>> 
>>>>>>> 
>>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hi Kyle,
>>>>>>>> 
>>>>>>>> If I'm reading this correctly it is like an N-way outer join? So an
>>>>> input
>>>>>>>> on any stream will always produce a new aggregated value - is that
>>>>>>> correct?
>>>>>>>> Effectively, each Aggregator just looks up the current value,
>>>>> aggregates
>>>>>>>> and forwards the result.
>>>>>>>> I need to look into it and think about it a bit more, but it seems
>>> like
>>>>>>> it
>>>>>>>> could be a useful optimization.
>>>>>>>> 
>>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <
>>> winkelman.k...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> I sure can. I have added the following description to my KIP. If
>>> this
>>>>>>>>> doesn't help let me know and I will take some more time to build a
>>>>>>>> diagram
>>>>>>>>> and make more of a step by step description:
>>>>>>>>> 
>>>>>>>>> Example with Current API:
>>>>>>>>> 
>>>>>>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
>>>>>>>>>     .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
>>>>>>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
>>>>>>>>>     .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
>>>>>>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
>>>>>>>>>     .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
>>>>>>>>> KTable<K, CG> cogrouped = table1
>>>>>>>>>     .outerJoin(table2, joinerOneAndTwo)
>>>>>>>>>     .outerJoin(table3, joinerOneTwoAndThree);
>>>>>>>>> 
>>>>>>>>> As you can see this creates 3 StateStores and requires 3 initializers and 3
>>>>>>>>> aggValueSerdes. It also puts pressure on the user to define what the
>>>>>>>>> intermediate values are going to be (V1, V2, V3). They are left with a
>>>>>>>>> couple of choices: first, make V1, V2, and V3 all the same as CG, so that
>>>>>>>>> the two joiners are more like mergers; or second, make them intermediate
>>>>>>>>> states such as Topic1Map, Topic2Map, and Topic3Map and have the joiners use
>>>>>>>>> those to build the final aggregate CG value. This is something the user
>>>>>>>>> could avoid thinking about with this KIP.
>>>>>>>>> 
>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go through a
>>>>>>>>> KStreamAggregate, grabbing the current aggregate from storeName1. It will
>>>>>>>>> produce the first intermediate value, which gets sent through a
>>>>>>>>> KTableKTableOuterJoin, where it will look up the current value of the key
>>>>>>>>> in storeName2. It will use the first joiner to calculate the second
>>>>>>>>> intermediate value, which will go through an additional
>>>>>>>>> KTableKTableOuterJoin. Here it will look up the current value of the key in
>>>>>>>>> storeName3 and use the second joiner to build the final aggregate value.
>>>>>>>>> 
>>>>>>>>> If you think through all possibilities for incoming topics you will see
>>>>>>>>> that, no matter which topic a record comes in through, all three stores are
>>>>>>>>> queried and all of the joiners must get used.
>>>>>>>>> 
>>>>>>>>> Topology-wise, for N incoming streams this creates N
>>>>>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
>>>>>>>>> KTableKTableJoinMergers.
>>>>>>>>> 
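
The node counts above can be checked with a few lines of Java. This is only an
arithmetic sketch of the formulas as stated in this message (the proposed-API
figure is the "N KStreamAggregates and 1 KStreamCogroup" count given further
down). The class and method names are made up for illustration and are not part
of Kafka Streams.

```java
public final class TopologyNodeCount {

    private TopologyNodeCount() {}

    // Current API, as stated in the message:
    // N KStreamAggregates + 2*(N-1) KTableKTableOuterJoins + (N-1) KTableKTableJoinMergers.
    public static int currentApiNodes(int n) {
        requireAtLeastOne(n);
        return n + 2 * (n - 1) + (n - 1);
    }

    // Proposed API, as stated in the message:
    // N KStreamAggregates + 1 KStreamCogroup.
    public static int proposedApiNodes(int n) {
        requireAtLeastOne(n);
        return n + 1;
    }

    private static void requireAtLeastOne(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("need at least one stream, got " + n);
        }
    }

    public static void main(String[] args) {
        // Print both counts for a few stream counts.
        for (int n : new int[] {2, 3, 12}) {
            System.out.println("N=" + n
                    + " current=" + currentApiNodes(n)
                    + " proposed=" + proposedApiNodes(n));
        }
    }
}
```

For N = 3 this gives 9 processors with the current API and 4 with the proposed
one. The current-API count grows as 4N-3, the proposed one as N+1.
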
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Example with Proposed API:
>>>>>>>>> 
>>>>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
>>>>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
>>>>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
>>>>>>>>> KTable<K, CG> cogrouped = grouped1
>>>>>>>>>     .cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
>>>>>>>>>     .cogroup(grouped2, aggregator2)
>>>>>>>>>     .cogroup(grouped3, aggregator3)
>>>>>>>>>     .aggregate();
>>>>>>>>> 
>>>>>>>>> As you can see this creates 1 StateStore and requires 1 initializer and 1
>>>>>>>>> aggValueSerde. The user no longer has to worry about the intermediate
>>>>>>>>> values and the joiners. All they have to think about is how each stream
>>>>>>>>> impacts the creation of the final CG object.
>>>>>>>>> 
>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go through a
>>>>>>>>> KStreamAggregate and grab the current aggregate from storeName1. It will
>>>>>>>>> add its incoming object to the aggregate, update the store and pass the new
>>>>>>>>> aggregate on. This new aggregate goes through the KStreamCogroup, which is
>>>>>>>>> pretty much just a pass-through processor, and you are done.
>>>>>>>>> 
>>>>>>>>> Topology-wise, for N incoming streams the new API will only ever create N
>>>>>>>>> KStreamAggregates and 1 KStreamCogroup.
>>>>>>>>> 
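
The per-record flow described above (look up the current aggregate in the one
shared store, fold the incoming value in with that stream's aggregator, write it
back, forward it) can be modelled in plain Java. This is a minimal sketch under
stated assumptions, not the KIP's implementation and not the Kafka Streams API:
the class CogroupSketch, the interface Agg, and the use of a HashMap as the
"store" are all hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public final class CogroupSketch<K, CG> {

    /** Hypothetical stand-in for a per-stream aggregator: folds one value into the aggregate. */
    public interface Agg<K, V, CG> {
        CG apply(K key, V value, CG aggregate);
    }

    // The single shared state store (a plain map here, purely for illustration).
    private final Map<K, CG> store = new HashMap<>();
    // The single initializer, used the first time a key is seen on any stream.
    private final Supplier<CG> initializer;

    public CogroupSketch(Supplier<CG> initializer) {
        this.initializer = initializer;
    }

    /** Handles one record from any input stream, using that stream's own aggregator. */
    public <V> CG process(K key, V value, Agg<K, V, CG> aggregator) {
        CG current = store.get(key);
        if (current == null) {
            current = initializer.get();
        }
        CG updated = aggregator.apply(key, value, current);
        store.put(key, updated);
        return updated; // in the real topology this would be forwarded downstream
    }

    public static void main(String[] args) {
        CogroupSketch<String, String> cg = new CogroupSketch<>(() -> "");
        // Three "streams" with different value types, each with its own aggregator.
        cg.process("k", "a", (k, v, agg) -> agg + "1:" + v + ";");
        cg.process("k", 7, (k, v, agg) -> agg + "2:" + v + ";");
        System.out.println(cg.process("k", true, (k, v, agg) -> agg + "3:" + v + ";"));
        // prints 1:a;2:7;3:true;
    }
}
```

Each input touches the store once and needs no intermediate value types or
joiners, which is the contrast with the chained outer joins in the current-API
example.
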
>>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <
>>>>> matth...@confluent.io
>>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Kyle,
>>>>>>>>>> 
>>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could
>>> not
>>>>>>>>>> follow completely. Could you maybe add a more concrete example,
>>> like
>>>>>>> 3
>>>>>>>>>> streams with 3 records each (plus expected result), and show the
>>>>>>>>>> difference between the current way to implement it and the proposed
>>>>>>> API?
>>>>>>>>>> This could also cover the internal processing to see what store
>>> calls
>>>>>>>>>> would be required for both approaches etc.
>>>>>>>>>> 
>>>>>>>>>> I think, it's pretty advanced stuff you propose, and it would
>>> help to
>>>>>>>>>> understand it better.
>>>>>>>>>> 
>>>>>>>>>> Thanks a lot!
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> -Matthias
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
>>>>>>>>>>> I have made a pull request. It can be found here.
>>>>>>>>>>> 
>>>>>>>>>>> https://github.com/apache/kafka/pull/2975
>>>>>>>>>>> 
>>>>>>>>>>> I plan to write some more unit tests for my classes and get
>>> around
>>>>>>> to
>>>>>>>>>>> writing documentation for the public api additions.
>>>>>>>>>>> 
>>>>>>>>>>> One thing I was curious about is during the
>>>>>>>>>> KCogroupedStreamImpl#aggregate
>>>>>>>>>>> method I pass null to the KGroupedStream#repartitionIfRequired
>>>>>>>> method.
>>>>>>>>> I
>>>>>>>>>>> can't supply the store name because if more than one grouped
>>> stream
>>>>>>>>>>> repartitions an error is thrown. Is there some name that someone
>>>>>>> can
>>>>>>>>>>> recommend or should I leave the null and allow it to fall back to
>>>>>>> the
>>>>>>>>>>> KGroupedStream.name?
>>>>>>>>>>> 
>>>>>>>>>>> Should this be expanded to handle grouped tables? This would be
>>>>>>>> pretty
>>>>>>>>>> easy
>>>>>>>>>>> for a normal aggregate but one allowing session stores and
>>> windowed
>>>>>>>>>> stores
>>>>>>>>>>> would require KTableSessionWindowAggregate and
>>>>>>> KTableWindowAggregate
>>>>>>>>>>> implementations.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Kyle
>>>>>>>>>>> 
>>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> I’ll look as well asap, sorry, been swamped.
>>>>>>>>>>>> 
>>>>>>>>>>>> Eno
>>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Kyle,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance
>>> to
>>>>>>>> look
>>>>>>>>>> at
>>>>>>>>>>>>> the KIP yet, but will schedule some time to look into it
>>>>>>> tomorrow.
>>>>>>>>> For
>>>>>>>>>>>> the
>>>>>>>>>>>>> implementation, can you raise a PR against kafka trunk and mark
>>>>>>> it
>>>>>>>> as
>>>>>>>>>>>> WIP?
>>>>>>>>>>>>> It will be easier to review what you have done.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Damian
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <
>>>>>>>> winkelman.k...@gmail.com
>>>>>>>>>> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I am replying to this in hopes it will draw some attention to
>>> my
>>>>>>>> KIP
>>>>>>>>>> as
>>>>>>>>>>>> I
>>>>>>>>>>>>>> haven't heard from anyone in a couple days. This is my first
>>> KIP
>>>>>>>> and
>>>>>>>>>> my
>>>>>>>>>>>>>> first large contribution to the project so I'm sure I did
>>>>>>>> something
>>>>>>>>>>>> wrong.
>>>>>>>>>>>>>> ;)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <
>>>>>>>> winkelman.k...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding
>>>>>>>>> cogroup
>>>>>>>>>> to
>>>>>>>>>>>>>>> the streams DSL.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Please find the KIP here:
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Please find my initial implementation here:
>>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Kyle Winkelman
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
