I think what John tries to say is the following:

We have `windowedBy(Windows)` that accept hopping/tumbling windows but
also custom window and we use a specific algorithm. Note, that custom
windows must "work" based on the used algorithm.

For session windows we have `windowedBy(SessionWindows)` and apply a
different algorithm. Users could pass in a custom `SessionWindows` as
long as the session algorithm works correctly for it.

For the new sliding windows, we want to use a different algorithm
compare to hopping/tumbling windows. If we let sliding window extend
`Windows`, we can decide at runtime if we need to use the
hopping/tumbling window algorithm for hopping/tumbling windows or the
new sliding window algorithm for sliding windows. However, if we get a
custom window, which algorithm do we pick now? The existing
tumbling/hopping window algorithm of the new sliding window algorithm?
Both a custom "time-window" and custom "sliding window" implement the
generic `Windows` class and thus we cannot make a decision as we don't
know the user's intent.

As a matter of fact, even if the user might not be aware of it, the
algorithm we use does already leak into the API (if a user extends
`Windows` is must work with our hopping/tumbling window algorithm and if
a user extends `SessionWindows` it must work with our session algorithm)
and it seems we need to preserve this property for sliding window.


-Matthias

On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:
> Hey John,
> 
> Just a few follow-up questions/comments about the whole Windows thing:
> 
> That's a good way of looking at things; in particular the point about
> SessionWindows
> for example requiring a Merger while other "statically enumerable" windows
> require
> only an adder seems to touch on the heart of the matter.
> 
>  It seems like what Time and Universal (and any other Windows
>> implementation) have in common is that the windows are statically
>> enumerable.
>> As a consequence, they can all rely on an aggregation maintenence algorithm
>> that involves enumerating each of the windows and updating it. That
>> also means that their DSL object (TimeWindowedKStream) doesn't need
>> "subtractors" or "mergers", but only "adders"; again, this is a consequence
>> of the fact that the windows are enumerable.
> 
> 
> Given that, I'm a bit confused why you conclude that sliding windows are
> fundamentally
> different from the "statically enumerable" windows -- sliding windows
> require only an
> adder too. I'm not sure it's a consequence of being enumerable, or that
> being enumerable
> is the fundamental property that unites all Windows (ignoring JoinWindows
> here). Yes,  it
> currently does apply to all Windows implementations, but we shouldn't
> assume that it
> *has *to be that way on the basis that it currently happens to be.
> 
> Also, the fact that they can all rely on the same aggregation algorithm
> seems like an
> implementation detail and it would be weird to force a separate/new DSL API
> just because
> under the covers we swap in a different processor.
> 
> To be fair, I don't think there's a strong reason *against* not extending
> Windows -- in the end
> it will just mean adding a new #windowedBy method and copy/pasting
> everything from
>  TimeWindowedKStream pretty much word for word. But anytime you find
> yourself
> copying over code almost exactly, there should probably be a good reason
> why :)
> 
> 
> On Wed, Jul 22, 2020 at 3:48 PM John Roesler <vvcep...@apache.org> wrote:
> 
>> Thanks Leah!
>>
>> 5) Regarding the empty windows, I'm wondering if we should simply propose
>> that the windows should not be emitted downstream of the operator or
>> visible in IQ. Then, it'll be up to the implementation to make it happen.
>> I'm
>> personally not concerned about it, since it seems like there are multiple
>> ways to accomplish this.
>>
>> Note, the discrepancy Matthias pointed out is actually a design bug. The
>> windowed aggregation (like every operation in Streams) produces a "view",
>> which then forms the basis of downstream operations. When we pass the
>> Materialized option to the operation, all we're doing is saying to
>> "materialize"
>> the view (aka, actually store the computed view) and also make it
>> queriable.
>> It would be illegal for the "queriable, materialized view" to differ in
>> any way
>> from the "view". So, it seems we must either propose to emit the empty
>> windows AND make them visible in IQ, or propose NOT to emit the empty
>> windows AND NOT make them visible in IQ.
>>
>> 7) Regarding whether we can extend TimeWindows (or Windows):
>> I've been mulling this over more. I think it's worth asking the question of
>> what these classes even mean. For example, why is SessionWindows a
>> different thing from TimeWindows and UniversalWindows (which are both
>> Windows)?
>>
>> This conversation is extra complicated because of the incomplete and
>> mis-matched class hierarchy, but we can try to look past it for now.
>>
>> It seems like what Time and Universal (and any other Windows
>> implementation) have in common is that the windows are statically
>> enumerable.
>> As a consequence, they can all rely on an aggregation maintenence algorithm
>> that involves enumerating each of the windows and updating it. That
>> also means that their DSL object (TimeWindowedKStream) doesn't need
>> "subtractors" or "mergers", but only "adders"; again, this is a consequence
>> of the fact that the windows are enumerable.
>>
>> In contrast, session windows are data driven, so they are not statically
>> enumerable. Their algorithm has to rely on scans, and to do the scans,
>> it needs to know the "inactivity gap", which needs to be part of the window
>> definition. Likewise, session windows have the property that they need
>> to be merged, so their DSL object also requires mergers.
>>
>> It really seems like your new window definition doesn't fit into either
>> category. It uses a different algorithm, which relies on scans, but it is
>> also fixed in size, so it doesn't need mergers. In this situation, it seems
>> like the safe bet is to just create SessionWindows with no interface and
>> add a separate set of DSL operations and objects. It's a little extra code,
>> but it seems to keep everything tidier and more comprehensible, both
>> for us and for users.
>>
>> What do you think?
>> -John
>>
>>
>>
>> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
>>> Hi Matthias,
>>>
>>> Thanks for the suggestions, I've updated the KIP and child page
>> accordingly
>>> and addressed some below.
>>>
>>> 1) For the mandatory grace period, we should use a static builder method
>>>> that take two parameters.
>>>>
>>>
>>>  That makes sense, I've changed that in the public API.
>>>
>>> Btw: this implementation actually raises an issue for IQ: those empty
>>>> windows would be returned.
>>>
>>>
>>> This is a great point, with the current implementation plan empty windows
>>> would be returned. I think creating a second window store would
>> definitely
>>> work, but there would be more overhead in having two stores and switching
>>> windows between the stores, as well as doing scans in both stores to find
>>> existing windows. There might be a way to do avoid emitting empty windows
>>> without creating a second window store, I'll look more into it.
>>>
>>> Cheers,
>>> Leah
>>>
>>> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>>
>>>> Thanks for updating the KIP.
>>>>
>>>> Couple of follow up comments:
>>>>
>>>> 1) For the mandatory grace period, we should use a static builder
>> method
>>>> that take two parameters. This provides a better API as user cannot
>>>> forget to set the grace period. Throwing a runtime exception seems not
>>>> to be the best way to handle this case.
>>>>
>>>>
>>>>
>>>> 2) In Fig.2 you list 10 hopping windows. I believe it should actually
>> be
>>>> more? There first hopping window would be [-6,-4[ and the last one
>> would
>>>> be from [19,29[ -- hence, the cost saving are actually much higher.
>>>>
>>>>
>>>>
>>>> 3a) IQ: you are saying that the user need to compute the start time as
>>>>
>>>>> windowSize+the time they're looking at
>>>>
>>>> Should this be "targetTime - windowSize" instead?
>>>>
>>>>
>>>>
>>>> 3b) IQ: in you example you say "window size of 10 minutes" with an
>>>> incident at 9:15.
>>>>
>>>>> they're looking for a window with the start time of 8:15.
>>>>
>>>> The example does not seem to add up?
>>>>
>>>>
>>>>
>>>> 4) For "Processing Windows": you describe a three step approach: I just
>>>> want to point out, that step (1) is not necessary for each input
>> record,
>>>> because timestamps are not guaranteed to be unique and thus a previous
>>>> record with the same key and timestamp might have create the windows
>>>> already.
>>>>
>>>> Nit: I am also not exactly sure what you mean by step (3) as you use
>> the
>>>> word "send". I guess you mean "put"?
>>>>
>>>> It seem there are actually more details in the sub-page:
>>>>
>>>>> A new record for SlidingWindows will always create two new windows.
>> If
>>>> either of those windows already exist in the windows store, their
>>>> aggregation will simply be updated to include the new record, but no
>>>> duplicate window will be added to the WindowStore.
>>>>
>>>> However, the first and second sentence contradict each other a little
>>>> bit. I think the first sentence is not correct.
>>>>
>>>> Nit:
>>>>
>>>>> For in-order records, the left window will always be empty.
>>>>
>>>> This should be "right window" ?
>>>>
>>>>
>>>>
>>>> 5) "Emitting Results": it might be worth to point out, that a
>>>> second/future window of a new record is create with no records, and
>>>> thus, even if it's initialized it won't be emitted. Only if a
>>>> consecutive record falls into the window, the window would be updates
>>>> and the window result (for a window content of one record) would be
>> sent
>>>> downstream.
>>>>
>>>> Again, the sub-page contains this details. Might still be worth to add
>>>> to the top level page, too.
>>>>
>>>> Btw: this implementation actually raises an issue for IQ: those empty
>>>> windows would be returned. Thus I am wondering if we need to use two
>>>> stores internally? One store for actual windows and one store for empty
>>>> windows? If an empty window is updated, it's move to the other store?
>>>> For IQ, we only allow to query the non-empty-window store?
>>>>
>>>>
>>>>
>>>> 6) On the sub-page:
>>>>
>>>>> The left window of in-order records and both windows for out-of-order
>>>> records need to be updated with the values of records that have already
>>>> been processed.
>>>>
>>>> Why "both windows for out-of-order records"? IMHO, we don't know how
>>>> many existing windows needs to be updated when processing an
>>>> out-of-order record. Of course, an out-of-order record could not fall
>>>> into any existing window but create two new windows, too.
>>>>
>>>>>  Because each record creates one new window that includes itself and
>> one
>>>> window that does not
>>>>
>>>> As state above, this does not seem to hold. I understand why you mean,
>>>> but it would be good to be exact.
>>>>
>>>> Figure 2: You use the term "late" but you mean "out-of-order" I guess
>> --
>>>> a record is _late_ if it's not processed any longer as the grace period
>>>> passed already.
>>>>
>>>> Figure 2: "Late" should be out-or-order. The example text say a window
>>>> [16,26] should be created but the figure shows the green window as
>> [15,20].
>>>>
>>>> About the blue window: maybe add not that the blue window contains the
>>>> aggregate we need for the green window, _before_ the new record `a` is
>>>> added to the blue window.
>>>>
>>>>
>>>>
>>>> 7) I am not really happy to extend TimeWindows and I think the argument
>>>> about JoinWindows is not the best (IMHO, JoinWindows do it already
>> wrong
>>>> and we just repeat the same mistake). However, it seems our window
>>>> hierarchy is "broken" already and it might be out of scope for this KIP
>>>> to fix it. Hence, I am ok that we bite the bullet for now and clean it
>>>> up later.
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 7/20/20 5:18 PM, Guozhang Wang wrote:
>>>>> Hi Leah,
>>>>>
>>>>> Thanks for the updated KIP. I agree that extending SlidingWindows
>> from
>>>>> Windows is fine for the sake of not introducing more public APIs (and
>>>> their
>>>>> internal xxxImpl classes), and its cons is small enough to tolerate
>> to
>>>> me.
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas <ltho...@confluent.io>
>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Thanks for the feedback on the KIP. I've updated the KIP page
>>>>>> <
>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>>>>>>>
>>>>>> to address these points and have created a child page
>>>>>> <
>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450
>>>>>>>
>>>>>> to go more in depth on certain implementation details.
>>>>>>
>>>>>> *Grace Period:*
>>>>>> I think Sophie raises a good point that the default grace period of
>> 24
>>>>>> hours is often too long and was chosen when retention time and grace
>>>> period
>>>>>> were the same. For SlidingWindows, I propose we make the grace
>> period
>>>>>> mandatory. To keep formatting consistent with other types of
>> windows,
>>>> grace
>>>>>> period won't be an additional parameter in the #of method, but will
>>>> still
>>>>>> look like it does in other use cases:
>>>>>> .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds). If
>>>> grace
>>>>>> period isn't properly initialized, an error will be thrown through
>> the
>>>>>> process method.
>>>>>>
>>>>>> *Storage Layer + Aggregation:*
>>>>>> SlidingWindows will use a WindowStore because computation can be
>> done
>>>> with
>>>>>> the information stored in a WindowStore (window timestamp and
>> value).
>>>> Using
>>>>>> the WindowStore also simplifies computation as SlidingWindows can
>>>> leverage
>>>>>> existing processes. Because we are using a WindowStore, the
>> aggregation
>>>>>> process will be similar to that of a hopping window. As records
>> come in
>>>>>> their value is added to the aggregation that already exists,
>> following
>>>> the
>>>>>> same procedure as hopping windows. The aggregation difference
>> between
>>>>>> SlidingWindows and HoppingWindows comes in creating new windows for
>> a
>>>>>> SlidingWindow, where you need to find the existing records that
>> belong
>>>> to
>>>>>> the new window. This computation is similar to the aggregation in
>>>>>> SessionWindows and requires a scan to the WindowStore to find the
>> window
>>>>>> with the aggregation needed, which will always be pre-computed. The
>> scan
>>>>>> requires creating an iterator, but should have minimal performance
>>>> effects
>>>>>> as this strategy is already implemented in SessionWindows. More
>> details
>>>> on
>>>>>> finding the aggregation that needs to be put in a new window can be
>>>> found
>>>>>> on the implementation page
>>>>>> <
>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450
>>>>>>>
>>>>>> .
>>>>>>
>>>>>> *Extending Windows<TimeWindow>, Windows<Window> or nothing*
>>>>>> Because SlidingWindows are still defined by a windowSize (whereas
>>>>>> SessionWindows are defined purely by data), I think it makes sense
>> to
>>>>>> leverage the existing Window processes instead of creating a new
>> store
>>>> type
>>>>>> that would be very similar to the WindowStore. While it's true that
>> the
>>>>>> #windowsFor method isn't necessary for SlidingWindows, JoinWindows
>> also
>>>>>> extends Windows<Window> and throws an UnsupportedOperationException
>> in
>>>> the
>>>>>> #windowsFor method, which is what SlidingWindows can do. The
>> difference
>>>>>> between extending Windows<TimeWindow> or Windows<Window> is
>> minimal, as
>>>>>> both are ways to pass window parameters. Extending
>> Windows<TimeWindow>
>>>> will
>>>>>> give us more leverage in utilizing existing processes.
>>>>>>
>>>>>> *Emit Strategy*
>>>>>> I would argue that emitting for every update is still the best way
>> to go
>>>>>> for SlidingWindows because it mimics the other types of windows, and
>>>>>> suppression can be leveraged to limit what SlidingWindows emits.
>> While
>>>> some
>>>>>> users may only want to see the last value, others may want to see
>> more,
>>>> and
>>>>>> leaving the emit strategy to emit partial results allows both users
>> to
>>>>>> access what they want.
>>>>>>
>>>>>> *Additional Features*
>>>>>> Supporting sliding windows inherently, and shifting inefficient
>> hopping
>>>>>> windows to sliding windows, is an interesting idea and could be
>> built on
>>>>>> top of SlidingWindows when they are finished, but right now seems
>> out of
>>>>>> scope for the needs of this KIP. Similarly, including a
>> `subtraction`
>>>>>> feature could have performance improvements, but doesn't seem
>> necessary
>>>> for
>>>>>> the implementation of this KIP.
>>>>>>
>>>>>> Let me know what you think of the updates,
>>>>>>
>>>>>> Leah
>>>>>>
>>>>>> On Thu, Jul 16, 2020 at 11:57 AM John Roesler <vvcep...@apache.org>
>>>> wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> Thanks for the KIP, Leah!
>>>>>>>
>>>>>>> Regarding (1): I'd go farther actually. Making Windows an abstract
>>>>>>> class was a mistake from the beginning that led to us not being
>>>>>>> able to fix a very confusing situation for users around retention
>>>> times,
>>>>>>> final results emitting, etc. Thus, I would not suggest extending
>>>>>>> TimeWindows for sure, but would also not suggest extending Windows.
>>>>>>>
>>>>>>> The very simplest thing to do is follow the example of
>> SessionWindows,
>>>>>>> which is just a completely self-contained class. If we don't mess
>> with
>>>>>>> class inheritance, we won't ever have any of the problems related
>> to
>>>>>>> class inheritance. This is my preferred solution.
>>>>>>>
>>>>>>> Still, Sliding windows has a lot in common with TimeWindows and
>> other
>>>>>>> fixed-size windows, namely that the windows are fixed in size. If
>> we
>>>> want
>>>>>>> to preserve the current two-part windowing API in which you can
>> window
>>>>>>> by either "fixed" or "data driven" modes, I'd suggest we avoid
>>>> increasing
>>>>>>> the blast radius of Windows by taking the opportunity to replace it
>>>> with
>>>>>>> a proper interface and implement that interface instead.
>>>>>>>
>>>>>>> For example:
>>>>>>> https://github.com/apache/kafka/pull/9031
>>>>>>>
>>>>>>> Then, SlidingWindows would just implement FixedSizeWindowDefinition
>>>>>>>
>>>>>>> ======
>>>>>>>
>>>>>>> Regarding (2), it seems more straightforward as a user of Streams
>>>>>>> to just have one mental model. _All_ of our aggregation operations
>>>>>>> follow an eager emission model, in which we just emit an update
>>>> whenever
>>>>>>> an update is available. We already provided Suppression to
>> explicitly
>>>>>> apply
>>>>>>> different update semantics in the case it's required. Why should we
>>>>>> define
>>>>>>> a snowflake operation with completely different semantics from
>>>> everything
>>>>>>> else? I.e., systems are generally easier to use when they follow a
>> few
>>>>>>> simple, composable rules than when they have a lot of different,
>>>> specific
>>>>>>> rules.
>>>>>>>
>>>>>>>
>>>>>>> ======
>>>>>>>
>>>>>>> New point: (4):
>>>>>>> It would be nice to include some examples of user code that would
>> use
>>>> the
>>>>>>> new API, which should include:
>>>>>>> 1. using the DSL with the sliding window definition
>>>>>>> 2. accessing the stored results of a sliding window aggregation
>> via IQ
>>>>>>> 3. defining a custom processor to access sliding windows in a store
>>>>>>>
>>>>>>> It generally helps reviewers wrap their heads around the proposal,
>> as
>>>>>> well
>>>>>>> as shaking out any design issues that would otherwise only come up
>>>> during
>>>>>>> implementation/testing/review.
>>>>>>>
>>>>>>> Thanks again for the awesome proposal!
>>>>>>> -John
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote:
>>>>>>>> Hello Leah,
>>>>>>>>
>>>>>>>> Thanks for the nice written KIP. A few thoughts:
>>>>>>>>
>>>>>>>> 1) I echo the other reviewer's comments regarding the typing: why
>>>>>>> extending
>>>>>>>> TimeWindow instead of just extending Window?
>>>>>>>>
>>>>>>>> 2) I also feel that emitting policy for this type of windowing
>>>>>>> aggregation
>>>>>>>> may be different from the existing ones. Existing emitting policy
>> is
>>>>>> very
>>>>>>>> simple: emit every time when window get updates, and emit every
>> time
>>>> on
>>>>>>>> out-of-ordering data within grace period, this is because for
>>>>>>> time-windows
>>>>>>>> the window close time is strictly depend on the window start time
>>>> which
>>>>>>> is
>>>>>>>> fixed, while for session-windows although the window open/close
>> time
>>>> is
>>>>>>>> also data-dependent it is relatively infrequent compared to the
>>>>>>>> sliding-windows. For this KIP, since each new data would cause a
>>>>>>>> new sliding-window, the num. windows maintained logically could be
>>>> much
>>>>>>>> larger and hence emitting on each update may be too aggressive.
>>>>>>>>
>>>>>>>> 3) Although KIP itself should be focusing on user face
>> interfaces, I'd
>>>>>>>> suggest we create a children page of KIP-450 discussing about its
>>>>>>>> implementations as well, since some of that may drive the
>> interface
>>>>>>> design.
>>>>>>>> E.g. personally I think having a combiner interface in addition to
>>>>>>>> aggregator would be useful but that's based on my 2cents about the
>>>>>>>> implementation design (I once created a child page describing it:
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
>>>>>>>> ).
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna <br...@confluent.io
>>>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Leah,
>>>>>>>>>
>>>>>>>>> Thank you for the KIP!
>>>>>>>>>
>>>>>>>>> Here is my feedback:
>>>>>>>>>
>>>>>>>>> 1. The KIP would benefit from some code examples that show how
>> to use
>>>>>>>>> sliding windows in aggregations.
>>>>>>>>>
>>>>>>>>> 2. The different sliding windows in Figure 1 and 2 are really
>> hard to
>>>>>>>>> distinguish. Could you please try to make them graphically better
>>>>>>>>> distinguishable? You could try to draw the frames of consecutive
>>>>>>>>> windows shifted to each other.
>>>>>>>>>
>>>>>>>>> 3. I agree with Matthias, that extending Windows<TimeWindow>
>> does not
>>>>>>>>> seem to be the best approach. What would be the result of
>>>>>>>>> windowsFor()?
>>>>>>>>>
>>>>>>>>> 4. In the section "Public Interfaces" you should remove
>>>>>> implementation
>>>>>>>>> details like private constructors and private fields.
>>>>>>>>>
>>>>>>>>> 5. Do we need a new store interface or can we use WindowStore?
>> Some
>>>>>>>>> words about that would be informative.
>>>>>>>>>
>>>>>>>>> 6. @Matthias, if the subtrator is not strictly needed, I would
>> skip
>>>>>> it
>>>>>>>>> for now and add it later.
>>>>>>>>>
>>>>>>>>> 7. I also agree that having a section that describes how to
>> handle
>>>>>>>>> out-of-order records would be good to understand what is still
>>>>>> missing
>>>>>>>>> and what we can reuse.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Bruno
>>>>>>>>>
>>>>>>>>> On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax <
>> mj...@apache.org>
>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Leah,
>>>>>>>>>>
>>>>>>>>>> thanks for your update. However, it does not completely answer
>> my
>>>>>>>>> question.
>>>>>>>>>>
>>>>>>>>>> In our current window implementations, we emit a window result
>>>>>> update
>>>>>>>>>> record (ie, early/partial result) for each input record. When an
>>>>>>>>>> out-of-order record arrives, we just update to corresponding old
>>>>>>> window
>>>>>>>>>> and emit another update.
>>>>>>>>>>
>>>>>>>>>> It's unclear from the KIP if you propose the same emit
>> strategy? --
>>>>>>> For
>>>>>>>>>> sliding windows it might be worth to consider to use a different
>>>>>> emit
>>>>>>>>>> strategy and only support emitting the final result only (ie,
>> after
>>>>>>> the
>>>>>>>>>> grace period passed)?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Boyang, also raises a good point that relates to my point from
>>>>>> above
>>>>>>>>>> about pre-aggregations and storage layout. Our current time
>> windows
>>>>>>> are
>>>>>>>>>> all pre-aggregated and stored in parallel. We can also lookup
>>>>>> windows
>>>>>>>>>> efficiently, as we can compute the windowed-key given the input
>>>>>>> record
>>>>>>>>>> key and timestamp based on the window definition.
>>>>>>>>>>
>>>>>>>>>> However, for sliding windows, window boundaries are data
>> dependent
>>>>>>> and
>>>>>>>>>> thus we cannot compute them upfront. Thus, how can we "find"
>>>>>> existing
>>>>>>>>>> window efficiently? Furthermore, out-of-order data would create
>> new
>>>>>>>>>> windows in the past and we need to be able to handle this case.
>>>>>>>>>>
>>>>>>>>>> Thus, to handle out-of-order data correctly, we need to store
>> all
>>>>>> raw
>>>>>>>>>> input events. Additionally, we could also store pre-aggregated
>>>>>>> results
>>>>>>>>>> if we thinks it's benfitial. -- If we apply "emit only final
>>>>>> results"
>>>>>>>>>> strategy, storing pre-aggregated result would not be necessary
>>>>>>> though.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Btw: for sliding windows it might also be useful to consider
>>>>>> allowing
>>>>>>>>>> users to supply a `Subtractor` -- this subtractor could be
>> applied
>>>>>> on
>>>>>>>>>> the current window result (in case we store it) if a record
>> drops
>>>>>>> out of
>>>>>>>>>> the window. Of course, not all aggregation functions are
>>>>>> subtractable
>>>>>>>>>> and we can consider this as a follow up task, too, and not
>> include
>>>>>> in
>>>>>>>>>> this KIP for now. Thoughts?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I was also thinking about the type hierarchy. I am not sure if
>>>>>>> extending
>>>>>>>>>> TimeWindow is the best approach? For TimeWindows, we can
>>>>>> pre-compute
>>>>>>>>>> window boundaries (cf `windowsFor()`) while for a sliding window
>>>>>> the
>>>>>>>>>> boundaries are data dependent. Session windows are also data
>>>>>>> dependent
>>>>>>>>>> and thus they don't inherit from TimeWindow (Maybe check out the
>>>>>> KIP
>>>>>>>>>> that added session windows? It could provides some good
>> insights.)
>>>>>>> -- I
>>>>>>>>>> believe the same rational applies to sliding windows?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 7/10/20 12:47 PM, Boyang Chen wrote:
>>>>>>>>>>> Thanks Leah and Sophie for the KIP.
>>>>>>>>>>>
>>>>>>>>>>> 1. I'm a bit surprised that we don't have an advance time.
>> Could
>>>>>> we
>>>>>>>>>>> elaborate how the storage layer is structured?
>>>>>>>>>>>
>>>>>>>>>>> 2. IIUC, there will be extra cost in terms of fetching
>>>>>> aggregation
>>>>>>>>> results,
>>>>>>>>>>> since we couldn't pre-aggregate until the user asks for it.
>> Would
>>>>>>> be
>>>>>>>>> good
>>>>>>>>>>> to also discuss it.
>>>>>>>>>>>
>>>>>>>>>>> 3. We haven't discussed the possibility of supporting sliding
>>>>>>> windows
>>>>>>>>>>> inherently. For a user who actually uses a hopping window,
>>>>>> Streams
>>>>>>>>> could
>>>>>>>>>>> detect such an inefficiency doing a window_size/advance_time
>>>>>> ratio
>>>>>>> to
>>>>>>>>> reach
>>>>>>>>>>> a conclusion on whether the write amplification is too high
>>>>>>> compared
>>>>>>>>> with
>>>>>>>>>>> some configured threshold. The benefit of doing so is that
>>>>>> existing
>>>>>>>>> Streams
>>>>>>>>>>> users don't need to change their code, learn a new API, but
>> only
>>>>>> to
>>>>>>>>> upgrade
>>>>>>>>>>> Streams library to get benefits for their inefficient hopping
>>>>>>> window
>>>>>>>>>>> implementation. There might be some compatibility issues for
>>>>>> sure,
>>>>>>> but
>>>>>>>>>>> worth listing them out for trade-off.
>>>>>>>>>>>
>>>>>>>>>>> Boyang
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas <
>>>>>> ltho...@confluent.io
>>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey Matthias,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for pointing that out. I added the following to the
>>>>>> Propose
>>>>>>>>> Changes
>>>>>>>>>>>> section of the KIP:
>>>>>>>>>>>>
>>>>>>>>>>>> "Records that come out of order will be processed the same way
>>>>>> as
>>>>>>>>> in-order
>>>>>>>>>>>> records, as long as they fall within the grace period. Any new
>>>>>>> windows
>>>>>>>>>>>> created by the late record will still be created, and the
>>>>>> existing
>>>>>>>>> windows
>>>>>>>>>>>> that are changed by the late record will be updated. Any
>> record
>>>>>>> that
>>>>>>>>> falls
>>>>>>>>>>>> outside of the grace period (either user defined or default)
>>>>>> will
>>>>>>> be
>>>>>>>>>>>> discarded. "
>>>>>>>>>>>>
>>>>>>>>>>>> All the best,
>>>>>>>>>>>> Leah
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax <
>>>>>> mj...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Leah,
>>>>>>>>>>>>>
>>>>>>>>>>>>> thanks a lot for the KIP. Very well written.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The KIP does not talk about the handling of out-of-order data
>>>>>>> though.
>>>>>>>>>>>>> How do you propose to address this?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 7/8/20 5:33 PM, Leah Thomas wrote:
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>> I'd like to kick-off the discussion for KIP-450, adding
>>>>>> sliding
>>>>>>>>> window
>>>>>>>>>>>>>> aggregation support to Kafka Streams.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know what you think,
>>>>>>>>>>>>>> Leah
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to