Thanks Kyle, good arguments.

Eno

> On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> 
> *- minor: could you add an exact example (similar to what Jay’s example is,
> or like your Spark/Pig pointers had) to make this super concrete?*
> I have added a more concrete example to the KIP.
> 
> *- my main concern is that we’re exposing this optimization to the DSL. In
> an ideal world, an optimizer would take the existing DSL and do the right
> thing under the covers (create just one state store, arrange the nodes
> etc). The original DSL had a bunch of small, composable pieces (group,
> aggregate, join) that this proposal groups together. I’d like to hear your
> thoughts on whether it’s possible to do this optimization with the current
> DSL, at the topology builder level.*
> You would have to make a lot of checks to understand if it is even possible
> to make this optimization:
> 1. Make sure they are all KTableKTableOuterJoins
> 2. None of the intermediate KTables are used for anything else.
> 3. None of the intermediate stores are used. (This may be impossible
> especially if they use KafkaStreams#store after the topology has already
> been built.)
> You would then need to make decisions during the optimization:
> 1. Your new initializer would the composite of all the individual
> initializers and the valueJoiners.
> 2. I am having a hard time thinking about how you would turn the
> aggregators and valueJoiners into an aggregator that would work on the
> final object, but this may be possible.
> 3. Which state store would you use? The ones declared would be for the
> aggregate values. None of the declared ones would be guaranteed to hold the
> final object. This would mean you must created a new state store and not
> created any of the declared ones.
> 
> The main argument I have against it is even if it could be done I don't
> know that we would want to have this be an optimization in the background
> because the user would still be required to think about all of the
> intermediate values that they shouldn't need to worry about if they only
> care about the final object.
> 
> In my opinion cogroup is a common enough case that it should be part of the
> composable pieces (group, aggregate, join) because we want to allow people
> to join more than 2 or more streams in an easy way. Right now I don't think
> we give them ways of handling this use case easily.
> 
> *-I think there will be scope for several such optimizations in the future
> and perhaps at some point we need to think about decoupling the 1:1 mapping
> from the DSL into the physical topology.*
> I would argue that cogroup is not just an optimization it is a new way for
> the users to look at accomplishing a problem that requires multiple
> streams. I may sound like a broken record but I don't think users should
> have to build the N-1 intermediate tables and deal with their initializers,
> serdes and stores if all they care about is the final object.
> Now if for example someone uses cogroup but doesn't supply additional
> streams and aggregators this case is equivalent to a single grouped stream
> making an aggregate call. This case is what I view an optimization as, we
> could remove the KStreamCogroup and act as if there was just a call to
> KGroupedStream#aggregate instead of calling KGroupedStream#cogroup. (I
> would prefer to just write a warning saying that this is not how cogroup is
> to be used.)
> 
> Thanks,
> Kyle
> 
> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> 
>> Hi Kyle,
>> 
>> Thanks for the KIP again. A couple of comments:
>> 
>> - minor: could you add an exact example (similar to what Jay’s example is,
>> or like your Spark/Pig pointers had) to make this super concrete?
>> 
>> - my main concern is that we’re exposing this optimization to the DSL. In
>> an ideal world, an optimizer would take the existing DSL and do the right
>> thing under the covers (create just one state store, arrange the nodes
>> etc). The original DSL had a bunch of small, composable pieces (group,
>> aggregate, join) that this proposal groups together. I’d like to hear your
>> thoughts on whether it’s possible to do this optimization with the current
>> DSL, at the topology builder level.
>> 
>> I think there will be scope for several such optimizations in the future
>> and perhaps at some point we need to think about decoupling the 1:1 mapping
>> from the DSL into the physical topology.
>> 
>> Thanks
>> Eno
>> 
>>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
>>> 
>>> I haven't digested the proposal but the use case is pretty common. An
>>> example would be the "customer 360" or "unified customer profile" use
>> case
>>> we often use. In that use case you have a dozen systems each of which has
>>> some information about your customer (account details, settings, billing
>>> info, customer service contacts, purchase history, etc). Your goal is to
>>> join/munge these into a single profile record for each customer that has
>>> all the relevant info in a usable form and is up-to-date with all the
>>> source systems. If you implement that with kstreams as a sequence of
>> joins
>>> i think today we'd fully materialize N-1 intermediate tables. But clearly
>>> you only need a single stage to group all these things that are already
>>> co-partitioned. A distributed database would do this under the covers
>> which
>>> is arguably better (at least when it does the right thing) and perhaps we
>>> could do the same thing but I'm not sure we know the partitioning so we
>> may
>>> need an explicit cogroup command that impllies they are already
>>> co-partitioned.
>>> 
>>> -Jay
>>> 
>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com
>>> 
>>> wrote:
>>> 
>>>> Yea thats a good way to look at it.
>>>> I have seen this type of functionality in a couple other platforms like
>>>> spark and pig.
>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/
>> PairRDDFunctions.html
>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_
>> cogroup_operator.htm
>>>> 
>>>> 
>>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:
>>>> 
>>>>> Hi Kyle,
>>>>> 
>>>>> If i'm reading this correctly it is like an N way outer join? So an
>> input
>>>>> on any stream will always produce a new aggregated value - is that
>>>> correct?
>>>>> Effectively, each Aggregator just looks up the current value,
>> aggregates
>>>>> and forwards the result.
>>>>> I need to look into it and think about it a bit more, but it seems like
>>>> it
>>>>> could be a useful optimization.
>>>>> 
>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> I sure can. I have added the following description to my KIP. If this
>>>>>> doesn't help let me know and I will take some more time to build a
>>>>> diagram
>>>>>> and make more of a step by step description:
>>>>>> 
>>>>>> Example with Current API:
>>>>>> 
>>>>>> KTable<K, V1> table1 =
>>>>>> builder.stream("topic1").groupByKey().aggregate(initializer1,
>>>>> aggregator1,
>>>>>> aggValueSerde1, storeName1);
>>>>>> KTable<K, V2> table2 =
>>>>>> builder.stream("topic2").groupByKey().aggregate(initializer2,
>>>>> aggregator2,
>>>>>> aggValueSerde2, storeName2);
>>>>>> KTable<K, V3> table3 =
>>>>>> builder.stream("topic3").groupByKey().aggregate(initializer3,
>>>>> aggregator3,
>>>>>> aggValueSerde3, storeName3);
>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2,
>>>>>> joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
>>>>>> 
>>>>>> As you can see this creates 3 StateStores, requires 3 initializers,
>>>> and 3
>>>>>> aggValueSerdes. This also adds the pressure to user to define what the
>>>>>> intermediate values are going to be (V1, V2, V3). They are left with a
>>>>>> couple choices, first to make V1, V2, and V3 all the same as CG and
>> the
>>>>> two
>>>>>> joiners are more like mergers, or second make them intermediate states
>>>>> such
>>>>>> as Topic1Map, Topic2Map, and Topic3Map and the joiners use those to
>>>> build
>>>>>> the final aggregate CG value. This is something the user could avoid
>>>>>> thinking about with this KIP.
>>>>>> 
>>>>>> When a new input arrives lets say at "topic1" it will first go through
>>>> a
>>>>>> KStreamAggregate grabbing the current aggregate from storeName1. It
>>>> will
>>>>>> produce this in the form of the first intermediate value and get sent
>>>>>> through a KTableKTableOuterJoin where it will look up the current
>> value
>>>>> of
>>>>>> the key in storeName2. It will use the first joiner to calculate the
>>>>> second
>>>>>> intermediate value, which will go through an additional
>>>>>> KTableKTableOuterJoin. Here it will look up the current value of the
>>>> key
>>>>> in
>>>>>> storeName3 and use the second joiner to build the final aggregate
>>>> value.
>>>>>> 
>>>>>> If you think through all possibilities for incoming topics you will
>> see
>>>>>> that no matter which topic it comes in through all three stores are
>>>>> queried
>>>>>> and all of the joiners must get used.
>>>>>> 
>>>>>> Topology wise for N incoming streams this creates N
>>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
>>>>>> KTableKTableJoinMergers.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Example with Proposed API:
>>>>>> 
>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").
>>>> groupByKey();
>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").
>>>> groupByKey();
>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").
>>>> groupByKey();
>>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1,
>>>>>> aggValueSerde1, storeName1)
>>>>>>       .cogroup(grouped2, aggregator2)
>>>>>>       .cogroup(grouped3, aggregator3)
>>>>>>       .aggregate();
>>>>>> 
>>>>>> As you can see this creates 1 StateStore, requires 1 initializer, and
>> 1
>>>>>> aggValueSerde. The user no longer has to worry about the intermediate
>>>>>> values and the joiners. All they have to think about is how each
>> stream
>>>>>> impacts the creation of the final CG object.
>>>>>> 
>>>>>> When a new input arrives lets say at "topic1" it will first go through
>>>> a
>>>>>> KStreamAggreagte and grab the current aggregate from storeName1. It
>>>> will
>>>>>> add its incoming object to the aggregate, update the store and pass
>> the
>>>>> new
>>>>>> aggregate on. This new aggregate goes through the KStreamCogroup which
>>>> is
>>>>>> pretty much just a pass through processor and you are done.
>>>>>> 
>>>>>> Topology wise for N incoming streams the new api will only every
>>>> create N
>>>>>> KStreamAggregates and 1 KStreamCogroup.
>>>>>> 
>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <
>> matth...@confluent.io
>>>>> 
>>>>>> wrote:
>>>>>> 
>>>>>>> Kyle,
>>>>>>> 
>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could not
>>>>>>> follow completely. Could you maybe add a more concrete example, like
>>>> 3
>>>>>>> streams with 3 records each (plus expected result), and show the
>>>>>>> difference between current way to to implement it and the proposed
>>>> API?
>>>>>>> This could also cover the internal processing to see what store calls
>>>>>>> would be required for both approaches etc.
>>>>>>> 
>>>>>>> I think, it's pretty advanced stuff you propose, and it would help to
>>>>>>> understand it better.
>>>>>>> 
>>>>>>> Thanks a lot!
>>>>>>> 
>>>>>>> 
>>>>>>> -Matthias
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
>>>>>>>> I have made a pull request. It can be found here.
>>>>>>>> 
>>>>>>>> https://github.com/apache/kafka/pull/2975
>>>>>>>> 
>>>>>>>> I plan to write some more unit tests for my classes and get around
>>>> to
>>>>>>>> writing documentation for the public api additions.
>>>>>>>> 
>>>>>>>> One thing I was curious about is during the
>>>>>>> KCogroupedStreamImpl#aggregate
>>>>>>>> method I pass null to the KGroupedStream#repartitionIfRequired
>>>>> method.
>>>>>> I
>>>>>>>> can't supply the store name because if more than one grouped stream
>>>>>>>> repartitions an error is thrown. Is there some name that someone
>>>> can
>>>>>>>> recommend or should I leave the null and allow it to fall back to
>>>> the
>>>>>>>> KGroupedStream.name?
>>>>>>>> 
>>>>>>>> Should this be expanded to handle grouped tables? This would be
>>>>> pretty
>>>>>>> easy
>>>>>>>> for a normal aggregate but one allowing session stores and windowed
>>>>>>> stores
>>>>>>>> would required KTableSessionWindowAggregate and
>>>> KTableWindowAggregate
>>>>>>>> implementations.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Kyle
>>>>>>>> 
>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com>
>>>>> wrote:
>>>>>>>> 
>>>>>>>>> I’ll look as well asap, sorry, been swamped.
>>>>>>>>> 
>>>>>>>>> Eno
>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com>
>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi Kyle,
>>>>>>>>>> 
>>>>>>>>>> Thanks for the KIP. I apologize that i haven't had the chance to
>>>>> look
>>>>>>> at
>>>>>>>>>> the KIP yet, but will schedule some time to look into it
>>>> tomorrow.
>>>>>> For
>>>>>>>>> the
>>>>>>>>>> implementation, can you raise a PR against kafka trunk and mark
>>>> it
>>>>> as
>>>>>>>>> WIP?
>>>>>>>>>> It will be easier to review what you have done.
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> Damian
>>>>>>>>>> 
>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <
>>>>> winkelman.k...@gmail.com
>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> I am replying to this in hopes it will draw some attention to my
>>>>> KIP
>>>>>>> as
>>>>>>>>> I
>>>>>>>>>>> haven't heard from anyone in a couple days. This is my first KIP
>>>>> and
>>>>>>> my
>>>>>>>>>>> first large contribution to the project so I'm sure I did
>>>>> something
>>>>>>>>> wrong.
>>>>>>>>>>> ;)
>>>>>>>>>>> 
>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <
>>>>> winkelman.k...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hello all,
>>>>>>>>>>>> 
>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding
>>>>>> cogroup
>>>>>>> to
>>>>>>>>>>>> the streams DSL.
>>>>>>>>>>>> 
>>>>>>>>>>>> Please find the KIP here:
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>> 150+-+Kafka-Streams+Cogroup
>>>>>>>>>>>> 
>>>>>>>>>>>> Please find my initial implementation here:
>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Kyle Winkelman
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
>> 

Reply via email to