Hi all,

I just updated the KIP with option 1 as the design and moved options 2 and 3 to the rejected alternatives. Since Matthias is strongly against `trigger`, I adopted the proposed `EmitStrategy` and dropped the "with" from the function name. So it looks like this:

    stream
        .groupBy(..)
        .windowedBy(..)
        .emitStrategy(EmitStrategy.onWindowClose())
        .aggregate(..)
        .mapValues(..)

I used the factory method `onWindowClose()` (rather than a constant like `ON_WINDOW_CLOSE`) since `EmitStrategy` is meant to be an interface.

Hao

On Wed, Mar 23, 2022 at 6:35 PM Matthias J. Sax <mj...@apache.org> wrote:
> Wow. Quite a thread... #namingIsHard :D
>
> I won't repeat all the arguments, which are all very good ones. I can just
> state my personal favorite option:
>
>     stream
>         .groupBy(..)
>         .windowedBy(..)
>         .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>         .aggregate(..)
>         .mapValues(..)
>
> It seems to be the best compromise / trade-off across the board.
> Personally, I would strongly advocate against using `trigger()`!
>
> -Matthias

On 3/23/22 4:38 PM, Guozhang Wang wrote:
> Hao is right. I think that's the hindsight we have for `suppress`: since it
> can be applied anywhere for a K(windowed)Table, it incurs an awkward
> programming flexibility, and I felt it's better to make its application
> scope more constrained.
>
> And I also agree with John that, unless any of us feels strongly about any
> of the options, Hao could make the final call about the naming.
>
> Guozhang

On Wed, Mar 23, 2022 at 1:49 PM Hao Li <h...@confluent.io.invalid> wrote:
> For
>
>     stream
>         .groupBy(..)
>         .windowedBy(..)
>         .aggregate(..)
>         .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>         .mapValues(..)
>
> I think after `aggregate` it's already a table, and then the emit strategy
> is too late to control how the windowed stream is output to the table.
> This is the concern Guozhang raised about having this in the existing
> `suppress` operator as well.
>
> Thanks,
> Hao

On Wed, Mar 23, 2022 at 1:05 PM Bruno Cadonna <cado...@apache.org> wrote:
> Hi,
>
> Thank you for your answers to my questions!
>
> I see the argument about the conciseness of configuring a stream with
> methods instead of config objects.
> I just miss the descriptive aspect a bit.
>
> What about
>
>     stream
>         .groupBy(..)
>         .windowedBy(..)
>         .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>         .aggregate(..)
>         .mapValues(..)
>
> I also have another question. Why should emitting of results be
> controlled by the window-level API? If I want to emit results for each
> input record, the emit strategy is quite independent of the window. So
> I somewhat share Matthias' and Guozhang's concern that the emit strategy
> seems misplaced there.
>
> What are the arguments against
>
>     stream
>         .groupBy(..)
>         .windowedBy(..)
>         .aggregate(..)
>         .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>         .mapValues(..)
>
> ?
>
> A final administrative request: Hao, could you please add the rejected
> alternatives to the KIP so that future us will know why we rejected them?
>
> Best,
> Bruno

On 23.03.22 19:38, John Roesler wrote:
> Hi all,
>
> I can see both sides of this.
>
> On one hand, when we say "stream.groupBy().windowBy().count()", it seems
> like we're telling KS to take the raw stream, group it based on key, then
> window it based on time, and then compute an aggregation on the windows.
> In that model, "trigger()" would have to mean something like "trigger it",
> which doesn't really make sense, since we aren't "triggering" the
> aggregation (then again, to an outside observer, it would appear that
> way... food for thought).
>
> Another way to look at it is that all we're really doing is configuring a
> windowed aggregation on the stream, and we're doing it with a progressive
> builder interface. In other words, the above is just a progressive builder
> for configuring an operation like
> "stream.aggregate(groupingConfig, windowingConfig, countFn)".
> Under the latter interpretation of the DSL, it makes perfect sense to add
> more optional progressive builder methods like trigger() to the
> WindowedKStream interfaces.
>
> Since part of the motivation for choosing the word "trigger" here is to
> stay close to what Flink defines, I'll also point out that Flink's syntax
> is also "stream.keyBy().window().trigger().aggregate()". Not that their
> API is the holy grail or anything, but it's at least an indication that
> this API isn't a horrible mistake.
>
> All other things being equal, I also prefer to leave tie-breakers in the
> hands of the contributor. So, if we've all said our piece and Hao still
> prefers option 1, then (as long as we don't think it's a horrible
> mistake), I think we should just let him go for it.
>
> Speaking of which, after reviewing the responses regarding deprecating
> `Suppressed#onWindowClose`, I still think we should just go ahead and
> deprecate it. Although it's not expressed exactly the same way, it still
> does exactly the same thing, or so close that it seems confusing to keep
> both. But again, if Hao really prefers to keep both, I won't insist on
> it :)
>
> Thanks all,
> -John

On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote:
> Thanks Bruno!
>
> The arguments for option 1 are:
> 1. Concise and descriptive. It avoids overloading existing functions, and
>    it's very clear what it's doing. Imagine there's an autocomplete feature
>    in IntelliJ or another IDE for our DSL in the future: it's not favorable
>    to show 6 `windowedBy` functions.
> 2. Option 1 operates on `windowedStream` to configure how it should be
>    output.
>    Option 2 operates on `KGroupedStream` to produce `windowedStream` as
>    well as configure how `windowedStream` should be output. I feel it's
>    better to have a `windowedStream` first and then configure how it is
>    output. Somehow I feel option 2 breaks the builder pattern.
> 3. `WindowedByParameters` doesn't seem very descriptive. If we put all
>    kinds of different parameters into it to avoid future overloading, it's
>    too bloated and not very user-friendly.
>
> I agree option 1's `trigger` function configures the stream, which feels
> different from the existing `count` or `aggregate` etc. Configuring might
> also be a kind of action on the stream :) I'm not sure if it breaks the
> DSL principles, and if it does, can we relax the principles given the
> benefits compared to option 2? Maybe John can chime in as the DSL grammar
> author.
>
> Thanks,
> Hao

On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna <cado...@apache.org> wrote:
> Hi Hao,
>
> I agree with Guozhang: Great summary! Thank you!
>
> Regarding "aligned with other config class names", there is this DSL
> grammar John once specified
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> and we have already used it in the code. I found the grammar quite useful.
>
> I am undecided whether option 1 is really worth it. What are actually the
> arguments in favor of it? Is it only that we do not need to overload
> other methods? This does not seem worth breaking the DSL principles.
> An alternative proposal would be to go with option 2 and conform with the
> grammar above:
>
>     <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows,
>                                                             WindowedByParameters parameters);
>
>     TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows,
>                                          WindowedByParameters parameters);
>
>     SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows,
>                                             WindowedByParameters parameters);
>
> This is similar to option 2 in the KIP, but it ensures that we put all
> future needed configs in the parameters object and we do not need to
> overload the methods anymore.
>
> Then, if we also get KAFKA-10298 done, we could even collapse all
> `windowedBy()` methods into one.
>
> Best,
> Bruno

On 22.03.22 22:31, Guozhang Wang wrote:
> Thanks for the great summary, Hao. I'm still leaning towards option 2)
> here, and I'm in favor of `trigger` as the function name and `Triggered`
> as the config class name (mainly to be aligned with other config class
> names). I also want to see others' preferences between the options, as
> well as the naming.
>
> Guozhang

On Tue, Mar 22, 2022 at 12:23 PM Hao Li <h...@confluent.io.invalid> wrote:
> `windowedStream.onWindowClose()` was the original option 1
> (`windowedStream.emitFinal()`) but was rejected because we could add more
> emit types and this would result in adding more functions. I still prefer
> the "windowedStream.someFunc(Controlled.onWindowClose)" model since it's
> flexible and clear that it's configuring the emit policy.
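The "windowedStream.someFunc(Controlled.onWindowClose)" model, combined with John's reading of the DSL as a progressive builder, can be illustrated with a small self-contained Java sketch. All types here (`EmitStrategy`, `AggregationSpec`, `WindowedBuilder`) are hypothetical stand-ins written for this note, not the Kafka Streams API: each configuring step only records settings, the terminal `count()` turns them into one operation description, and a new emit policy would mean a new factory on the config interface rather than a new builder method or `windowedBy` overload.

```java
import java.time.Duration;

// Hypothetical emit-policy config object, sketched as an interface with
// static factory methods. It is NOT the Kafka Streams class of this name.
interface EmitStrategy {
    enum Kind { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

    Kind kind();

    static EmitStrategy onWindowUpdate() { return () -> Kind.ON_WINDOW_UPDATE; }

    static EmitStrategy onWindowClose() { return () -> Kind.ON_WINDOW_CLOSE; }
}

// The fully configured operation the builder chain describes: roughly
// "aggregate(groupingConfig, windowingConfig, countFn)" plus an emit policy.
final class AggregationSpec {
    final String groupingKey;
    final Duration windowSize;
    final EmitStrategy emit;
    final String aggregation;

    AggregationSpec(String groupingKey, Duration windowSize, EmitStrategy emit, String aggregation) {
        this.groupingKey = groupingKey;
        this.windowSize = windowSize;
        this.emit = emit;
        this.aggregation = aggregation;
    }
}

// Hypothetical immutable windowed-stream builder: configuring steps return a
// new builder, and only the terminal count() produces the operation.
final class WindowedBuilder {
    private final String groupingKey;
    private final Duration windowSize;
    private final EmitStrategy emit;

    WindowedBuilder(String groupingKey, Duration windowSize) {
        // Assumed default for this sketch: emit an update for every input record.
        this(groupingKey, windowSize, EmitStrategy.onWindowUpdate());
    }

    private WindowedBuilder(String groupingKey, Duration windowSize, EmitStrategy emit) {
        this.groupingKey = groupingKey;
        this.windowSize = windowSize;
        this.emit = emit;
    }

    // One configuring method covers every emit policy; adding a policy needs
    // a new EmitStrategy factory, not a new builder method or overload.
    WindowedBuilder emitStrategy(EmitStrategy strategy) {
        return new WindowedBuilder(groupingKey, windowSize, strategy);
    }

    // Terminal step: materializes everything configured so far.
    AggregationSpec count() {
        return new AggregationSpec(groupingKey, windowSize, emit, "count");
    }
}
```

Because each step returns a new builder, calling `b.emitStrategy(...)` leaves `b` itself unchanged. The sketch also makes Bruno's concern visible: `emitStrategy()` transforms no data; it only adds configuration that the terminal operation consumes.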
>
> Let me summarize all the naming options we have and compare:
>
> *API function name:*
>
> *1. `windowedStream.trigger()`*
>    Pros:
>      i. Simple
>      ii. Similar to Flink's trigger function (is this a con actually?)
>    Cons:
>      i. `trigger()` can be confused with Flink's trigger (raised by John)
>      ii. `trigger()` feels like an operation instead of a configure
>          function (raised by Bruno)?
>
> *2. `windowedStream.emitTrigger()`*
>    Pros:
>      i. Avoids confusion with Flink's trigger API
>      ii. `emitTrigger` feels like configuring the trigger because "trigger"
>          here is a noun instead of a verb as in `trigger()`
>    Cons:
>      i. Verbose?
>      ii. Not consistent with `Suppressed.untilWindowClose`?
>
> *Config class/object name:*
>
> 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
>    Cons:
>      i. Doesn't go along with `trigger` (raised by Bruno)
>
> 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*
>
> 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*
>
> 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
>    *`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
>    This is a combination of different names like `EmitConfig`,
>    `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...
>
> If we settle on option 1), we can add new options to these names and
> comment on their pros and cons.
>
> Hao

On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang <wangg...@gmail.com> wrote:
> I see what you mean now, and I think it's a fair point that composing
> `trigger` and `emitted` seems awkward.
>
> Re: data process operator vs. control operator, I shared your concern as
> well, and here's my train of thought: having only data process operators
> was my primary motivation for how we added the suppress operator --- it
> indeed "suppresses" data. But in hindsight its disadvantage is that, for
> example, Suppressed.onWindowClose() should only be related to an earlier
> windowedBy operator, which is possibly very far from it in the resulting
> DSL code. It's not only a bit awkward for users to write such code, but
> in such cases the DSL builder also needs to maintain and propagate this
> information to the suppress operator further down. So we are now thinking
> about "putting the control object as close as possible to where the
> related processing really happens". And in that world my original
> preference was somewhere in option 2), i.e. just put the control as a
> param of the related "windowedBy" operator, but the trade-off is that we
> keep adding overloaded functions to these operators. So after some back
> and forth, I'm leaning towards relaxing our principle of having only
> processing operators and no flow-control operators. That being said, if
> you have any ideas on how we can get the benefits of both worlds, I'm
> all ears.
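Guozhang's point that the DSL builder has to propagate window information down to a distant `suppress` operator can be made concrete with a toy graph. The `Node` class below is hypothetical and far simpler than Kafka's internal topology classes; it only shows that a downstream "until window close" control has to walk back through its ancestors to find the window definition, and has to reject the topology when there is none (the validity check Guozhang mentions further down the thread).

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical, heavily simplified DSL graph node (NOT Kafka's internal
// topology classes). Each node only knows its parent, so information about
// an upstream window has to be found by walking back up the chain.
final class Node {
    final String name;
    final Node parent;          // null for the source node
    final Duration windowSize;  // non-null only for a windowing node

    private Node(String name, Node parent, Duration windowSize) {
        this.name = name;
        this.parent = parent;
        this.windowSize = windowSize;
    }

    static Node source(String name) {
        return new Node(name, null, null);
    }

    Node then(String name) {
        return new Node(name, this, null);
    }

    Node windowed(String name, Duration size) {
        return new Node(name, this, size);
    }

    // What a downstream "suppress until window close" control would need:
    // the nearest upstream window definition, if there is one.
    Optional<Node> findWindowDefinition() {
        for (Node n = this; n != null; n = n.parent) {
            if (n.windowSize != null) {
                return Optional.of(n);
            }
        }
        return Optional.empty();
    }

    // Validation step: reject the policy when no window exists upstream.
    Node requireWindowDefinition() {
        return findWindowDefinition().orElseThrow(() ->
            new IllegalStateException("untilWindowClose needs an upstream window definition"));
    }
}
```

A control declared directly on the windowed stream, as in option 1, would not need this walk at all: the window definition is already at hand where the emit policy is declared.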
>
> Re: using a direct function like "windowedStream.onWindowClose()" vs.
> "windowedStream.someFunc(Controlled.onWindowClose)", again my motivation
> for the latter is extensibility without adding more functions in the
> future. If people feel this is not worth it, we can do the first option
> as well. If we just feel `trigger` and `emitted` are not composable
> together, maybe we can consider something like
> `windowedStream.trigger(Triggered.onWindowClose())`?
>
> Re: windowedBy vs. windowBy, yeah, I do not really have a good reason why
> we should use the past tense either :P But if it's not bothering people
> much, I'd say we just keep it rather than deprecating/renaming it.

On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org> wrote:
> Hi Guozhang,
>
> There is no semantic difference. It is a cosmetic difference.
> Conceptually, I relate `Emitted` to the aggregation and not to
> `trigger()` in the API flow, because the aggregation emits the result,
> not `trigger()`. Therefore, I proposed not to use `Emitted` as the name
> of the config object passed to `trigger()`.
>
> Best,
> Bruno

On 22.03.22 17:24, Guozhang Wang wrote:
> Hi Bruno,
>
> Could you elaborate a bit more here: what's the semantic difference
> between "the aggregation is triggered on window close and all aggregation
> results are emitted."
> for trigger(TriggerParameters.onWindowClose()), and "the aggregation is
> configured to only emit final results." for
> trigger(Emitted.onWindowClose())?

On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org> wrote:
> Hi Hao,
>
> Thank you for the KIP!
>
> Regarding option 1, I would not use `Emitted.onWindowClose()` since that
> does not seem compatible with the proposed flow. Conceptually, the flow
> now states that the aggregation is triggered on window close and all
> aggregation results are emitted. `Emitted` suggests that the aggregation
> is configured to only emit final results.
>
> Thus, I propose the following:
>
>     stream
>         .groupBy(..)
>         .windowedBy(..)
>         .trigger(TriggerParameters.onWindowClose())
>         .aggregate(..) // results in a KTable<Windowed<..>>
>         .mapValues(..)
>
> An alternative to `trigger()` could be `schedule()`, but I do not really
> like it.
>
> One thing I noticed with option 1 is that all other methods in the
> example above are operations on data. `groupBy()` groups, `windowedBy()`
> partitions, `aggregate()` computes the aggregate, `mapValues()` maps
> values, even `suppress()` suppresses intermediate results. But what does
> `trigger()` do? `trigger()` seems like a config lost among operations.
>
> However, if we do not want to restrict ourselves to only using methods
> to specify operations on data, I have the following proposal:
>
>     stream
>         .groupBy(..)
>         .windowedBy(..)
>         .onWindowClose()
>         .aggregate(..) // results in a KTable<Windowed<..>>
>         .mapValues(..)
>
> Best,
> Bruno
>
> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
> operations use the present tense.

On 22.03.22 06:36, Hao Li wrote:
> Hi John,
>
> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
> different meaning in our case. `emit` sounds like an action to emit? How
> about `emitTrigger`? I'm open to suggestions for the naming.
>
> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that
> we can deprecate the `Suppressed` config as a whole later. Or we can
> deprecate `Suppressed.untilWindowClose` in a later KIP after the
> implementation of emit final is done.
>
> BTW, isn't
>
>     stream
>         .groupBy(..)
>         .windowBy(..)
>         .aggregate(..) // results in a KTable<Windowed<..>>
>         .mapValues(..)
>         .suppress(Suppressed.untilWindowClose) // since we can trace back to the parent node to find a window definition
>
> the same as
>
>     stream
>         .groupBy(..)
>         .windowBy(..)
>         .trigger(Emitted.onWindowClose)
>         .aggregate(..) // results in a KTable<Windowed<..>>
>         .mapValues(..)
>
> ?
>
> Hao

On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> I think the following case is only doable via `suppress`:
>
>     stream
>         .groupBy(..)
>         .windowBy(..)
>         .aggregate(..) // results in a KTable<Windowed<..>>
>         .mapValues(..)
>         .suppress(Suppressed.untilWindowClose) // since we can trace back to the parent node to find a window definition
>
> Guozhang

On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:
> Thanks, Guozhang!
>
> To clarify, I was asking specifically about deprecating just the method
> ‘untilWindowClose’. I might not be thinking clearly about it, though.
> What does untilWindowClose do that this KIP doesn’t cover?
>
> Thanks,
> John

On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> Just my 2c: Suppressed is used in `suppress`, whose application scope is
> much larger and hence more flexible. I.e.
> it can be used anywhere for a `KTable` (but internally we would check
> whether certain emit policies like `untilWindowClose` are valid or not),
> whereas `trigger` for now is only applicable in XXWindowedKStream. So I
> think it would not completely replace Suppressed.untilWindowClose.
>
> In the future, personally I'd still want to keep one control object for
> all emit policies, and maybe if we have extended Emitted for other
> emitting policies covered by Suppressed today, we can discuss whether we
> could have `KTable.suppress(Emitted..)` replace
> `KTable.suppress(Suppressed..)` as a whole, but for this KIP I think it's
> too early.
>
> Guozhang

On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
> Hi all,
>
> Thanks for the KIP, Hao!
>
> For what it’s worth, I’m also in favor of your latest framing of the API.
>
> I think the name is fine. I assume it’s inspired by Flink?
> It’s not identical to the concept of a trigger in Flink, which specifies
> when to evaluate the window, which might be confusing to some people who
> have deep experience with Flink. Then again, it seems close enough that
> it should be clear to casual Flink users. For people with no other stream
> processing experience, it might seem a bit esoteric compared to something
> self-documenting like ‘emit()’, but the docs should make it clear.
>
> One small question: it seems like this proposal is identical to
> Suppressed.untilWindowClose, and the KIP states that this API is
> superior. In that case, should we deprecate Suppressed.untilWindowClose?
>
> Thanks,
> John

On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> Hi Hao,
>
> For 2), I think it's a good idea in general to use a separate function on
> the Time/SessionWindowedKStream itself, to achieve the same effect that,
> for now, the emitting control is only for windowed aggregations as in
> this KIP, rather than overloading existing functions. We can discuss the
> actual function names further, whether others like the name `trigger` or
> not.
> As for myself, I feel `trigger` is a good one, but I'd like to see if
> others have opinions as well.
>
> Guozhang

On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
> Hi Guozhang,
>
> Thanks for the feedback.
>
> 1. I agree to have an `Emitted` control class and two static constructors
>    named `onWindowClose` and `onEachUpdate`.
>
> 2. For the API function changes, I'm thinking of adding a new function
>    called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`.
>    It takes an `Emitted` config and returns the same stream. Example:
>
>     stream
>         .groupBy(...)
>         .windowedBy(...)
>         .trigger(Emitted.onWindowClose())
>         .count()
>
> The benefits are:
>    1. It's simple and avoids overloading existing functions like
>       `windowedBy` or `count`, `reduce` or `aggregate`. In fact, to add it
>       to the `aggregate` functions, we would need to add it to all the
>       existing `count` and `aggregate` overloads, which is a lot.
>    2. It operates directly on the windowed KStream and tells how its output
>       should be configured. If later we need to add this to other types of
>       streams, we can reuse the same `trigger` API, whereas other types of
>       streams/tables may not have the `aggregate`/`windowedBy` APIs to make
>       it consistent.
>
> Hao

On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
> Hello Hao,
>
> I prefer option 2 over the other options mainly because the added config
> object could potentially be used in other operators as well (it does not
> necessarily have to be a windowed operator and hence does not have to be
> piggy-backed on `windowedBy`; that's also why I suggested not naming it
> `WindowConfig` but just `EmitConfig`).
>
> As for Matthias' question, I think the difference between the windowed
> aggregate operator and the stream-stream join operator is that, for the
> latter, we think emit-final should be the only right emitting policy and
> hence we should not let users configure it. If users configure it to
> e.g.
> emit eagerly, they may get the old spurious emitting behavior, which
> violates the semantics.
>
> For option 2) itself, I have a few more thoughts:
>
> 1. Thinking about Matthias' suggestions, I'm also leaning a bit towards
>    adding the new param in the overloaded `aggregate` rather than the
>    overloaded `windowBy` function. The reason is that the emitting logic
>    could be either window-based or non-window-based in the long run.
>    Though for this KIP we could just add it in
>    `XXXWindowedKStream.aggregate()`, we may want to extend it to other
>    non-windowed operators in the future.
> 2. To be consistent with other control class names, I feel maybe we can
>    name it "Emitted", not "EmitConfig".
> 3. Following the first comment, I think we can have the static
>    constructor names as "onWindowClose" and "onEachUpdate".
>
> The resulting code pattern would be like this:
>
>     stream
>         .groupBy(..)
>         .windowBy(TimeWindow..)
>         .count(Emitted.onWindowClose)
>
> WDYT?

On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
> > `allowedLateness` may not be a good name. What I have in mind is to use
> > this to control how frequently we try to emit final results. Maybe it's
> > more flexible to be used as config in properties as we don't need to
> > recompile DSL to change it.
>
> I see; making it a config seems better. Frankly, I am not even sure if we
> need a config at all or if we can just hard-code it? For the
> stream-stream left/outer join fix, there is only an internal config but
> no public config either.
>
> Option 1: Your proposal is?
>
>     stream
>         .groupByKey()
>         .windowBy(TimeWindow.ofSizeNoGrace(...))
>         .configure(EmitConfig.emitFinal())
>         .count()
>
> That does not change my argument that it seems to be misplaced from an
> API flow POV.
>
> Option 1 seems to be the least desirable to me.
>
> For options 2 and 3, I'm not sure which one I like better. It might be
> good if others could chime in, too.
I think I slightly > >> prefer > >>>>>>>>> option > >>>>>>>>>>>>>> 2 > >>>>>>>>>>>>>>>>> over > >>>>>>>>>>>>>>>>>>>>> option 3. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On 3/15/22 5:33 PM, Hao Li wrote: > >>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback Matthias. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> `allowedLateness` may not be a good name. What I > have > >>> in > >>>>>>>>> mind > >>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>> use > >>>>>>>>>>>>>>>>>>>>>> this to control how frequently we try to emit final > >>>>>>>> results. > >>>>>>>>>>>>>>> Maybe > >>>>>>>>>>>>>>>>>>> it's > >>>>>>>>>>>>>>>>>>>>>> more flexible to be used as config in properties as > >> we > >>>>>>>> don't > >>>>>>>>>>>>>>> need > >>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>> recompile DSL to change it. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> For option 1, I intend to use `emitFinal` to > >> configure > >>> how > >>>>>>>>>>>>>>>>>>>>>> `TimeWindowedKStream` should be outputted to > `KTable` > >>>>>>>> after > >>>>>>>>>>>>>>>>>>>> aggregation. > >>>>>>>>>>>>>>>>>>>>>> But `emitFinal` is not an action to the > >>>>>>>>> `TimeWindowedKStream` > >>>>>>>>>>>>>>>>>>>> interface. > >>>>>>>>>>>>>>>>>>>>>> Maybe adding `configure(EmitConfig config)` makes > >> more > >>>>>>>>> sense? > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> For option 2, config can be created using > >>>>>>>>>>>>>>>>> `WindowConfig.emitFinal()` > >>>>>>>>>>>>>>>>>>> or > >>>>>>>>>>>>>>>>>>>>>> `EmitConfig.emitFinal` > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> For option 3, it will be something like > >>> `TimeWindows(..., > >>>>>>>>>>>>>>>>> EmitConfig > >>>>>>>>>>>>>>>>>>>>>> emitConfig)`. 
For putting `EmitConfig` in the aggregation operator: I think it doesn't control how we do aggregation but how we output to `KTable`. That's why I feel option 1 makes more sense, as it applies to `TimeWindowedKStream`. But I'm also OK with option 2.

Hao

On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
Thanks for the KIP.

A general comment: it seems that we won't need any new `allowedLateness` parameter because the grace period is defined on the window itself already?

(On the other hand, if I think about it once more, maybe the grace period is actually not a property of the window but a property of the aggregation operator? _thinking_)

From an API flow point of view, option 1 might not be desirable IMHO:

stream
  .groupByKey()
  .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
  .emitFinal()
  .count()

The call to `emitFinal()` does not seem to be in the right place for this case?

Option 2 might work (I think we need to discuss a few details of the API though):

stream
  .groupByKey()
  .windowedBy(
      TimeWindows.ofSizeWithNoGrace(...),
      EmitConfig.emitFinal() // just made this up; it's not in the KIP
  )
  .count()

I made up the `WindowConfig.emitFinal()` call; from the KIP it's unclear what API you have in mind. `EmitFinalConfig` has no public constructor nor any builder method.

For option 3, I am not sure what you really have in mind. Can you give a concrete example (similar to above) of how users would write their code?

Did you consider actually passing the `EmitConfig` into the aggregation operator? In the end, it seems to be a property not of the window definition or windowing step, but of the actual operator:

stream
  .groupByKey()
  .windowedBy(
      TimeWindows.ofSizeWithNoGrace(...)
  )
  .count(EmitConfig.emitFinal())

The API surface area that needs to be updated might be larger for this case, though...

-Matthias

On 3/14/22 9:21 PM, Hao Li wrote:
Thanks Guozhang!

1. I agree `EmitConfig` is better than `WindowConfig`, and option 2 modifies fewer places. What do you think of option 1, which doesn't change the current `windowedBy` API but configures `EmitConfig` separately? The benefit of option 1 is that if we need to configure something else later, we don't need to pile it onto `windowedBy` but can add separate APIs.
2. I added it to `Stores` mainly to conform to https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231. But we can also create an internal API to do that without modifying `Stores`.

Hao

On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
Hello Hao,

Thanks for the proposal. I have some preference among the options here, so I will copy them here:

I'm now thinking if it's better to not add this new config on each of the Window interfaces, but instead add that at the KGroupedStream#windowedBy function. Also, instead of adding just a boolean flag, maybe we can add a Configured class like Grouped, Suppressed, etc., e.g. let's call it Emitted, which for now would just have a single construct, Emitted.atWindowClose, whose semantics are the same as emitFinal == true. I think the benefits are:

1) You do not need to modify multiple Window classes, but just overload one windowedBy function with a second param. This is less of a scope for now, and also more extensible for any future changes.

2) With a config interface, we maintain its extensibility as well as being able to reuse this Emitted interface for other operators if we wanted to expand to them.

----------------------------

So in general I'm leaning towards option 2). For that, some more detailed comments:

1) If we want to reuse that config object for other non-window stateful operations, I think naming it `EmitConfig` is probably better than `WindowConfig`.
2) I saw your PR (https://github.com/apache/kafka/pull/11892) in which you are also proposing to add new stores to the public factory Stores, but it's not included in the KIP. Is that intentional? Personally, I think that although we may eventually want to add a new store type to the public APIs, for this KIP maybe we do not have to add them but can delay that until after we've learned the best way to lay it out. LMK what you think.
Guozhang

On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
Hi Dev team,

I'd like to start a discussion thread on Kafka Streams KIP-825:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced

This KIP aims to add new APIs to support outputting final aggregated results for windowed aggregations. I listed several options there and I'm looking forward to your feedback.
Thanks,
Hao
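The behavior KIP-825 targets, emitting only final results for windowed aggregations, can be modeled without Kafka at all. The sketch below is a hypothetical, self-contained Java model (the class and method names are made up and are not Kafka Streams API) of a tumbling-window count with a grace period. It contrasts eager per-update emission with emitting one final result per window once stream time reaches window end plus grace, which is why the thread notes that the grace period already defined on the window may make a separate `allowedLateness` parameter unnecessary. It deliberately ignores state stores, caching, and retention, which the real implementation has to handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model of a tumbling-window count with a grace period.
 * Does not use Kafka Streams; it only contrasts two emit behaviors.
 */
public final class WindowedCountSketch {

    /** An emitted result: window start time and the count at emission time. */
    public record Result(long windowStart, long count) { }

    private final long sizeMs;
    private final long graceMs;
    private final boolean emitOnWindowClose;
    // Open windows keyed by start time; ordered so the oldest window closes first.
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();
    private final List<Result> emitted = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    public WindowedCountSketch(long sizeMs, long graceMs, boolean emitOnWindowClose) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
        this.emitOnWindowClose = emitOnWindowClose;
    }

    /** Processes one record with the given event timestamp in ms. */
    public void process(long timestamp) {
        // Stream time only moves forward: it is the max timestamp seen so far.
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - Math.floorMod(timestamp, sizeMs);
        // Window [start, start + size) is closed once stream time reaches end + grace.
        if (windowStart + sizeMs + graceMs <= streamTime) {
            return; // late record for an already-closed window: dropped
        }
        long count = openWindows.merge(windowStart, 1L, Long::sum);
        if (!emitOnWindowClose) {
            emitted.add(new Result(windowStart, count)); // eager: every update goes out
        }
        closeExpiredWindows();
    }

    private void closeExpiredWindows() {
        while (!openWindows.isEmpty()) {
            Map.Entry<Long, Long> oldest = openWindows.firstEntry();
            if (oldest.getKey() + sizeMs + graceMs > streamTime) {
                break; // oldest window still open, so all later windows are too
            }
            openWindows.pollFirstEntry();
            if (emitOnWindowClose) {
                emitted.add(new Result(oldest.getKey(), oldest.getValue())); // final result, once
            }
        }
    }

    public List<Result> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        for (boolean onClose : new boolean[] {false, true}) {
            WindowedCountSketch agg = new WindowedCountSketch(10, 5, onClose);
            for (long ts : new long[] {1, 3, 12, 16, 4}) {
                agg.process(ts);
            }
            System.out.println((onClose ? "on window close: " : "on each update: ") + agg.emitted());
        }
    }
}
```

With a 10 ms window, 5 ms grace, and timestamps 1, 3, 12, 16 followed by a late 4, eager mode emits four updates, while on-close mode emits only window [0, 10) with count 2. The record at 4 arrives after that window has closed and is dropped in both modes, and window [10, 20) stays open because stream time (16) has not yet reached 25.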