Hi Guozhang,

I am not sure what you mean by "Fields from different streams are never
aggregated together". That can certainly be the case, but it is not the
general rule. If we want to handle the special case where the key-sets are
disjoint for each stream, then the streams can be given no-op operators. This
would have the same effect as a stitching join, since the function that
updates the store has to be defined either way, even if it only places the
value in the store.
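
To make the no-op case concrete, here is a minimal sketch in plain Java. It is
only a model of the idea, not the Kafka Streams API: StitchingCogroupSketch,
Row, Agg, PUT_A, PUT_B and APPEND_C are hypothetical names made up for this
illustration, and it assumes null values are dropped the way grouped streams
drop them. The point is that a "stitching" update and a real aggregation are
the same kind of function against a single store.

```java
import java.util.HashMap;
import java.util.Map;

class StitchingCogroupSketch {

    /** Combined per-key value with one slot per input stream (hypothetical). */
    static final class Row {
        final String a, b, c;
        Row(String a, String b, String c) { this.a = a; this.b = b; this.c = c; }
        @Override public String toString() { return "{" + a + ", " + b + ", " + c + "}"; }
    }

    /** Aggregator shape: (key, new value, current aggregate) -> new aggregate. */
    interface Agg<V> { Row apply(String key, V value, Row current); }

    // "No-op" operators: they combine nothing and only place the value in its slot.
    static final Agg<String> PUT_A = (k, v, row) -> new Row(v, row.b, row.c);
    static final Agg<String> PUT_B = (k, v, row) -> new Row(row.a, v, row.c);
    // A real aggregator for comparison: appends to the previous content of slot c.
    static final Agg<String> APPEND_C =
        (k, v, row) -> new Row(row.a, row.b, row.c == null ? v : row.c + "," + v);

    // The single store shared by every input stream.
    final Map<String, Row> store = new HashMap<>();

    <V> void process(String key, V value, Agg<V> agg) {
        if (value == null) return; // assumption: null values are dropped
        Row current = store.getOrDefault(key, new Row(null, null, null)); // initializer
        store.put(key, agg.apply(key, value, current));
    }

    public static void main(String[] args) {
        StitchingCogroupSketch s = new StitchingCogroupSketch();
        s.process("k1", "a1", PUT_A);
        s.process("k1", "b1", PUT_B);
        s.process("k1", "c1", APPEND_C);
        s.process("k1", "c2", APPEND_C);
        System.out.println(s.store.get("k1")); // {a1, b1, c1,c2}
    }
}
```

Either way the user supplies one update function per stream, so the stitching
case does not need a separate operator in this model.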

Now if we look at it the other way: if we only specify the multiway join,
then the user will first need to aggregate each stream. Then they must do
the join, which will involve either aggregators and value joiners or some
questionable optimization that relies on the aggregators defined for each
grouped stream meshing together. And this would all have to happen inside
KStream.
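
For comparison, a rough plain-Java model of the aggregate-then-join route.
Again this is not the Kafka Streams API: AggregateThenJoinSketch and its
methods are hypothetical, the running sum stands in for whatever aggregator
the user would write, and the string concatenation stands in for the value
joiners of an inner join. It only illustrates that each stream gets its own
store and the join result needs another one on top.

```java
import java.util.HashMap;
import java.util.Map;

class AggregateThenJoinSketch {
    // One aggregate store per input stream (each would be a KTable in the DSL).
    final Map<String, Integer> table1 = new HashMap<>();
    final Map<String, Integer> table2 = new HashMap<>();
    final Map<String, Integer> table3 = new HashMap<>();
    // The join result needs its own store in addition to the three above.
    final Map<String, String> joined = new HashMap<>();

    // Per-stream aggregators: here simply a running sum per key.
    void onStream1(String key, int value) { table1.merge(key, value, Integer::sum); rejoin(key); }
    void onStream2(String key, int value) { table2.merge(key, value, Integer::sum); rejoin(key); }
    void onStream3(String key, int value) { table3.merge(key, value, Integer::sum); rejoin(key); }

    // Stand-in for the value joiners of an inner join: a result exists only
    // once every table has a value for the key.
    private void rejoin(String key) {
        Integer a = table1.get(key);
        Integer b = table2.get(key);
        Integer c = table3.get(key);
        if (a != null && b != null && c != null) {
            joined.put(key, a + "|" + b + "|" + c);
        }
    }

    public static void main(String[] args) {
        AggregateThenJoinSketch t = new AggregateThenJoinSketch();
        t.onStream1("k1", 1);
        t.onStream1("k1", 2);
        t.onStream2("k1", 10);
        System.out.println(t.joined.get("k1")); // null: stream3 has no value for k1 yet
        t.onStream3("k1", 100);
        System.out.println(t.joined.get("k1")); // 3|10|100
    }
}
```

Every incoming record touches its own store, reads the other two, and writes
the joined store, which is the extra work a single shared store avoids.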

I do agree that there are optimizations that can be done when joining
multiple tables, per your example, whether or not it is a "stitching
join". But I do not think the place to do it is in Streams. This
could be relatively easy to accomplish. I think we save ourselves pain if
we treat tables and streams as separate cases, since aggregating
multiple streams into one KTable can be done more efficiently than making
multiple KTables and then joining them together. We may be able to get
around this in the case of a stitching join, but I am not sure how we could
do it safely otherwise.

Walker

On Mon, Oct 28, 2019 at 6:26 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Walker,
>
> This is a good point about compatibility breakage while overloading the
> existing classes; while reading John and your exchanges, I think I still
> need to clarify the motivations a bit more:
>
> 1) Multiple streams need to be aggregated together, inputs are always
> *KStreams* and end result is a *KTable*.
> 2) Fields from different streams are never aggregated together, i.e. on the
> higher level it is more like a "stitching up" the fields and then doing a
> single aggregation.
>
> In this context, I agree with you that it is still a streams-aggregation
> operator that we are trying to optimize (though it's a multi-way one), not
> a multi-way table-table-join operator.
>
>
> -----------------
>
> But now, taking a step back and looking at it, I'm wondering whether,
> because of 2) (the input streams do not have overlapping fields), we can
> generalize this to a broader scope. Consider this case for example:
>
> table1 = builder.table("topic1");
> table2 = builder.table("topic2");
> table3 = builder.table("topic3");
> table4 = table1.join(table2).join(table3);
>
> Suppose the join operations do not take out any fields or add any new
> fields, i.e. say table1 has fields A, table2 has fields B, and table3 has
> fields C besides the key K, so table4 has fields {A, B, C} --- the join is
> just "stitching up" the fields --- then the above topology can actually be
> optimized in a similar way:
>
> * we only keep one materialized store in the form of K -> {A, B, C} as the
> materialized store of the final join result of table4.
> * when a record comes in from table1/2/3, just query the store on K,
> update the corresponding A/B/C field, and then write it back to the
> store.
>
>
> Then the above streams-aggregation operator can be treated as a special
> case of this: you first aggregate separately on stream1/2/3 and generate
> table1/2/3, and then do this "stitching join". Behind the scenes we can
> optimize the topology to do exactly the co-group logic by turning the
> second bullet point above into an aggregation operator:
>
> * when a record comes in from *stream1/2/3*, just query the store on K,
> update the corresponding A/B/C field *with an aggregator*, and then
> write it back to the store.
>
> -----------------
>
> Personally I think this is better because it 1) has a larger applicable
> scope, and 2) does not introduce new interfaces. But of course on the other
> side it requires us to do this optimization inside Streams with some syntax
> hint from users (for example, users need to specify that it is a "stitching
> join" such that all fields are still preserved in the join result). WDYT?
>
>
> Guozhang
>
>
> On Mon, Oct 28, 2019 at 4:20 PM Walker Carlson <wcarl...@confluent.io>
> wrote:
>
> > Hi John,
> >
> > Thank you for the background information. I think I understand your
> > point.
> >
> > I believe that this could be fixed by making the motivation a little
> > clearer in the KIP. I think the motivation is that when you have multiple
> > streams that need to be aggregated together to form a single object, the
> > current, non-optimal way to do this is through a multiway table join. This
> > is a little hacky. There is a slight but significant difference between
> > these cases, as in the null value handling you pointed out.
> >
> > For the example in the motivation, these tables were grouped streams, so
> > they already dropped the null values. If we consider Cogroup as sitting in
> > the same grey area that KGroupedStream does, it should also behave this
> > way. If you think about it that way, it is more of an extension of
> > KGroupedStream than of KTable or KStream. Therefore I handle null values
> > the same way KGroupedStream#aggregate does.
> >
> > Looking back, I am not sure I understood your previous question fully at
> > the time. I am sorry if my answer caused any confusion!
> >
> > Walker
> >
> > On Mon, Oct 28, 2019 at 2:49 PM John Roesler <j...@confluent.io> wrote:
> >
> > > Hi Walker,
> > >
> > > Sorry for the delay in responding. Thanks for your response earlier.
> > >
> > > I think there might be a subtlety getting overlooked in considering
> > > whether we're talking about streams versus tables in cogroup. As I'm
> > > sure you know, Kafka Streams treats "stream" records as independent,
> > > immutable, and opaque "facts", whereas we treat "table" records as a
> > > sequence of updates to an entity identified by the record key (where
> > > "update" means that each record's value represents the new state after
> > > applying the update). For the most part, this is a clean separation,
> > > but there is one special case where records with a "null" value are
> > > interpreted as a tombstone in the table context (i.e., the record
> > > indicates not that the new value of the entity is "null", but rather
> > > that the entity has been deleted). In contrast, a record with a null
> > > value in the stream context is _just_ a record with a null value; no
> > > special semantics.
> > >
> > > The difficulty is that these two semantics clash at the stream/table
> > > boundary. So, operations that convert streams to tables (like
> > > KGroupedStream#aggregate) have to cope with ambiguity about whether to
> > > treat null values opaquely as null values, or as tombstones. I think
> > > I'll make a long story short and just say that this is a very, very
> > > complex issue. As a result (and as a bit of a punt), our
> > > KGroupedStream operations actually just discard null-valued records.
> > > This means that the following are _not_ equivalent programs:
> > >
> > > table1 =
> > >   stream<Id,Record>("records")
> > >     .filter(Record::isOk)
> > >     .groupByKey()
> > >     .aggregate(() -> new Record(), (key, value, agg) -> value)
> > > table2 =
> > >   table<Id,Record>("records")
> > >     .filter(Record::isOk)
> > >
> > > They look about the same, in that they'll both produce a
> > > KTable<Id,Record> with the value being the latest state. But if a
> > > record is deleted in the upstream data (represented as a "null"
> > > value), that record would also be deleted in table2, but not in
> > > table1. Table1 would just perpetually contain the value immediately
> > > prior to the delete.
> > >
> > > This is why it makes me nervous to propose a new kind of _stream_
> > > operation ostensibly in order to solve a problem that presents itself
> > > in the _table_ context.
> > >
> > > If the goal is to provide a more efficient and convenient multi-way
> > > KTable join, I think it would be a good idea to consider an extension
> > > to the KTable API, not the KStream API. On the other hand, if this is
> > > not the goal, then the motivation of the KIP shouldn't say that it is.
> > > Instead, the KIP could provide some other motivation specifically for
> > > augmenting the KStream API.
> > >
> > > There is a third alternative that comes to mind, if you wish to
> > > resolve the long-standing dilemma around this semantic problem and
> > > specify in the KIP how exactly nulls are handled in this operator. But
> > > (although this seems on the face to be a good option), I think it
> > > might be a briarpatch. Even if we are able to reach a suitable design,
> > > we'd have to contend with the fact that it looks like the
> > > KGroupedStream API, but behaves differently.
> > >
> > > What do you think about all this?
> > >
> > > Thanks again for the KIP and the discussion!
> > > -John
> > >
> > > On Mon, Oct 28, 2019 at 3:32 PM Walker Carlson <wcarl...@confluent.io>
> > > wrote:
> > > >
> > > > Hi Guozhang,
> > > >
> > > > Matthias and I did talk about overloading a different type of
> > > > aggregate method in the cogroup that would take in the windows and
> > > > return a windowed KTable. We decided that it would break too much with
> > > > the current pattern that was established in the normal KStream. We can
> > > > revisit this if you have a different opinion on the tradeoff.
> > > >
> > > > Walker
> > > >
> > > > On Mon, Oct 28, 2019 at 12:14 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Walker,
> > > > >
> > > > > On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson <
> > wcarl...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > > 1. I am familiar with the cogroup of Spark; it is very similar to
> > > > > > their join operator, but it makes the values iterable instead. I
> > > > > > think that the use cases are different enough that it makes sense
> > > > > > to specify the aggregator when we do.
> > > > > >
> > > > > > I like the idea of "absorb" and I think it could be useful,
> > > > > > although I do not think it is as intuitive.
> > > > > >
> > > > > > If we were to go that route, we would either use more processors or
> > > > > > do essentially the same thing but would have to store the
> > > > > > information required to cogroup inside that KTable. I think this
> > > > > > would violate some design principles. I would argue that we should
> > > > > > consider adding absorb as well and auto re-write it to use cogroup.
> > > > > >
> > > > >
> > > > > Yeah I think I agree with you about the internal design complexity
> > > > > with "absorb"; I was primarily thinking if we can save ourselves from
> > > > > introducing 3 more public classes with co-group. But it seems that
> > > > > without introducing new classes there's no easy way for us to bound
> > > > > the scope of co-grouping (like how many streams will be co-grouped
> > > > > together).
> > > > >
> > > > > LMK if you have some better ideas: generally speaking, the fewer new
> > > > > public interfaces we introduce to fulfill a new feature the better,
> > > > > so I'd push us to think twice and carefully before we go down the
> > > > > route.
> > > > >
> > > > >
> > > > > >
> > > > > > 2. We have not considered this, though that would be a convenient
> > > > > > operation.
> > > > > >
> > > > > > 3. There is only one processor made. We are actually having the
> > > > > > naming conversation right now in the above thread.
> > > > > >
> > > > > > 4, 5. fair points
> > > > > >
> > > > > > Walker
> > > > > >
> > > > > > On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang <
> wangg...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi Walker, thanks for the KIP! I made a pass on the writeup and
> > > > > > > have some comments below:
> > > > > > >
> > > > > > > Meta:
> > > > > > >
> > > > > > > 1. Syntax-wise, I'm wondering if we have compared our current
> > > > > > > proposal with Spark's co-group syntax (I know they are targeting
> > > > > > > different use cases, but wondering if their syntax is closer to
> > > > > > > the join operator). What are the syntax / semantics trade-offs
> > > > > > > here?
> > > > > > >
> > > > > > > Just playing devil's advocate here: if the main motivation is to
> > > > > > > provide a more convenient multi-way join syntax, and in order to
> > > > > > > only have one materialized store we need to specify the final
> > > > > > > joined format at the beginning, then what about the following
> > > > > > > alternative (with the given example in your wiki page):
> > > > > > >
> > > > > > >
> > > > > > > KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> > > > > > > KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> > > > > > > KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> > > > > > >
> > > > > > > KTable<K, CG> aggregated = grouped1.aggregate(initializer, materialized, aggregator1);
> > > > > > >
> > > > > > > // I'm just using a random name off the top of my head here
> > > > > > > aggregated.absorb(grouped2, aggregator2)
> > > > > > >           .absorb(grouped3, aggregator3);
> > > > > > >
> > > > > > > In this way, we just add a new API to the KTable to "absorb" new
> > > > > > > streams as aggregated results without needing to introduce new
> > > > > > > first-class citizen classes.
> > > > > > >
> > > > > > > 2. From the DSL optimization, have we considered if we can auto
> > > > > > > re-write the user-written old-fashioned multi-join into this new
> > > > > > > DSL operator?
> > > > > > >
> > > > > > > 3. Although it is not needed for the wiki page itself, for the
> > > > > > > internal implementation how many processor nodes would we create
> > > > > > > for the new operator, and how can we allow users to name them?
> > > > > > >
> > > > > > > Minor:
> > > > > > >
> > > > > > > 4. In "Public Interfaces", better add the templated generics to
> > > > > > > "KGroupedStream" as "KGroupedStream<K, V>".
> > > > > > >
> > > > > > > 5. Naming wise, I'd suggest we keep the "K" together with
> > > > > > > Stream/Table, e.g. "TimeWindowed*CogroupedKStream*<K, V>".
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax <
> > > > > matth...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Walker,
> > > > > > > >
> > > > > > > > I am not sure if I can follow your argument. What exactly do you
> > > > > > > > mean by
> > > > > > > >
> > > > > > > > > I also think that in this case it would be better to separate
> > > > > > > > > the 2 option out into separate overloads.
> > > > > > > >
> > > > > > > > Maybe you can give an example of what method signature you have
> > > > > > > > in mind?
> > > > > > > >
> > > > > > > > > We could take a named parameter from upstream or add an extra
> > > > > > > > > naming option however I don't really see the advantage that
> > > > > > > > > would give.
> > > > > > > >
> > > > > > > > Are you familiar with KIP-307? Before KIP-307, KS generated all
> > > > > > > > names for all Processors. This makes it hard to reason about a
> > > > > > > > Topology if it's getting complex. Adding `Named` to the new
> > > > > > > > co-group operator would actually align with KIP-307.
> > > > > > > >
> > > > > > > > > It seems to go in the opposite direction from the cogroup
> > > > > > > > > configuration idea you proposed.
> > > > > > > >
> > > > > > > > Can you elaborate? Not sure if I can follow.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > -Matthias
> > > > > > > >
> > > > > > > >
> > > > > > > > On 10/24/19 10:20 AM, Walker Carlson wrote:
> > > > > > > > > While I like the idea, Sophie, I don't think that it is
> > > > > > > > > necessary. I also think that in this case it would be better
> > > > > > > > > to separate the 2 options out into separate overloads.
> > > > > > > > > We could take a named parameter from upstream or add an extra
> > > > > > > > > naming option; however, I don't really see the advantage that
> > > > > > > > > would give. It seems to go in the opposite direction from the
> > > > > > > > > cogroup configuration idea you proposed.
> > > > > > > > >
> > > > > > > > > John, I think it could be both. It depends on when you
> > > > > > > > > aggregate and what kind of data you have. In the example it is
> > > > > > > > > aggregating before joining; there are probably some cases
> > > > > > > > > where you could join before aggregating. IMHO it would be
> > > > > > > > > easier to group all the streams together and then perform the
> > > > > > > > > one operation that results in a single KTable.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman <
> > > > > > > sop...@confluent.io
> > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >>> I can personally not see any need to add other configuration
> > > > > > > > >> Famous last words?
> > > > > > > > >>
> > > > > > > > >> Just kidding, 95% confidence is more than enough for me (and
> > > > > > > > >> better to optimize for current design than for hypothetical
> > > > > > > > >> future changes).
> > > > > > > > >>
> > > > > > > > >> One last question I have then is about the
> > > > > > > > >> operator/store/repartition naming -- seems like we can name
> > > > > > > > >> the underlying store/changelog through the Materialized
> > > > > > > > >> parameter, but do we also want to include an overload taking
> > > > > > > > >> a Named parameter for the operator name (as in the
> > > > > > > > >> KTable#join variations)?
> > > > > > > > >>
> > > > > > > > >> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax <
> > > > > > > matth...@confluent.io>
> > > > > > > > >> wrote:
> > > > > > > > >>
> > > > > > > > >>> Interesting idea, Sophie.
> > > > > > > > >>>
> > > > > > > > >>> So far, we tried to reuse existing config objects and only
> > > > > > > > >>> add new ones when needed to avoid creating "redundant"
> > > > > > > > >>> classes. This is of course a reactive approach (with the
> > > > > > > > >>> drawback to deprecate stuff if we change it, as you
> > > > > > > > >>> described).
> > > > > > > > >>>
> > > > > > > > >>> I can personally not see any need to add other configuration
> > > > > > > > >>> parameters atm, so it's a 95% obvious "no" IMHO. The final
> > > > > > > > >>> `aggregate()` has only a single state store that we need to
> > > > > > > > >>> configure, and reusing `Materialized` seems to be
> > > > > > > > >>> appropriate.
> > > > > > > > >>>
> > > > > > > > >>> Also note that the `Initializer` is a mandatory parameter
> > > > > > > > >>> and not a configuration, and should be passed directly and
> > > > > > > > >>> not via a configuration object.
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>> -Matthias
> > > > > > > > >>>
> > > > > > > > >>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
> > > > > > > > >>>> Thanks for the explanation, makes sense to me! As for the
> > > > > > > > >>>> API, one other thought I had is might we ever want or need
> > > > > > > > >>>> to introduce any other configs or parameters in the future?
> > > > > > > > >>>> Obviously that's difficult to say now (or maybe the answer
> > > > > > > > >>>> seems obviously "no") but we seem to often end up needing
> > > > > > > > >>>> to add new overloads and/or deprecate old ones as new
> > > > > > > > >>>> features or requirements come into play.
> > > > > > > > >>>>
> > > > > > > > >>>> What do you (and others?) think about wrapping the config
> > > > > > > > >>>> parameters (ie everything except the actual grouped
> > > > > > > > >>>> streams) in a new config object? For example, the
> > > > > > > > >>>> CogroupedStream#aggregate method could take a single
> > > > > > > > >>>> Cogrouped object, which itself would have an initializer
> > > > > > > > >>>> and a materialized. If we ever need to add a new parameter,
> > > > > > > > >>>> we can just add it to the Cogrouped class.
> > > > > > > > >>>>
> > > > > > > > >>>> Also, will the backing store be available for IQ if a
> > > > > > > > >>>> Materialized is passed in?
> > > > > > > > >>>>
> > > > > > > > >>>> On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson <
> > > > > > > > wcarl...@confluent.io
> > > > > > > > >>>
> > > > > > > > >>>> wrote:
> > > > > > > > >>>>
> > > > > > > > >>>>> Hi Sophie,
> > > > > > > > >>>>>
> > > > > > > > >>>>> Thank you for your comments. As for the different method
> > > > > > > > >>>>> signatures, I have not really considered any other
> > > > > > > > >>>>> options, but while I do agree it is confusing, I don't see
> > > > > > > > >>>>> any obvious solutions. The problem is that the cogroup
> > > > > > > > >>>>> essentially pairs a grouped stream with an aggregator, and
> > > > > > > > >>>>> when it is first made the method is already called on a
> > > > > > > > >>>>> grouped stream. However, each subsequent stream-aggregator
> > > > > > > > >>>>> pair is added on to a cogrouped stream, so it needs both
> > > > > > > > >>>>> arguments.
> > > > > > > > >>>>>
> > > > > > > > >>>>> For the second question, you should not need a joiner. The
> > > > > > > > >>>>> idea is that you can collect many grouped streams with
> > > > > > > > >>>>> overlapping key spaces and any kind of value types. Once
> > > > > > > > >>>>> aggregated, the values will be reduced into one type. This
> > > > > > > > >>>>> is why you need only one initializer. Each aggregator will
> > > > > > > > >>>>> need to integrate the new value with the new object made
> > > > > > > > >>>>> in the initializer.
> > > > > > > > >>>>> Does that make sense?
> > > > > > > > >>>>>
> > > > > > > > >>>>> This is a good question and I will include this
> > > > > > > > >>>>> explanation in the KIP as well.
> > > > > > > > >>>>>
> > > > > > > > >>>>> Thanks,
> > > > > > > > >>>>> Walker
> > > > > > > > >>>>>
> > > > > > > > >>>>> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman <
> > > > > > > > >>> sop...@confluent.io>
> > > > > > > > >>>>> wrote:
> > > > > > > > >>>>>
> > > > > > > > >>>>>> Hey Walker,
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Thanks for the KIP! I have just a couple of questions:
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> 1) It seems a little awkward to me that with the current
> > > > > > > > >>>>>> API, we have a nearly identical "add stream to cogroup"
> > > > > > > > >>>>>> method, except for the first which has a different
> > > > > > > > >>>>>> signature (ie the first stream is joined as
> > > > > > > > >>>>>> stream.cogroup(Aggregator) while the subsequent ones are
> > > > > > > > >>>>>> joined as .cogroup(Stream, Aggregator) ). I'm not sure
> > > > > > > > >>>>>> what it would look like exactly, but I was just wondering
> > > > > > > > >>>>>> if you'd considered and/or rejected any alternative APIs?
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> 2) This might just be my lack of familiarity with
> > > > > > > > >>>>>> "cogroup" as a concept, but with the current
> > > > > > > > >>>>>> (non-optimal) API the user seems to have some control
> > > > > > > > >>>>>> over how exactly the different streams are joined through
> > > > > > > > >>>>>> the ValueJoiners. Would this new cogroup simply
> > > > > > > > >>>>>> concatenate the values from the different cogroup
> > > > > > > > >>>>>> streams, or could users potentially pass some kind of
> > > > > > > > >>>>>> Joiner to the cogroup/aggregate methods? Or, is the whole
> > > > > > > > >>>>>> point of cogroups that you no longer ever need to specify
> > > > > > > > >>>>>> a Joiner? If so, you should add a short line to the KIP
> > > > > > > > >>>>>> explaining that for those of us who aren't fluent in
> > > > > > > > >>>>>> cogroup semantics :)
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Cheers,
> > > > > > > > >>>>>> Sophie
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson <
> > > > > > > > >> wcarl...@confluent.io>
> > > > > > > > >>>>>> wrote:
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>> Good catch, I updated that.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> I have made a PR for this KIP.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> I am then splitting it into 3 parts: first cogroup for a
> > > > > > > > >>>>>>> key-value store (here
> > > > > > > > >>>>>>> <https://github.com/apache/kafka/pull/7538>), then for a
> > > > > > > > >>>>>>> timeWindowedStore, and then a sessionWindowedStore +
> > > > > > > > >>>>>>> ensuring partitioning.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Walker
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax <
> > > > > > > > >>>>> matth...@confluent.io>
> > > > > > > > >>>>>>> wrote:
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>>> Walker,
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> thanks for picking up the KIP and reworking it for the
> > > > > > > > >>>>>>>> changed API.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> Overall, the updated API suggestions make sense to me.
> > > > > > > > >>>>>>>> They seem to align quite nicely with our current API
> > > > > > > > >>>>>>>> design.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> One nit: In `CogroupedKStream#aggregate(...)` the type
> > > > > > > > >>>>>>>> parameter of `Materialized` should be `V`, not `VR`?
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> -Matthias
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> On 10/14/19 2:57 PM, Walker Carlson wrote:
> > > > > > > > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > > > > > > > >>>>>>>>> here is a link
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson <
> > > > > > > > >>>>>> wcarl...@confluent.io>
> > > > > > > > >>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>> Hello all,
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> I have picked up and updated KIP-150. Due to changes
> > > > > > > > >>>>>>>>>> to the project since KIP-150 was written, there are a
> > > > > > > > >>>>>>>>>> few items that need to be updated.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> The first item that changed is the adoption of the
> > > > > > > > >>>>>>>>>> Materialized parameter.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> The second item is the WindowedBy. The old KIP
> > > > > > > > >>>>>>>>>> handles windowing by overloading the aggregate
> > > > > > > > >>>>>>>>>> function to take in a Window object as well as the
> > > > > > > > >>>>>>>>>> other parameters. The current practice to window
> > > > > > > > >>>>>>>>>> grouped streams is to call windowedBy and receive a
> > > > > > > > >>>>>>>>>> windowed stream object. The existing interface for a
> > > > > > > > >>>>>>>>>> windowed stream made from a grouped stream will not
> > > > > > > > >>>>>>>>>> work for cogrouped streams. Hence, we have to make
> > > > > > > > >>>>>>>>>> new interfaces for cogrouped windowed streams.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> Please take a look, I would like to hear your
> > > > > > > > >>>>>>>>>> feedback,
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> Walker
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > >
> >
>
>
> --
> -- Guozhang
>
