Hi all,

I just updated the KIP with option 1 as the design and moved options 2 and 3
to the rejected alternatives. Since Matthias is strongly against `trigger`,
I adopted the proposed `EmitStrategy` and dropped the "with" from the
function name, so it now looks like this:

stream
      .groupBy(..)
      .windowedBy(..)
      .emitStrategy(EmitStrategy.onWindowClose())
      .aggregate(..)
      .mapValues(..)

I used the method `onWindowClose()` rather than the constant
`ON_WINDOW_CLOSE` since `EmitStrategy` is meant to be an interface.
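
To illustrate the interface-plus-factory-method shape, here is a minimal
sketch. It is not the KIP's code: `StrategyType`, `type()`, `onEachUpdate()`
as the default, and `ToyWindowedStream` are all assumptions made up for the
example, and the toy stream only mimics the call chain above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the proposed interface; not the real Kafka Streams type.
interface EmitStrategy {
    // Assumed for illustration: a tag that lets the runtime tell strategies apart.
    enum StrategyType { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    StrategyType type();

    // Static factory methods instead of enum constants, so further strategies
    // can be added later without adding new DSL methods.
    static EmitStrategy onWindowClose() {
        return () -> StrategyType.ON_WINDOW_CLOSE;
    }

    static EmitStrategy onEachUpdate() {
        return () -> StrategyType.ON_EACH_UPDATE;
    }
}

// Toy windowed stream: counts the records of one window and decides when to emit.
final class ToyWindowedStream {
    private final List<String> records;
    private EmitStrategy strategy = EmitStrategy.onEachUpdate(); // assumed default

    ToyWindowedStream(List<String> records) {
        this.records = records;
    }

    // Configures the stream and returns it, mirroring the builder-style chain.
    ToyWindowedStream emitStrategy(EmitStrategy strategy) {
        this.strategy = strategy;
        return this;
    }

    // Returns the sequence of counts that would be emitted for this window.
    List<Integer> count() {
        List<Integer> emitted = new ArrayList<>();
        int count = 0;
        for (String ignored : records) {
            count++;
            if (strategy.type() == EmitStrategy.StrategyType.ON_EACH_UPDATE) {
                emitted.add(count); // intermediate result per input record
            }
        }
        if (strategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE && count > 0) {
            emitted.add(count); // single final result once the window closes
        }
        return emitted;
    }
}

class EmitStrategyDemo {
    public static void main(String[] args) {
        List<String> window = Arrays.asList("a", "b", "c");
        System.out.println(new ToyWindowedStream(window)
                .emitStrategy(EmitStrategy.onWindowClose())
                .count()); // prints [3]
        System.out.println(new ToyWindowedStream(window).count()); // prints [1, 2, 3]
    }
}
```

The point of the sketch is only that the strategy is set on the windowed
stream before the aggregation runs, which is why it sits between
`windowedBy` and `aggregate` in the chain.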

Hao

On Wed, Mar 23, 2022 at 6:35 PM Matthias J. Sax <mj...@apache.org> wrote:

> Wow. Quite a thread... #namingIsHard :D
>
> I won't repeat all arguments which are all very good ones. I can just
> state my personal favorite option:
>
> stream
>       .groupBy(..)
>       .windowedBy(..)
>       .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>       .aggregate(..)
>       .mapValues(..)
>
> It seems to be the best compromise / trade-off across the board.
> Personally, I would strongly advocate against using `trigger()`!
>
>
> -Matthias
>
>
> On 3/23/22 4:38 PM, Guozhang Wang wrote:
> > Hao is right. I think that's the hindsight we have for `suppress`: since
> > it can be applied anywhere on a K(windowed)Table, it incurs an awkward
> > programming flexibility, and I feel it's better to make its application
> > scope more constrained.
> >
> > And I also agree with John that, unless any of us feel strongly about any
> > options, Hao could make the final call about the namings.
> >
> >
> > Guozhang
> >
> > On Wed, Mar 23, 2022 at 1:49 PM Hao Li <h...@confluent.io.invalid> wrote:
> >
> >> For
> >>
> >> stream
> >>        .groupBy(..)
> >>        .windowedBy(..)
> >>        .aggregate(..)
> >>        .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> >>        .mapValues(..)
> >>
> >> I think after `aggregate` it's already a table, so the emit strategy
> >> comes too late to control how the windowed stream is output to the
> >> table. This is the concern Guozhang raised about having this in the
> >> existing `suppress` operator as well.
> >>
> >> Thanks,
> >> Hao
> >>
> >> On Wed, Mar 23, 2022 at 1:05 PM Bruno Cadonna <cado...@apache.org> wrote:
> >>
> >>> Hi,
> >>>
> >>> Thank you for your answers to my questions!
> >>>
> >>> I see the argument about conciseness of configuring a stream with
> >>> methods instead of config objects. I just miss a bit the descriptive
> >>> aspect.
> >>>
> >>> What about
> >>>
> >>> stream
> >>>        .groupBy(..)
> >>>        .windowedBy(..)
> >>>        .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> >>>        .aggregate(..)
> >>>        .mapValues(..)
> >>>
> >>> I have also another question. Why should emitting of results be
> >>> controlled by the window level api? If I want to emit results for each
> >>> input record the emit strategy is quite independent from the window. So
> >>> I somehow share Matthias' and Guozhang's concern that the emit strategy
> >>> seems misplaced there.
> >>>
> >>> What are the arguments against?
> >>>
> >>> stream
> >>>        .groupBy(..)
> >>>        .windowedBy(..)
> >>>        .aggregate(..)
> >>>        .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> >>>        .mapValues(..)
> >>>
> >>>
> >>> A final administrative request: Hao, could you please add the rejected
> >>> alternatives to the KIP so that future us will know why we rejected
> >>> them?
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 23.03.22 19:38, John Roesler wrote:
> >>>> Hi all,
> >>>>
> >>>> I can see both sides of this.
> >>>>
> >>>> On one hand, when we say
> >>>> "stream.groupBy().windowBy().count()", it seems like we're
> >>>> telling KS to take the raw stream, group it based on key,
> >>>> then window it based on time, and then compute an
> >>>> aggregation on the windows. In that model, "trigger()" would
> >>>> have to mean something like "trigger it", which doesn't
> >>>> really make sense, since we aren't "triggering" the
> >>>> aggregation (then again, to an outside observer, it would
> >>>> appear that way... food for thought).
> >>>>
> >>>> Another way to look at it is that all we're really doing is
> >>>> configuring a windowed aggregation on the stream, and we're
> >>>> doing it with a progressive builder interface. In other
> >>>> words, the above is just a progressive builder for
> >>>> configuring an operation like
> >>>> "stream.aggregate(groupingConfig, windowingConfig,
> >>>> countFn)". Under the latter interpretation of the DSL, it
> >>>> makes perfect sense to add more optional progressive builder
> >>>> methods like trigger() to the WindowedKStream interfaces.
> >>>>
> >>>> Since part of the motivation for choosing the word "trigger"
> >>>> here is to stay close to what Flink defines, I'll also point
> >>>> out that Flink's syntax is also
> >>>> "stream.keyBy().window().trigger().aggregate()". Not that
> >>>> their API is the holy grail or anything, but it's at least
> >>>> an indication that this API isn't a horrible mistake.
> >>>>
> >>>> All other things being equal, I also prefer to leave tie-
> >>>> breakers in the hands of the contributor. So, if we've all
> >>>> said our piece and Hao still prefers option 1, then (as long
> >>>> as we don't think it's a horrible mistake), I think we
> >>>> should just let him go for it.
> >>>>
> >>>> Speaking of which, after reviewing the responses regarding
> >>>> deprecating `Suppressed#onWindowClose`, I still think we
> >>>> should just go ahead and deprecate it. Although it's not
> >>>> expressed exactly the same way, it still does exactly the
> >>>> same thing, or so close that it seems confusing to keep
> >>>> both. But again, if Hao really prefers to keep both, I won't
> >>>> insist on it :)
> >>>>
> >>>> Thanks all,
> >>>> -John
> >>>>
> >>>> On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote:
> >>>>> Thanks Bruno!
> >>>>>
> >>>>> The arguments for option 1 are:
> >>>>> 1. Concise and descriptive. It avoids overloading existing functions
> >>>>> and it's very clear what it's doing. Imagine there's an autocomplete
> >>>>> feature in IntelliJ or another IDE for our DSL in the future: it would
> >>>>> not be favorable to show 6 `windowedBy` functions.
> >>>>> 2. Option 1 operates on `windowedStream` to configure how it should be
> >>>>> output. Option 2 operates on `KGroupedStream` to produce
> >>>>> `windowedStream` as well as to configure how `windowedStream` should
> >>>>> be output. I feel it's better to have a `windowedStream` and then
> >>>>> configure how it is output. Somehow I feel option 2 breaks the
> >>>>> builder pattern.
> >>>>> 3. `WindowedByParameters` doesn't seem very descriptive. If we put all
> >>>>> kinds of different parameters into it to avoid future overloading, it
> >>>>> becomes too bloated and not very user friendly.
> >>>>>
> >>>>> I agree option 1's `trigger` function is configuring the stream, which
> >>>>> feels different from the existing `count` or `aggregate` etc.
> >>>>> Configuring might also be a kind of action on the stream :) I'm not
> >>>>> sure if it breaks the DSL principle and, if it does, can we relax the
> >>>>> principle given the benefits compared to option 2? Maybe John can
> >>>>> chime in as the DSL grammar author.
> >>>>>
> >>>>> Thanks,
> >>>>> Hao
> >>>>>
> >>>>> On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>
> >>>>>> Hi Hao,
> >>>>>>
> >>>>>> I agree with Guozhang: Great summary! Thank you!
> >>>>>>
> >>>>>> Regarding "aligned with other config class names", there is this DSL
> >>>>>> grammar John once specified
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> >>>>>> and we have already used it in the code. I found the grammar quite
> >>>>>> useful.
> >>>>>>
> >>>>>> I am undecided if option 1 is really worth it. What are actually the
> >>>>>> arguments in favor of it? Is it only that we do not need to overload
> >>>>>> other methods? That does not seem worth breaking DSL principles for.
> >>>>>> An alternative proposal would be to go with option 2 and conform with
> >>>>>> the grammar above:
> >>>>>>
> >>>>>> <W extends Window> TimeWindowedKStream<K, V> windowedBy(
> >>>>>>     final Windows<W> windows, WindowedByParameters parameters);
> >>>>>>
> >>>>>> TimeWindowedKStream<K, V> windowedBy(
> >>>>>>     final SlidingWindows windows, WindowedByParameters parameters);
> >>>>>>
> >>>>>> SessionWindowedKStream<K, V> windowedBy(
> >>>>>>     final SessionWindows windows, WindowedByParameters parameters);
> >>>>>>
> >>>>>> This is similar to option 2 in the KIP, but it ensures that we put all
> >>>>>> future needed configs in the parameters object and we do not need to
> >>>>>> overload the methods anymore.
> >>>>>>
> >>>>>> Then if we also get KAFKA-10298 done, we could even collapse all
> >>>>>> `windowedBy()` methods into one.
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 22.03.22 22:31, Guozhang Wang wrote:
> >>>>>>> Thanks for the great summary Hao. I'm still leaning towards option 2)
> >>>>>>> here, and I'm in favor of `trigger` as the function name and
> >>>>>>> `Triggered` as the config class name (mainly to be aligned with other
> >>>>>>> config class names). I'd also like to see others' preferences between
> >>>>>>> the options, as well as the namings.
> >>>>>>>
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Mar 22, 2022 at 12:23 PM Hao Li <h...@confluent.io.invalid> wrote:
> >>>>>>>
> >>>>>>>> `windowedStream.onWindowClose()` was the original option 1
> >>>>>>>> (`windowedStream.emitFinal()`) but was rejected
> >>>>>>>> because we could add more emit types and this would result in adding
> >>>>>>>> more functions. I still prefer the
> >>>>>>>> "windowedStream.someFunc(Controlled.onWindowClose)"
> >>>>>>>> model since it's flexible and clear that it's configuring the emit
> >>>>>>>> policy.
> >>>>>>>> Let me summarize all the naming options we have and compare:
> >>>>>>>>
> >>>>>>>> *API function name:*
> >>>>>>>>
> >>>>>>>> *1. `windowedStream.trigger()`*
> >>>>>>>>        Pros:
> >>>>>>>>           i. Simple
> >>>>>>>>           ii. Similar to Flink's trigger function (is this a con
> >>>>>>>>               actually?)
> >>>>>>>>        Cons:
> >>>>>>>>           i. `trigger()` can be confused with Flink trigger (raised
> >>>>>>>>              by John)
> >>>>>>>>           ii. `trigger()` feels like an operation instead of a
> >>>>>>>>               configure function (raised by Bruno)?
> >>>>>>>>
> >>>>>>>> *2. `windowedStream.emitTrigger()`*
> >>>>>>>>        Pros:
> >>>>>>>>           i. Avoids confusion with Flink's trigger API
> >>>>>>>>           ii. `emitTrigger` feels like configuring the trigger
> >>>>>>>>               because "trigger" here is a noun instead of a verb as
> >>>>>>>>               in `trigger()`
> >>>>>>>>        Cons:
> >>>>>>>>           i. Verbose?
> >>>>>>>>           ii. Not consistent with `Suppressed.untilWindowClose`?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> *Config class/object name:*
> >>>>>>>>
> >>>>>>>> 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
> >>>>>>>>         Cons:
> >>>>>>>>         i. Doesn't go along with `trigger` (raised by Bruno)
> >>>>>>>>
> >>>>>>>> 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*
> >>>>>>>>
> >>>>>>>> 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*
> >>>>>>>>
> >>>>>>>> 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
> >>>>>>>> *`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
> >>>>>>>>         This is a combination of different names like:
> >>>>>>>>         `EmitConfig`, `EmitPolicy`, `TriggerConfig` and
> >>>>>>>>         `TriggerPolicy`...
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> If we are settled with option 1), we can add new options to these
> >>>>>>>> names and comment on their Pros and Cons.
> >>>>>>>>
> >>>>>>>> Hao
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> I see what you mean now, and I think it's a fair point that
> >>>>>>>>> composing `trigger` and `emitted` seems awkward.
> >>>>>>>>>
> >>>>>>>>> Re: data processing operator vs. control operator, I share your
> >>>>>>>>> concern as well, and here's my train of thought. Having only data
> >>>>>>>>> processing operators was my primary motivation for how we added the
> >>>>>>>>> suppress operator; it indeed "suppresses" data. But in hindsight its
> >>>>>>>>> disadvantage is that, for example, Suppressed.onWindowClose() should
> >>>>>>>>> only be related to an earlier windowedBy operator, which is possibly
> >>>>>>>>> very far from it in the resulting DSL code. Not only is it a bit
> >>>>>>>>> awkward for users to write such code, but in such cases the DSL
> >>>>>>>>> builder also needs to maintain and propagate this information to the
> >>>>>>>>> suppress operator further down. So we are now thinking about
> >>>>>>>>> "putting the control object as close as possible to where the
> >>>>>>>>> related processing really happens". In that world my original
> >>>>>>>>> preference was somewhere in option 2), i.e. just put the control as
> >>>>>>>>> a param of the related "windowedBy" operator, but the trade-off is
> >>>>>>>>> that we keep adding overloaded functions to these operators. So
> >>>>>>>>> after some back-and-forth thoughts I'm leaning towards relaxing our
> >>>>>>>>> principle of having only processing operators and no flow-control
> >>>>>>>>> operators. That being said, if you have any ideas that give us the
> >>>>>>>>> benefits of both worlds, I'm all ears.
> >>>>>>>>>
> >>>>>>>>> Re: using a direct function like "windowedStream.onWindowClose()"
> >>>>>>>>> vs. "windowedStream.someFunc(Controlled.onWindowClose)", again my
> >>>>>>>>> motivation for the latter is extensibility without adding more
> >>>>>>>>> functions in the future. If people feel this is not worth it we can
> >>>>>>>>> do the first option as well. If we just feel that `trigger` and
> >>>>>>>>> `emitted` are not composable together, maybe we can consider
> >>>>>>>>> something like `windowedStream.trigger(Triggered.onWindowClose())`?
> >>>>>>>>>
> >>>>>>>>> Re: windowedBy vs. windowBy, yeah I do not really have a good reason
> >>>>>>>>> why we should use the past tense either :P But if it's not bothering
> >>>>>>>>> people much I'd say we just keep it rather than deprecate it and
> >>>>>>>>> rename to new APIs.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Guozhang,
> >>>>>>>>>>
> >>>>>>>>>> There is no semantic difference. It is a cosmetic difference.
> >>>>>>>>>> Conceptually, I relate `Emitted` with the aggregation and not with
> >>>>>>>>>> `trigger()` in the API flow, because the aggregation emits the
> >>>>>>>>>> result, not `trigger()`. Therefore, I proposed not to use
> >>>>>>>>>> `Emitted` as the name of the config object passed to `trigger()`.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Bruno
> >>>>>>>>>>
> >>>>>>>>>> On 22.03.22 17:24, Guozhang Wang wrote:
> >>>>>>>>>>> Hi Bruno,
> >>>>>>>>>>>
> >>>>>>>>>>> Could you elaborate a bit more here: what's the semantic
> >>>>>>>>>>> difference between "the aggregation is triggered on window close
> >>>>>>>>>>> and all aggregation results are emitted." for
> >>>>>>>>>>> trigger(TriggerParameters.onWindowClose()), and "the aggregation
> >>>>>>>>>>> is configured to only emit final results." for
> >>>>>>>>>>> trigger(Emitted.onWindowClose())?
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <
> >> cado...@apache.org
> >>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Hao,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thank you for the KIP!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regarding option 1, I would not use `Emitted.onWindowClose()`
> >>>>>>>>>>>> since that does not seem compatible with the proposed flow.
> >>>>>>>>>>>> Conceptually, the flow now states that the aggregation is
> >>>>>>>>>>>> triggered on window close and all aggregation results are
> >>>>>>>>>>>> emitted. `Emitted` suggests that the aggregation is configured
> >>>>>>>>>>>> to only emit final results.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thus, I propose the following:
> >>>>>>>>>>>>
> >>>>>>>>>>>> stream
> >>>>>>>>>>>>          .groupBy(..)
> >>>>>>>>>>>>          .windowedBy(..)
> >>>>>>>>>>>>          .trigger(TriggerParameters.onWindowClose())
> >>>>>>>>>>>>          .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>>>>>>          .mapValues(..)
> >>>>>>>>>>>>
> >>>>>>>>>>>> An alternative to `trigger()` could be `schedule()`, but I do
> >>>>>>>>>>>> not really like it.
> >>>>>>>>>>>>
> >>>>>>>>>>>> One thing I noticed with option 1 is that all other methods in
> >>>>>>>>>>>> the example above are operations on data. `groupBy()` groups,
> >>>>>>>>>>>> `windowedBy()` partitions, `aggregate()` computes the aggregate,
> >>>>>>>>>>>> `mapValues()` maps values, even `suppress()` suppresses
> >>>>>>>>>>>> intermediate results. But what does `trigger()` do? `trigger()`
> >>>>>>>>>>>> seems a config lost among operations.
> >>>>>>>>>>>>
> >>>>>>>>>>>> However, if we do not want to restrict ourselves to only use
> >>>>>>>>>>>> methods when we want to specify operations on data, I have the
> >>>>>>>>>>>> following proposal:
> >>>>>>>>>>>>
> >>>>>>>>>>>> stream
> >>>>>>>>>>>>          .groupBy(..)
> >>>>>>>>>>>>          .windowedBy(..)
> >>>>>>>>>>>>          .onWindowClose()
> >>>>>>>>>>>>          .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>>>>>>          .mapValues(..)
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Bruno
> >>>>>>>>>>>>
> >>>>>>>>>>>> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
> >>>>>>>>>>>> operations also use present tense.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 22.03.22 06:36, Hao Li wrote:
> >>>>>>>>>>>>> Hi John,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Yes. For naming, `trigger` is similar to Flink's trigger, but
> >>>>>>>>>>>>> it has a different meaning in our case. `emit` sounds like an
> >>>>>>>>>>>>> action to emit? How about `emitTrigger`? I'm open to
> >>>>>>>>>>>>> suggestions for the naming.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For deprecating `Suppressed.untilWindowClose`, I agree with
> >>>>>>>>>>>>> Guozhang that we can deprecate the `Suppressed` config as a
> >>>>>>>>>>>>> whole later. Or we can deprecate `Suppressed.untilWindowClose`
> >>>>>>>>>>>>> in a later KIP after the implementation of emit final is done.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> BTW, isn't
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream
> >>>>>>>>>>>>>        .groupBy(..)
> >>>>>>>>>>>>>        .windowBy(..)
> >>>>>>>>>>>>>        .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>>>>>>>        .mapValues(..)
> >>>>>>>>>>>>>        .suppress(Suppressed.untilWindowClose) // since we can
> >>>>>>>>>>>>>        // trace back to parent node, to find a window definition
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> same as
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream
> >>>>>>>>>>>>>        .groupBy(..)
> >>>>>>>>>>>>>        .windowBy(..)
> >>>>>>>>>>>>>        .trigger(Emitted.onWindowClose)
> >>>>>>>>>>>>>        .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>>>>>>>        .mapValues(..)
> >>>>>>>>>>>>> ?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think the following case is only doable via `suppress`:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> stream
> >>>>>>>>>>>>>>        .groupBy(..)
> >>>>>>>>>>>>>>        .windowBy(..)
> >>>>>>>>>>>>>>        .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>>>>>>>>        .mapValues(..)
> >>>>>>>>>>>>>>        .suppress(Suppressed.untilWindowClose) // since we can
> >>>>>>>>>>>>>>        // trace back to parent node, to find a window definition
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, Mar 21, 2022 at 6:36 PM John Roesler <
> >>> vvcep...@apache.org
> >>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks, Guozhang!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> To clarify, I was asking specifically about deprecating just
> >>>>>>>>>>>>>>> the method ‘untilWindowClose’. I might not be thinking
> >>>>>>>>>>>>>>> clearly about it, though. What does untilWindowClose do that
> >>>>>>>>>>>>>>> this KIP doesn’t cover?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>> John
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> >>>>>>>>>>>>>>>> Just my 2c: Suppressed is used in `suppress`, whose
> >>>>>>>>>>>>>>>> application scope is much larger and hence more flexible.
> >>>>>>>>>>>>>>>> I.e. it can be used anywhere for a `KTable` (but internally
> >>>>>>>>>>>>>>>> we would check whether certain emit policies like
> >>>>>>>>>>>>>>>> `untilWindowClose` are valid or not), whereas `trigger` as
> >>>>>>>>>>>>>>>> of now is only applicable in XXWindowedKStream. So I think
> >>>>>>>>>>>>>>>> it would not completely replace Suppressed.untilWindowClose.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> In the future, personally I'd still want to keep one control
> >>>>>>>>>>>>>>>> object for all emit policies, and maybe once we have
> >>>>>>>>>>>>>>>> extended Emitted for other emitting policies covered by
> >>>>>>>>>>>>>>>> Suppressed today, we can discuss if we could have
> >>>>>>>>>>>>>>>> `KTable.suppress(Emitted..)` replacing
> >>>>>>>>>>>>>>>> `KTable.suppress(Suppressed..)` as a whole, but for this KIP
> >>>>>>>>>>>>>>>> I think it's too early.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Mon, Mar 21, 2022 at 6:18 PM John Roesler <
> >>>>>>>> vvcep...@apache.org
> >>>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for the Kip, Hao!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> For what it’s worth, I’m also in favor of your latest
> >>>>>>>>>>>>>>>>> framing of the API.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I think the name is fine. I assume it’s inspired by Flink?
> >>>>>>>>>>>>>>>>> It’s not identical to the concept of a trigger in Flink,
> >>>>>>>>>>>>>>>>> which specifies when to evaluate the window, which might
> >>>>>>>>>>>>>>>>> be confusing to some people who have deep experience with
> >>>>>>>>>>>>>>>>> Flink. Then again, it seems close enough that it should be
> >>>>>>>>>>>>>>>>> clear to casual Flink users. For people with no other
> >>>>>>>>>>>>>>>>> stream processing experience, it might seem a bit esoteric
> >>>>>>>>>>>>>>>>> compared to something self-documenting like ‘emit()’, but
> >>>>>>>>>>>>>>>>> the docs should make it clear.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> One small question: it seems like this proposal is
> >>>>>>>>>>>>>>>>> identical to Suppressed.untilWindowClose, and the KIP
> >>>>>>>>>>>>>>>>> states that this API is superior. In that case, should we
> >>>>>>>>>>>>>>>>> deprecate Suppressed.untilWindowClose?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>> John
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> >>>>>>>>>>>>>>>>>> Hi Hao,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> For 2), I think it's a good idea in general to use a
> >>>>>>>>>>>>>>>>>> separate function on the Time/SessionWindowedKStream
> >>>>>>>>>>>>>>>>>> itself, rather than overloading existing functions, to
> >>>>>>>>>>>>>>>>>> achieve the same effect, given that for now the emitting
> >>>>>>>>>>>>>>>>>> control is only for windowed aggregations as in this KIP.
> >>>>>>>>>>>>>>>>>> We can discuss the actual function names further, and
> >>>>>>>>>>>>>>>>>> whether others like the name `trigger` or not. As for
> >>>>>>>>>>>>>>>>>> myself, I feel `trigger` is a good one but I'd like to
> >>>>>>>>>>>>>>>>>> see if others have opinions as well.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Mon, Mar 21, 2022 at 5:18 PM Hao Li
> >>>>>>>> <h...@confluent.io.invalid
> >>>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Guozhang,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for the feedback.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 1. I agree to have an `Emitted` control class and two
> >>>>>>>>>>>>>>>>>>> static constructors named `onWindowClose` and
> >>>>>>>>>>>>>>>>>>> `onEachUpdate`.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2. For the API function changes, I'm thinking of adding
> >>>>>>>>>>>>>>>>>>> a new function called `trigger` to `TimeWindowedKStream`
> >>>>>>>>>>>>>>>>>>> and `SessionWindowedKStream`. It takes an `Emitted`
> >>>>>>>>>>>>>>>>>>> config and returns the same stream. Example:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> stream
> >>>>>>>>>>>>>>>>>>>        .groupBy(...)
> >>>>>>>>>>>>>>>>>>>        .windowedBy(...)
> >>>>>>>>>>>>>>>>>>>        .trigger(Emitted.onWindowClose). // N
> >>>>>>>>>>>>>>>>>>>        .count()
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> The benefits are:
> >>>>>>>>>>>>>>>>>>>        1. It's simple and avoids creating overloads of
> >>>>>>>>>>>>>>>>>>> existing functions like `windowedBy` or `count`,
> >>>>>>>>>>>>>>>>>>> `reduce` or `aggregate`. In fact, to add it to the
> >>>>>>>>>>>>>>>>>>> `aggregate` functions, we would need to add it to all
> >>>>>>>>>>>>>>>>>>> existing `count` and `aggregate` overloads, which is a
> >>>>>>>>>>>>>>>>>>> lot.
> >>>>>>>>>>>>>>>>>>>        2. It operates directly on the windowed kstream
> >>>>>>>>>>>>>>>>>>> and tells how its output should be configured. If we
> >>>>>>>>>>>>>>>>>>> later need to add this to other types of streams, we can
> >>>>>>>>>>>>>>>>>>> reuse the same `trigger` API, whereas other types of
> >>>>>>>>>>>>>>>>>>> streams/tables may not have an `aggregate` or
> >>>>>>>>>>>>>>>>>>> `windowedBy` API to make it consistent.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <
> >>>>>>>>>> wangg...@gmail.com>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hello Hao,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I'm preferring option 2 over the other options mainly
> >>>>>>>>>>>>>>>>>>>> because the added config object could potentially be
> >>>>>>>>>>>>>>>>>>>> used in other operators as well (it does not
> >>>>>>>>>>>>>>>>>>>> necessarily have to be a windowed operator and hence
> >>>>>>>>>>>>>>>>>>>> have to be piggy-backed on `windowedBy`, and that's
> >>>>>>>>>>>>>>>>>>>> also why I suggested not naming it `WindowConfig` but
> >>>>>>>>>>>>>>>>>>>> just `EmitConfig`).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> As for Matthias' question, I think the difference
> >>>>>>>>>>>>>>>>>>>> between the windowed aggregate operator and the
> >>>>>>>>>>>>>>>>>>>> stream-stream join operator is that, for the latter, we
> >>>>>>>>>>>>>>>>>>>> think emit-final should be the only right emitting
> >>>>>>>>>>>>>>>>>>>> policy and hence we should not let users configure it.
> >>>>>>>>>>>>>>>>>>>> If users configure it to e.g. emit eagerly they may get
> >>>>>>>>>>>>>>>>>>>> the old spurious emitting behavior, which violates the
> >>>>>>>>>>>>>>>>>>>> semantics.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> For option 2) itself, I have a few more thoughts:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 1. Thinking about Matthias' suggestions, I'm also
> >>>>>>>>>>>>>>>>>>>> leaning a bit towards adding the new param in the
> >>>>>>>>>>>>>>>>>>>> overloaded `aggregate` rather than the overloaded
> >>>>>>>>>>>>>>>>>>>> `windowBy` function. The reason is that the emitting
> >>>>>>>>>>>>>>>>>>>> logic could be either window based or non-window based
> >>>>>>>>>>>>>>>>>>>> in the long run. Though for this KIP we could just add
> >>>>>>>>>>>>>>>>>>>> it in `XXXWindowedKStream.aggregate()`, we may want to
> >>>>>>>>>>>>>>>>>>>> extend it to other non-windowed operators in the
> >>>>>>>>>>>>>>>>>>>> future.
> >>>>>>>>>>>>>>>>>>>> 2. To be consistent with other control class names, I
> >>>>>>>>>>>>>>>>>>>> feel maybe we can name it "Emitted", not "EmitConfig".
> >>>>>>>>>>>>>>>>>>>> 3. Following the first comment, I think we can have the
> >>>>>>>>>>>>>>>>>>>> static constructor names as "onWindowClose" and
> >>>>>>>>>>>>>>>>>>>> "onEachUpdate".
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The resulted code pattern would be like this:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>         stream
> >>>>>>>>>>>>>>>>>>>>           .groupBy(..)
> >>>>>>>>>>>>>>>>>>>>           .windowBy(TimeWindow..)
> >>>>>>>>>>>>>>>>>>>>           .count(Emitted.onWindowClose)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> WDYT?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <
> >>>>>>>>>>>>>> mj...@apache.org
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> `allowedLateness` may not be a good name. What I
> >> have
> >>> in
> >>>>>>>>>>>>>> mind
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>>>> this to control how frequently we try to emit final
> >>>>>>>>> results.
> >>>>>>>>>>>>>>>>> Maybe
> >>>>>>>>>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>>>>>>>>> more flexible to be used as config in properties as
> >> we
> >>>>>>>>> don't
> >>>>>>>>>>>>>>>>> need to
> >>>>>>>>>>>>>>>>>>>>>>> recompile DSL to change it.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I see; making it a config seems better. Frankly, I am
> >>>>>>>>>>>>>>>>>>>>> not even sure if we need a config at all or if we can
> >>>>>>>>>>>>>>>>>>>>> just hard code it? For the stream-stream left/outer
> >>>>>>>>>>>>>>>>>>>>> join fix, there is only an internal config but no
> >>>>>>>>>>>>>>>>>>>>> public config either.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Option 1: Your proposal is?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>         stream
> >>>>>>>>>>>>>>>>>>>>>           .groupByKey()
> >>>>>>>>>>>>>>>>>>>>>           .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> >>>>>>>>>>>>>>>>>>>>>           .configure(EmitConfig.emitFinal())
> >>>>>>>>>>>>>>>>>>>>>           .count()
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> That does not change my argument that it seems to be misplaced from an
> >>>>>>>>>>>>>>>>>>>>> API flow POV.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Option 1 seems to be the least desirable to me.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> For options 2 and 3, I am not sure which one I like better. It might be
> >>>>>>>>>>>>>>>>>>>>> good if others could chime in, too. I think I slightly prefer option 2
> >>>>>>>>>>>>>>>>>>>>> over option 3.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 3/15/22 5:33 PM, Hao Li wrote:
> >>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback Matthias.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in mind is to use
> >>>>>>>>>>>>>>>>>>>>>> this to control how frequently we try to emit final results. Maybe it's
> >>>>>>>>>>>>>>>>>>>>>> more flexible to be used as config in properties as we don't need to
> >>>>>>>>>>>>>>>>>>>>>> recompile DSL to change it.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For option 1, I intend to use `emitFinal` to configure how
> >>>>>>>>>>>>>>>>>>>>>> `TimeWindowedKStream` should be output to `KTable` after aggregation.
> >>>>>>>>>>>>>>>>>>>>>> But `emitFinal` is not an action on the `TimeWindowedKStream`
> >>>>>>>>>>>>>>>>>>>>>> interface. Maybe adding `configure(EmitConfig config)` makes more sense?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For option 2, the config can be created using `WindowConfig.emitFinal()`
> >>>>>>>>>>>>>>>>>>>>>> or `EmitConfig.emitFinal()`.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For option 3, it will be something like
> >>>>>>>>>>>>>>>>>>>>>> `TimeWindows(..., EmitConfig emitConfig)`.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> As for putting `EmitConfig` in the aggregation operator, I think it
> >>>>>>>>>>>>>>>>>>>>>> doesn't control how we do the aggregation but how we output to `KTable`.
> >>>>>>>>>>>>>>>>>>>>>> That's why I feel option 1 makes more sense, as it applies to
> >>>>>>>>>>>>>>>>>>>>>> `TimeWindowedKStream`. But I'm also OK with option 2.
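
The option 1 argument above (the emit setting belongs to the windowed stream and is fixed before the table exists) can be sketched with a toy fluent chain. None of the classes below are Kafka Streams types; `ToyGroupedStream`, `ToyWindowedStream`, `ToyTable` and the string-valued emit modes are assumptions made purely for illustration.

```java
// Toy model of the DSL stages; illustrative only, not Kafka Streams types.
final class ToyTable {
    private final String operator;
    private final long windowSizeMs;
    private final String emit;

    ToyTable(String operator, long windowSizeMs, String emit) {
        this.operator = operator;
        this.windowSizeMs = windowSizeMs;
        this.emit = emit;
    }

    // Once the table exists, the emit behaviour is already baked in.
    String describe() {
        return operator + "(window=" + windowSizeMs + "ms, emit=" + emit + ")";
    }
}

final class ToyWindowedStream {
    private final long windowSizeMs;
    private final String emit;

    ToyWindowedStream(long windowSizeMs, String emit) {
        this.windowSizeMs = windowSizeMs;
        this.emit = emit;
    }

    // Option 1 style: configure emission on the windowed stream, before aggregating.
    ToyWindowedStream configure(String emit) {
        return new ToyWindowedStream(windowSizeMs, emit);
    }

    // The aggregation turns the windowed stream into a table.
    ToyTable count() {
        return new ToyTable("count", windowSizeMs, emit);
    }
}

final class ToyGroupedStream {
    // Default mirrors today's behaviour: emit on every update.
    ToyWindowedStream windowedBy(long windowSizeMs) {
        return new ToyWindowedStream(windowSizeMs, "EACH_UPDATE");
    }
}
```

In this sketch, `new ToyGroupedStream().windowedBy(10).configure("FINAL").count()` carries the emit mode into the resulting table, while `ToyTable` itself offers no way to change it afterwards.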
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> A general comment: it seems that we won't need any new `allowedLateness`
> >>>>>>>>>>>>>>>>>>>>>>> parameter because the grace-period is defined on the window itself
> >>>>>>>>>>>>>>>>>>>>>>> already?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> (On the other hand, if I think about it once more, maybe the
> >>>>>>>>>>>>>>>>>>>>>>> `grace-period` is actually not a property of the window but a property
> >>>>>>>>>>>>>>>>>>>>>>> of the aggregation operator? _thinking_)
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> From an API flow point of view, option 1 might not be desirable IMHO:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>          stream
> >>>>>>>>>>>>>>>>>>>>>>>            .groupByKey()
> >>>>>>>>>>>>>>>>>>>>>>>            .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> >>>>>>>>>>>>>>>>>>>>>>>            .emitFinal()
> >>>>>>>>>>>>>>>>>>>>>>>            .count()
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> The call to `emitFinal()` seems not to be in the right place for this
> >>>>>>>>>>>>>>>>>>>>>>> case?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Option 2 might work (I think we need to discuss a few details of the
> >>>>>>>>>>>>>>>>>>>>>>> API though):
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>          stream
> >>>>>>>>>>>>>>>>>>>>>>>            .groupByKey()
> >>>>>>>>>>>>>>>>>>>>>>>            .windowedBy(
> >>>>>>>>>>>>>>>>>>>>>>>              TimeWindows.ofSizeWithNoGrace(...),
> >>>>>>>>>>>>>>>>>>>>>>>              EmitConfig.emitFinal() -- just made this up; it's not in the KIP
> >>>>>>>>>>>>>>>>>>>>>>>            )
> >>>>>>>>>>>>>>>>>>>>>>>            .count()
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
> >>>>>>>>>>>>>>>>>>>>>>> unclear what API you have in mind? `EmitFinalConfig` has no public
> >>>>>>>>>>>>>>>>>>>>>>> constructor nor any builder method.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> For option 3, I am not sure what you really have in mind. Can you give
> >>>>>>>>>>>>>>>>>>>>>>> a concrete example (similar to above) of how users would write their
> >>>>>>>>>>>>>>>>>>>>>>> code?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Did you consider actually passing the `EmitConfig` into the
> >>>>>>>>>>>>>>>>>>>>>>> aggregation operator? In the end, it seems not to be a property of the
> >>>>>>>>>>>>>>>>>>>>>>> window definition or windowing step, but a property of the actual
> >>>>>>>>>>>>>>>>>>>>>>> operator:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>          stream
> >>>>>>>>>>>>>>>>>>>>>>>            .groupByKey()
> >>>>>>>>>>>>>>>>>>>>>>>            .windowedBy(
> >>>>>>>>>>>>>>>>>>>>>>>              TimeWindows.ofSizeWithNoGrace(...)
> >>>>>>>>>>>>>>>>>>>>>>>            )
> >>>>>>>>>>>>>>>>>>>>>>>            .count(EmitConfig.emitFinal())
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> The API surface area that needs to be updated might be larger for this
> >>>>>>>>>>>>>>>>>>>>>>> case though...
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On 3/14/22 9:21 PM, Hao Li wrote:
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks Guozhang!
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 1. I agree `EmitConfig` is better than `WindowConfig`, and option 2
> >>>>>>>>>>>>>>>>>>>>>>>> modifies fewer places. What do you think of option 1, which doesn't
> >>>>>>>>>>>>>>>>>>>>>>>> change the current `windowedBy` API but configures `EmitConfig`
> >>>>>>>>>>>>>>>>>>>>>>>> separately? The benefit of option 1 is that if we need to configure
> >>>>>>>>>>>>>>>>>>>>>>>> something else later, we don't need to pile it onto `windowedBy` but can
> >>>>>>>>>>>>>>>>>>>>>>>> add separate APIs.
> >>>>>>>>>>>>>>>>>>>>>>>> 2. I added it to `Stores` mainly to conform to
> >>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
> >>>>>>>>>>>>>>>>>>>>>>>> but we can also create an internal API to do that without modifying
> >>>>>>>>>>>>>>>>>>>>>>>> `Stores`.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Hello Hao,
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the proposal, I have some preference among the options here
> >>>>>>>>>>>>>>>>>>>>>>>>> so I will copy them here:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I'm now thinking if it's better to not add this new config on each of
> >>>>>>>>>>>>>>>>>>>>>>>>> the Window interfaces, but instead add that at the
> >>>>>>>>>>>>>>>>>>>>>>>>> KGroupedStream#windowedBy function. Also instead of adding just a
> >>>>>>>>>>>>>>>>>>>>>>>>> boolean flag, maybe we can add a Configured class like Grouped,
> >>>>>>>>>>>>>>>>>>>>>>>>> Suppressed, etc, e.g. let's call it an Emitted which for now would just
> >>>>>>>>>>>>>>>>>>>>>>>>> have a single constructor, Emitted.atWindowClose, whose semantics is the
> >>>>>>>>>>>>>>>>>>>>>>>>> same as emitFinal == true. I think the benefits are:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 1) you do not need to modify multiple Window classes, but just overload
> >>>>>>>>>>>>>>>>>>>>>>>>> one windowedBy function with a second param. This is less of a scope
> >>>>>>>>>>>>>>>>>>>>>>>>> for now, and also more extensible for any future changes.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 2) With a config interface, we maintain its extensibility as well as
> >>>>>>>>>>>>>>>>>>>>>>>>> being able to reuse this Emitted interface for other operators if we
> >>>>>>>>>>>>>>>>>>>>>>>>> want to expand to them.
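
As a rough illustration of the `Emitted` idea above, the sketch below pairs a small config class (private constructor, static factories) with a toy tumbling-window count that consults it. This is a stdlib-only model, not Kafka Streams code: `ToyWindowedCount`, the `onEachUpdate()` factory, the no-grace window closing rule, and the `"windowStart=count"` output format are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy config class in the style of Grouped/Suppressed; illustrative only.
final class Emitted {
    enum Mode { ON_EACH_UPDATE, AT_WINDOW_CLOSE }

    final Mode mode;

    private Emitted(Mode mode) { this.mode = mode; }

    static Emitted onEachUpdate() { return new Emitted(Mode.ON_EACH_UPDATE); }

    static Emitted atWindowClose() { return new Emitted(Mode.AT_WINDOW_CLOSE); }
}

final class ToyWindowedCount {
    // Counts records per tumbling window [start, start + size) with no grace period
    // and returns the emitted results as "windowStart=count", in emission order.
    static List<String> run(long[] timestamps, long windowSizeMs, Emitted emitted) {
        TreeMap<Long, Long> open = new TreeMap<>(); // counts of still-open windows
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - Math.floorMod(ts, windowSizeMs);
            if (start + windowSizeMs <= streamTime) {
                continue; // late record for an already closed window: dropped
            }
            long count = open.merge(start, 1L, Long::sum);
            if (emitted.mode == Emitted.Mode.ON_EACH_UPDATE) {
                out.add(start + "=" + count); // every update is forwarded
            }
            // Evict windows that stream time has passed; emit them only in close mode.
            while (!open.isEmpty() && open.firstKey() + windowSizeMs <= streamTime) {
                Map.Entry<Long, Long> closed = open.pollFirstEntry();
                if (emitted.mode == Emitted.Mode.AT_WINDOW_CLOSE) {
                    out.add(closed.getKey() + "=" + closed.getValue());
                }
            }
        }
        return out;
    }
}
```

With timestamps `{1, 2, 11, 5, 12, 21}` and a window size of 10, the each-update mode emits one result per accepted record, while the window-close mode emits a single final count per closed window; windows still open at the end of the input are not emitted in close mode.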
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> ----------------------------
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> So in general I'm leaning towards option 2). For that, some more
> >>>>>>>>>>>>>>>>>>>>>>>>> detailed comments:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 1) If we want to reuse that config object for other non-window stateful
> >>>>>>>>>>>>>>>>>>>>>>>>> operations, I think naming it as `EmitConfig` is probably better than
> >>>>>>>>>>>>>>>>>>>>>>>>> `WindowConfig`.
> >>>>>>>>>>>>>>>>>>>>>>>>> 2) I saw in your PR (https://github.com/apache/kafka/pull/11892) that
> >>>>>>>>>>>>>>>>>>>>>>>>> you are also proposing to add new stores into the public factory
> >>>>>>>>>>>>>>>>>>>>>>>>> Stores, but it's not included in the KIP. Is that intentional?
> >>>>>>>>>>>>>>>>>>>>>>>>> Personally I think that although we may eventually want to add a new
> >>>>>>>>>>>>>>>>>>>>>>>>> store type to the public APIs, for this KIP maybe we do not have to add
> >>>>>>>>>>>>>>>>>>>>>>>>> them but can delay for later after we've learned the best way to lay
> >>>>>>>>>>>>>>>>>>>>>>>>> them out. LMK what you think.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Dev team,
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> This KIP aims to add new APIs to support outputting final aggregated
> >>>>>>>>>>>>>>>>>>>>>>>>>> results for windowed aggregations. I listed several options there and
> >>>>>>>>>>>>>>>>>>>>>>>>>> I'm looking forward to your feedback.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>>
> >>>>>>>> --
> >>>>>>>> Thanks,
> >>>>>>>> Hao
> >>>>>>>>
> >> --
> >> Thanks,
> >> Hao
> >>
>


-- 
Thanks,
Hao
