Good callout, thanks Matthias! I've updated the example image in the KIP and, in the corresponding Javadocs, called out specifically that window ends are exclusive.
- Almog

On Thu, Feb 20, 2025 at 5:45 PM Matthias J. Sax <mj...@apache.org> wrote:
> Hi,
>
> I was just re-reading the KIP in order to vote and noticed that the example does not seem to be correct.
>
> If we have windows of size 10, the bounds would be [0,10), [10,20), [20,30): the lower bound would be inclusive, and the upper bound would be exclusive. At least this is how we do it for `TimeWindows`, and I think it would make sense to adopt the same semantics for `BatchWindows`.
>
> Thus, the event at t=10 would not go into the first window, but into the second, and similarly for t=20 and t=30.
>
> Overall, it seems to be a gap in the KIP that we missed: the bounds are not explicitly defined and called out. It might be worth adding a short sentence or paragraph about it.
>
> Thoughts?
>
> -Matthias
>
> On 2/6/25 4:49 PM, Matthias J. Sax wrote:
> > Sounds good. Thanks for clarifying.
> >
> > -Matthias
> >
> > On 2/6/25 3:19 PM, Almog Gavra wrote:
> >> Good call on the backwards compatibility - updated the KIP.
> >>
> >> Re: the grace period for BatchWindows, I think zero makes sense (and also makes implementing things a lot easier). In my mental model, we still drop late records that come in after the window closes; they just never happen, because we use the current stream time instead of the event time when computing which window a record should be in.
> >>
> >> If we do this, the implementation is easier because: (a) we don't have to change the way retention for stored windows works and special-case that for BatchWindows (retention is just window size + 0 grace), and (b) we can just reuse the KStreamWindowAggregateProcessor without changing anything else.
> >>
> >> LMK if that makes sense!
> >>
> >> On Thu, Feb 6, 2025 at 12:12 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>> Hit "reply" too early. Just re-read the KIP.
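Here is a minimal Java sketch of the half-open convention from Matthias's Feb 20 message. It is not Kafka Streams code. `BatchWindowSketch`, `windowStart`, `assign`, and `retentionMs` are hypothetical names, and the sketch assumes windows are aligned to multiples of the size starting at 0. It shows two things. First, t=10 falls into [10,20). Second, when records are assigned by stream time (as Almog describes for a zero grace period), an out-of-order record goes into the current window instead of being dropped, so retention stays at window size plus zero grace.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Kafka Streams code: tumbling windows of a fixed size
// with half-open bounds [start, start + size), where a record is placed by the
// current stream time instead of by its own timestamp.
final class BatchWindowSketch {
    private final long sizeMs;
    private long streamTime = Long.MIN_VALUE; // largest timestamp observed so far

    BatchWindowSketch(long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("sizeMs must be positive");
        }
        this.sizeMs = sizeMs;
    }

    // Start of the window containing t: start is inclusive, start + sizeMs is exclusive.
    static long windowStart(long t, long sizeMs) {
        return Math.floorDiv(t, sizeMs) * sizeMs;
    }

    // Advances stream time (it never moves backwards) and returns the start of
    // the window this record is assigned to.
    long assign(long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
        return windowStart(streamTime, sizeMs);
    }

    // With a grace period of zero, a window only has to be retained for its size.
    long retentionMs() {
        long graceMs = 0L;
        return sizeMs + graceMs;
    }

    public static void main(String[] args) {
        BatchWindowSketch windows = new BatchWindowSketch(10L);
        List<Long> starts = new ArrayList<>();
        for (long ts : new long[] {3L, 10L, 7L, 20L, 15L}) {
            starts.add(windows.assign(ts));
        }
        // t=10 opens [10,20); t=7 arrives after stream time reached 10, so it also lands in [10,20)
        System.out.println(starts); // [0, 10, 10, 20, 20]
        System.out.println(windows.retentionMs()); // 10
    }
}
```

`Math.floorDiv` is used instead of `/` so that the alignment would also hold for negative timestamps. That is a design choice of this sketch, not something the thread specifies.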
> >>>
> >>> For `Windows#windowsFor(...)`, even if it is not intended to be implemented by users, it's still strictly public API. Thus, we cannot just change the method; we would need to keep the existing method and deprecate it, and add a new overload with a default implementation that calls the existing one.
> >>>
> >>> For `BatchedWindows#gracePeriod`, I am wondering if `return 0;` makes sense. In the end, zero would still mean "drop late records and do not put them into the window", but that is exactly what we want to do. Thus, IMHO, the new BatchedWindows don't have (and don't need) the notion of a "grace period" to begin with, and we can just throw an `UnsupportedOperationException` from this method.
> >>>
> >>> The underlying implementation of the "BatchWindowAggregationProcessor" should never need to call `gracePeriod` anyway. Or am I missing something?
> >>>
> >>> -Matthias
> >>>
> >>> On 2/6/25 12:01 PM, Matthias J. Sax wrote:
> >>>> BatchWindows works for me.
> >>>>
> >>>> On 2/6/25 7:34 AM, Almog Gavra wrote:
> >>>>> Happy to name it BatchWindows. Will give some people time to chime in and then change the name.
> >>>>>
> >>>>> - Almog
> >>>>>
> >>>>> On Tue, Feb 4, 2025 at 11:10 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> >>>>>> One minor suggestion: use BatchWindows instead of BatchedWindows. The version without the "ed" matches the established naming pattern and grammar used by the other Windows classes, e.g. TimeWindows, SessionWindows, SlidingWindows.
> >>>>>>
> >>>>>> Not a big deal though; I won't retract my +1 on the voting thread if you prefer to keep it as BatchedWindows.
> >>>>>>
> >>>>>> On Tue, Feb 4, 2025 at 10:51 AM Almog Gavra <almog.ga...@gmail.com> wrote:
> >>>>>>> Thanks for the discussion everyone!
> >>>>>>> I've updated the wiki with the following changes:
> >>>>>>>
> >>>>>>> - Renamed to BatchedWindows
> >>>>>>> - Added a note in rejected alternatives about more general-purpose (micro-)batching functionality, since the scope of that is much wider.
> >>>>>>>
> >>>>>>> Since it looks like we've stabilized the discussion, I'm going to go ahead and open up the vote! Definitely feel free to take another look and leave any additional thoughts.
> >>>>>>>
> >>>>>>> On Thu, Jan 30, 2025 at 12:28 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>> batch window with max N records, and then also specifying a BufferConfig.maxRecords()
> >>>>>>>>
> >>>>>>>> That's actually two different and independent dimensions. "N records" would be the number of records in the window, but `maxRecords` is the number of unique keys/rows in the buffer before it's flushed.
> >>>>>>>>
> >>>>>>>>> the current proposal that only includes stream time based batches is no worse than existing windows which also have that potential indeterminism in this scenario except that it's more likely since grace period is always 0
> >>>>>>>>
> >>>>>>>> Guess this becomes a philosophical question. For TimeWindows, even if grace=0, it's still deterministic which window each record goes into. Of course, the "cut-off point" when we start to drop late records is subject to "noise" due to repartitioning, as stream-time advances "non-deterministically".
> >>>>>>>>
> >>>>>>>> So if we drop different records on a "re-run" on the same input data, we might still get different windows (as different records would have been dropped).
> >>>>>>>> But that is exactly why we have a grace period to begin with (to avoid it, and to make a re-run deterministic; I think of a "too short" grace period as a misconfiguration, or as an informed decision to sacrifice determinism for something else)...
> >>>>>>>>
> >>>>>>>> And as the new window type, by definition, does not need or want a grace period, IMHO the new window type would be "worse" (for lack of a better word...). I don't think it's really worse; it's just inherently non-deterministic, and that's fine.
> >>>>>>>>
> >>>>>>>> Guess we are overall on the same page and share a common understanding of the trade-off. So I think we are good :)
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 1/30/25 9:07 AM, Almog Gavra wrote:
> >>>>>>>>> I'm not opposed to "BatchedWindows" - I think I like that the most so far. I'll let that sit on the discussion thread for a while, and change the KIP to match if there are no concerns.
> >>>>>>>>>
> >>>>>>>>>> What I don't understand is why the relationship to suppress()/emitStrategy() is relevant. Can you elaborate a little bit?
> >>>>>>>>>
> >>>>>>>>> The existing proposal has no impact on suppression/emitStrategy, but I'm not sure it stays unrelated once you start to introduce window constraints that aren't just stream time. Imagine having a batch window with max N records, and then also specifying a BufferConfig.maxRecords() suppression... what happens in that case? With the current scope of the KIP, we don't need to worry about any of that, so it's pretty well contained.
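The two dimensions Matthias separates ("N records in the window" versus `BufferConfig.maxRecords()`) can be illustrated with a simplified Java model. This is a hypothetical sketch, not the suppression buffer's implementation. It treats the buffer as a map that keeps only the latest update per key, which, following Matthias's description, is why its size tracks distinct keys rather than records.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not the suppress() implementation: compares a per-window
// record count with the size of a buffer that keeps only the latest value per key.
final class CountVsKeysSketch {
    record Update(String key, long value) {}

    // "N records in the window": every record counts.
    static int recordsInWindow(List<Update> updates) {
        return updates.size();
    }

    // A buffer keyed by record key: a later update for the same key replaces the
    // earlier one, so a maxRecords-style bound effectively counts distinct keys.
    static int bufferedKeys(List<Update> updates) {
        Map<String, Long> buffer = new LinkedHashMap<>();
        for (Update u : updates) {
            buffer.put(u.key(), u.value());
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        List<Update> window = List.of(
            new Update("a", 1), new Update("b", 1), new Update("a", 2),
            new Update("a", 3), new Update("b", 2));
        System.out.println(recordsInWindow(window) + " records, " + bufferedKeys(window) + " buffered keys");
        // prints "5 records, 2 buffered keys"
    }
}
```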
> >>>>>>>>>
> >>>>>>>>>> if data is auto-repartitioned before a `BatchWindows` step, repartitioning introduces non-deterministic order in the repartition topic
> >>>>>>>>>
> >>>>>>>>> This is a good point; I did not think about it (my original comment was specifically about wall-clock time). I guess, though, the current proposal that only includes stream-time-based batches is no worse than existing windows, which also have that potential indeterminism in this scenario, except that it's more likely since the grace period is always 0.
> >>>>>>>>>
> >>>>>>>>>> I don't see any relationship to EOS or ALOS. Can you explain what you mean?
> >>>>>>>>>
> >>>>>>>>> Hmm, I'm not totally sure what I was thinking... we can ignore that part.
> >>>>>>>>>
> >>>>>>>>> - Almog
> >>>>>>>>>
> >>>>>>>>> On Tue, Jan 28, 2025 at 3:40 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>> Interesting thoughts. So maybe we could go with `BatchWindows` as a name? Again, only spitballing...
> >>>>>>>>>>
> >>>>>>>>>> If we really put "(micro-)batching" at the center of this idea, I think count-based and time-based batches (and time could actually be either stream-time or wall-clock time), or any combination of these dimensions, could all make sense.
> >>>>>>>>>>
> >>>>>>>>>> Having said this: I don't think we need to support all dimensions initially, but if we add something like `BatchWindows` we can extend the supported specification incrementally.
> >>>>>>>>>>
> >>>>>>>>>> What I don't understand is why the relationship to suppress()/emitStrategy() is relevant. Can you elaborate a little bit?
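Almog and Matthias discuss a determinism trade-off: writes to a repartition topic can interleave, and with no grace period there is nothing to absorb the reordering. That trade-off can be reduced to a toy Java example. This is a hypothetical sketch, not Kafka Streams code. Records are identified by distinct timestamps, windows have an assumed size of 10, and the same three records are replayed in two arrival orders.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Kafka Streams code. Records are identified by their
// (distinct) timestamps; each result maps a record to the start of its window.
final class DeterminismSketch {
    static final long SIZE_MS = 10L;

    static long windowStart(long t) {
        return Math.floorDiv(t, SIZE_MS) * SIZE_MS;
    }

    // Event-time assignment: each record uses its own timestamp, so arrival order is irrelevant.
    static Map<Long, Long> byEventTime(long[] arrivalOrder) {
        Map<Long, Long> result = new TreeMap<>();
        for (long ts : arrivalOrder) {
            result.put(ts, windowStart(ts));
        }
        return result;
    }

    // Stream-time assignment: each record uses the largest timestamp seen so far,
    // so the result depends on arrival order.
    static Map<Long, Long> byStreamTime(long[] arrivalOrder) {
        Map<Long, Long> result = new TreeMap<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : arrivalOrder) {
            streamTime = Math.max(streamTime, ts);
            result.put(ts, windowStart(streamTime));
        }
        return result;
    }

    public static void main(String[] args) {
        long[] runA = {5L, 8L, 12L};
        long[] runB = {5L, 12L, 8L}; // same records, different interleaving
        System.out.println(byEventTime(runA) + " vs " + byEventTime(runB));
        // {5=0, 8=0, 12=10} vs {5=0, 8=0, 12=10}
        System.out.println(byStreamTime(runA) + " vs " + byStreamTime(runB));
        // {5=0, 8=0, 12=10} vs {5=0, 8=10, 12=10}
    }
}
```

Under these assumptions, event-time assignment gives the same result for both orders, while stream-time assignment moves record 8 from window 0 to window 10.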
> >>>>>>>>>>
> >>>>>>>>>>> how to make it deterministic with regards to time
> >>>>>>>>>>
> >>>>>>>>>> I think, in any case, we are leaving determinism-land (at least to some extent). I guess as long as the `BatchWindows` are applied directly to an input topic, we get determinism for both stream-time-sized and count-sized windows (not for wall-clock time, of course).
> >>>>>>>>>>
> >>>>>>>>>> However, if data is auto-repartitioned before a `BatchWindows` step, repartitioning introduces non-deterministic order in the repartition topic due to interleaved writes, and thus stream-time-based windows would also become non-deterministic (as there is, by design, no grace period to "fix" the non-deterministic order, in contrast to the fully event-time-based windows we support so far).
> >>>>>>>>>>
> >>>>>>>>>> So I don't think there is an actual difference between stream-time-based and count-based `BatchWindows` with regard to determinism.
> >>>>>>>>>>
> >>>>>>>>>>> which is important for EOS and even ALOS
> >>>>>>>>>>
> >>>>>>>>>> I don't see any relationship to EOS or ALOS. Can you explain what you mean?
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 1/28/25 10:43 AM, Almog Gavra wrote:
> >>>>>>>>>>> Thanks for the feedback Lucas and Bruno!
> >>>>>>>>>>>
> >>>>>>>>>>> L0. "Given the motivation section, it sounds like we actually want something that I'd call "batching" rather than "windowing"."
> >>>>>>>>>>>
> >>>>>>>>>>> You are right here, and I think ultimately introducing more flexible and controlled micro-batching will be useful for Kafka Streams, but that expands the scope a little more than I'd want. We'd have to rethink the implications of suppression as well as emitting, and how to make it deterministic with regards to time (which is important for EOS and even ALOS). The proposal here is a halfway point that I think is easy enough to reason about within the existing semantics of Kafka Streams and gets us 80% of the benefit. (See the second part below, since these questions are related.)
> >>>>>>>>>>>
> >>>>>>>>>>> L2/B1. "why are we specifically emitting based on stream-time?" / "Wouldn't it make more sense to have similarly sized batches?"
> >>>>>>>>>>>
> >>>>>>>>>>> There are two motivations for this: (a) most of the use cases for this that we've encountered are still time-sensitive, in that they don't want to batch N records, but rather batch over N seconds (there may be an uneven distribution of keys, so that some keys hit N records very quickly while others are one and done, and we'll never see that key again); (b) this is much easier to implement, and is a step forward, given that it fits really well into the existing windowing semantics.
> >>>>>>>>>>>
> >>>>>>>>>>> Ultimately, as you suggest, I'd want to be able to control these "batches" with multiple emit strategies.
> >>>>>>>>>>> It would be nice to specify a condition like "emit after N records, or N elapsed seconds, or N bytes accumulated" - but again (as mentioned above), modeling this with existing Kafka Streams semantics/operators is a big stretch, and I don't want to expand the scope of this KIP too much for that.
> >>>>>>>>>>>
> >>>>>>>>>>> Let me know if that makes sense! Basically, I view this as a good middle ground that doesn't compromise semantics but probably addresses most real-time (or near-real-time) streaming use cases.
> >>>>>>>>>>>
> >>>>>>>>>>> - Almog
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Jan 28, 2025 at 7:15 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>>>> Hi Almog,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I had similar thoughts as Lucas. When I read the KIP, I asked myself why the windows are not specified by number of records instead of time, if we do not care whether the event time of the records is in the time range of the window.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In your motivation, you write that users might collect small batches of records to be passed to a consumer that can handle batched messages more effectively than individual messages. Wouldn't it make more sense to have similarly sized batches? You could also consider doing something like the Kafka producer, which has a batch size and a linger time.
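Bruno's producer analogy (a batch size plus a linger time) and Almog's "emit after N records, or N elapsed seconds" condition can both be sketched as a small Java batcher. Everything here is hypothetical and outside the KIP's scope: `SizeOrLingerBatcher`, `maxRecords`, `lingerMs`, and `poll` are illustrative names. The clock value is passed in explicitly, so it could represent stream time or wall-clock time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batcher, not part of KIP-1127 or any Kafka API: emits a batch once it
// holds maxRecords records, or once the clock has advanced lingerMs past the time
// the batch's first record was added. The caller supplies the clock value.
final class SizeOrLingerBatcher<T> {
    private final int maxRecords;
    private final long lingerMs;
    private final List<T> current = new ArrayList<>();
    private long firstRecordTime;

    SizeOrLingerBatcher(int maxRecords, long lingerMs) {
        this.maxRecords = maxRecords;
        this.lingerMs = lingerMs;
    }

    // Adds a record seen at time now; returns the emitted batch, or an empty list.
    List<T> add(T record, long now) {
        if (current.isEmpty()) {
            firstRecordTime = now;
        }
        current.add(record);
        return current.size() >= maxRecords ? flush() : poll(now);
    }

    // Time-based check, also callable when no record arrives (e.g. from a timer).
    List<T> poll(long now) {
        if (!current.isEmpty() && now - firstRecordTime >= lingerMs) {
            return flush();
        }
        return List.of();
    }

    private List<T> flush() {
        List<T> batch = new ArrayList<>(current);
        current.clear();
        return batch;
    }

    public static void main(String[] args) {
        SizeOrLingerBatcher<String> batcher = new SizeOrLingerBatcher<>(3, 100L);
        System.out.println(batcher.add("a", 0L));  // []
        System.out.println(batcher.add("b", 10L)); // []
        System.out.println(batcher.add("c", 20L)); // [a, b, c]  (size limit reached)
        System.out.println(batcher.add("d", 30L)); // []
        System.out.println(batcher.poll(130L));    // [d]  (linger expired: 130 - 30 >= 100)
    }
}
```

Supporting a byte-size limit as well would mean tracking an accumulated size next to the record count. Almog notes that this kind of combined condition is what he wants to keep out of the KIP for now.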
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Bruno
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 28.01.25 15:44, Lucas Brutschy wrote:
> >>>>>>>>>>>>> Hi Almog,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> this seems useful to me. I don't see anything wrong with the details of the proposal.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> More generally, I'd like to hear your thoughts on this vs. batching. Given the motivation section, it sounds like we actually want something that I'd call "batching" rather than "windowing". If you do not really care about including events that fall into different time windows, and the order of events does not matter much, why are we specifically emitting based on stream-time? Would you expect this mechanism to also extend to emitting batches based on number of records, byte-size of batch, or wall-clock time?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>> Lucas
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, Jan 23, 2025 at 7:59 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>> Thanks, Almog.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Good callout about `TimeWindows` vs `TimeWindow` (yes, I am aware, and was actually re-reading my previous email a few times before sending it to make sure I used the right one; it's very subtle).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For `TimeWindows`, the semantics are certainly well defined, and there is nothing to be discussed.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For `TimeWindow`, I am not sure, as I said, and your interpretation as "just a container" might be fine, too.
> >>>>>>>>>>>>>> I agree with the problem that if we add something new, we might just leak it again and thus not gain much, so the lesser evil might be to just re-use `TimeWindow` as you propose. I just wanted to point out this question as a sanity check and to collect feedback about it.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 1/23/25 8:58 AM, Almog Gavra wrote:
> >>>>>>>>>>>>>>> Thanks Matthias for the quick and detailed feedback!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late" and using them as synonyms, which we usually don't do.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> M1. Ah, in my mind "late arriving" was after the window closed but potentially before grace (and "out of order" was just anything that is out of order). Do we have a specific word for "after the window closes but before grace"? Maybe we should have "fashionably late" and "too late" (haha, just kidding).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'll clear up the terminology in the KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Today the only way to implement this semantic is using a punctuation and manually storing events in an aggregation (which is inefficient for many reasons and can cause severe bottlenecks)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Not sure if I follow this argument? I don't see any relationship to punctuations. Can you elaborate?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> M2.
> >>>>>>>>>>>>>>> The way we've seen people implement the desired behavior in this KIP is to use a non-windowed aggregation on a stream, and then use a punctuation every N seconds to scan the table and emit and delete the records to simulate a window closing. It's certainly suboptimal, but it gets you very similar semantics to what I describe in the KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> M3. Re: extends Windows<TimeWindow>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> (Note for other readers; I think you get this, Matthias.) TimeWindow is distinct from TimeWindows (with an s at the end). TimeWindows (with an s) implies exactly what you suggest. I do think, however, that TimeWindow (without an s) is a perfect abstraction to leverage here. That class doesn't do anything except indicate the bounds of a window (it has a start time, an end time, and a definition of whether two windows overlap). The javadoc for TimeWindow (without an s) doesn't even need to change to be used in the way this KIP suggests.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> As suggested, the StreamTimeWindow still uses time as its bounds - it just doesn't use the event time to determine which time window the event is placed in.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> M4.
> >>>>>>>>>>>>>>> Re: I just realized that `Window` is in an `internal` package but `TimeWindows#windowsFor` does return `Map<Long, TimeWindow>`
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Yeah, I was wondering about that as well. I also agree it's a bit orthogonal to the KIP, and I think if we introduce a new "extends Window" type we'll just leak that type as well, so I don't really see a benefit from that. If we do decide to tackle the leaky abstraction, we might as well just have TimeWindow to handle :)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> M5. Specifying abstract methods
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Can do!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> M6. Naming
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I like "OffsetOrderedWindows"; I will wait for some other feedback, and if no one has something better, I'll update the KIP to that.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Re: `extends Windows<TimeWindow>`: I do think the fact that it defines the window size with time is important. We're not saying the window closes after N offsets have passed, but rather after stream time has advanced N seconds (or ms or whatever). I agree that it doesn't need to be part of the main name, however.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Jan 23, 2025 at 12:14 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>>> Interesting KIP. It's a known problem, and the proposed solution makes sense to me.
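Almog's M2 answer describes the workaround users build today: a non-windowed aggregation plus a punctuation that periodically scans, emits, and deletes. The following is a simplified, hypothetical Java model of that pattern. It uses a `TreeMap` as a stand-in for the key-value store and a plain method in place of a scheduled punctuation, so it is not the Processor API. Each call to `punctuate()` walks every entry in the store, regardless of how many keys changed since the last call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the M2 workaround using plain Java collections, not the
// Processor API: a non-windowed per-key aggregate, plus a periodic callback that
// emits and deletes every entry to simulate a window closing.
final class PunctuationWorkaroundSketch {
    private final Map<String, Long> store = new TreeMap<>(); // stands in for a key-value store

    // Non-windowed aggregation: add the value to the running sum for its key.
    void aggregate(String key, long value) {
        store.merge(key, value, Long::sum);
    }

    // Called every N seconds in the real workaround: scan the whole store,
    // emit each aggregate, then delete it so the next "window" starts empty.
    List<String> punctuate() {
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : store.entrySet()) {
            emitted.add(entry.getKey() + "=" + entry.getValue());
        }
        store.clear();
        return emitted;
    }

    public static void main(String[] args) {
        PunctuationWorkaroundSketch sketch = new PunctuationWorkaroundSketch();
        sketch.aggregate("a", 1L);
        sketch.aggregate("b", 2L);
        sketch.aggregate("a", 3L);
        System.out.println(sketch.punctuate()); // [a=4, b=2]
        sketch.aggregate("b", 5L);
        System.out.println(sketch.punctuate()); // [b=5]
    }
}
```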
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late" and using them as synonyms, which we usually don't do.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> "Out-of-order" is the more generic term, while "late" means after the grace period (hence, late-arriving data is still out-of-order, but it's a subset of out-of-order records... all late records are out-of-order, but not the other way around). I think the KIP could gain clarity and avoid potential confusion by not using both terms as synonyms.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> In the end, "late" data cannot be handled at all right now, based on the definition of "late": "late" data is always dropped.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Today the only way to implement this semantic is using a punctuation and manually storing events in an aggregation (which is inefficient for many reasons and can cause severe bottlenecks)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Not sure if I follow this argument? I don't see any relationship to punctuations. Can you elaborate?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It's also not clear to me why "storing events in an aggregation" is problematic, and/or how this new window type would avoid it.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> extends Windows<TimeWindow>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Not sure if this is the right thing to do?
> >>>>>>>>>>>>>>>> I understand your intent, but to me, `TimeWindow` has certain semantics, implying we only put data with a record timestamp between the window bounds into the window. Not sure if my interpretation is off, and re-using it could be ok? It's just something we should figure out, I guess.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> To be fair: it's not 100% clearly specified whether these semantics apply or not. If we believe they don't apply, happy to pivot on this, but we might want to update the JavaDocs if we only want to use it as a "container class" (for lack of a better word).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This actually raises a different issue: I just realized that `Window` is in an `internal` package, but `TimeWindows#windowsFor` does return `Map<Long, TimeWindow>`, thus making it effectively kinda public... It seems we should do something about this? End users do not really need to use `TimeWindow` (it's really internal), and it seems `windowsFor` is effectively a leaky abstraction...
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> If we keep `TimeWindow` internal, I think we can easily be more relaxed about the applied semantics. But it's kinda leaking right now, which does not make me happy...
> >>>>>>>>>>>>>>>> (Of course, I also don't want to derail and hijack this KIP for some related, but orthogonal, issue in the API...)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> One other nit I am wondering about is whether it might be good to specify the abstract methods inherited from `Windows` in the KIP for clarity. Even if they are implicitly well specified, it might be beneficial to spell them out. After all, `StreamTimeWindows` needs to implement these methods.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> And of course
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Note: I really don't like the name I came up with, StreamTimeWindows... if anyone has a better name in mind please suggest one!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I am not a fan of the name either. But I'm not really sure what a better name could be. In the end, it's more of an "offset ordered window" than a "time window", because it does not really take the record timestamps into account to figure out which window a record goes into; it really uses the offset order.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I believe that the size of the window itself does not matter too much for naming, so even if the size is defined in time, I don't think that the term "time" must be part of the window name.
> >>>>>>>>>>>>>>>> (That's also why I am wondering if we should go with `extends Windows<TimeWindow>` or add something new, which we hopefully can keep internal...)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Not sure if we ever intend to make `StreamTimeWindows` overlapping? Given the described use cases, it seems very unlikely? So effectively it's a "tumbling window". Of course, we should not just overload this term, but there is no `TumblingWindows` in the API, only the term "tumbling window" in the docs to describe the special case of non-overlapping `TimeWindows`.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> So maybe `OffsetTumblingWindows` could work? Or `OffsetOrderedWindows`? Just spitballing here, to get a discussion going...
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 1/22/25 8:26 PM, Almog Gavra wrote:
> >>>>>>>>>>>>>>>>> Hello!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I'd like to initiate a discussion thread on KIP-1127:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1127+Flexible+Windows+for+Late+Arriving+Data
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This KIP aims to make it easier to specify windowing semantics that are more tolerant of late-arriving data, particularly with suppression.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Would appreciate any and all feedback.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Almog
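Matthias's terminology from his first review treats out-of-order as the general case and late as the subset that arrives after the grace period. That distinction can be written down as a small Java classifier. This is a hypothetical sketch, not Kafka Streams' internal check. It assumes tumbling windows aligned to 0 with exclusive ends, and that a window closes once stream time reaches its end plus the grace period.

```java
// Hypothetical classifier, not Kafka Streams code. Assumes tumbling windows of
// sizeMs aligned to 0 with exclusive ends, and that a window is closed once
// stream time reaches windowEnd + graceMs. "Late" is a subset of "out-of-order".
final class LatenessSketch {
    enum Kind { IN_ORDER, OUT_OF_ORDER, LATE }

    static Kind classify(long recordTs, long streamTime, long sizeMs, long graceMs) {
        if (recordTs >= streamTime) {
            return Kind.IN_ORDER;
        }
        long windowEnd = Math.floorDiv(recordTs, sizeMs) * sizeMs + sizeMs; // exclusive end
        return windowEnd + graceMs <= streamTime ? Kind.LATE : Kind.OUT_OF_ORDER;
    }

    public static void main(String[] args) {
        // Windows of size 10 with grace 5; stream time has reached 17.
        System.out.println(classify(18, 17, 10, 5)); // IN_ORDER
        System.out.println(classify(12, 17, 10, 5)); // OUT_OF_ORDER: window [10,20) is still open
        System.out.println(classify(8, 17, 10, 5));  // LATE: window [0,10) closed at 10 + 5 = 15
    }
}
```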