I'm not opposed to "BatchedWindows" - I think I like that the most so far. I'll let that sit on the discussion thread for a while, and change the KIP to match if there are no concerns.

> What I don't understand is, why the relationship to
> suppress()/emitStrategy() is relevant? Can you elaborate a little bit?

The existing proposal has no impact on suppression/emitStrategy, but I'm
not sure the two stay unrelated once you start to introduce window
constraints that aren't just stream time. Imagine having a batch window
with max N records, and then also specifying a BufferConfig.maxRecords()
suppression... what happens in that case? With the current scope of the
KIP, we don't need to worry about any of that, so it's pretty well
contained.

> if data is auto-repartitioned before a `BatchWindows` step,
> repartitioning introduces non-deterministic order in the repartition
> topic

This is a good point, I did not think about it (my original comment was
specifically about wall-clock time). I guess, though, that the current
proposal, which only includes stream-time based batches, is no worse than
existing windows, which have the same potential non-determinism in this
scenario; it is just more likely here since the grace period is always 0.

> I don't see any relationship to EOS or ALOS. Can you explain what you
> mean?

Hmm, I'm not totally sure what I was thinking... we can ignore that part.

- Almog

On Tue, Jan 28, 2025 at 3:40 PM Matthias J. Sax <mj...@apache.org> wrote:

> Interesting thoughts. So maybe we could go with `BatchWindows` as a
> name? Again, only spit-balling...
>
> If we really put "(micro-)batching" in the center of this idea, I think
> both count-based and time-based (and time could actually be either
> stream-time or wall-clock-time), or any combination of these dimensions
> could all make sense.
>
> Having said this: I don't think we need to support all dimensions
> initially, but if we add something like `BatchWindows` we can extend the
> supported specification incrementally.
>
> What I don't understand is, why the relationship to
> suppress()/emitStrategy() is relevant? Can you elaborate a little bit?
>
> > how to make it deterministic with regards to time
>
> I think, in any case, we are leaving determinism-land (at least to some
> extent). I guess, as long as the `BatchWindows` are applied directly to
> an input topic, we get determinism for both stream-time size, as well as
> count-size windows (not for wall-clock-time, of course).
>
> However, if data is auto-repartitioned before a `BatchWindows` step,
> repartitioning introduces non-deterministic order in the repartition
> topic due to interleaved writes, and thus, also stream-time based
> windows would become non-deterministic (as there is no grace-period by
> design to "fix" the non-deterministic order, in contrast to full
> event-time based windows we support so far).
>
> So I don't think that is an actual difference between stream-time or
> count-based `BatchWindows` with regard to determinism.
>
> > which is important for EOS and even ALOS
>
> I don't see any relationship to EOS or ALOS. Can you explain what you mean?
>
> -Matthias
>
> On 1/28/25 10:43 AM, Almog Gavra wrote:
> > Thanks for the feedback Lucas and Bruno!
> >
> > L0. "Given the motivation section, it sounds we actually want something
> > that I'd call "batching" rather than "windowing"."
> >
> > You are right here, and I think ultimately introducing more flexible and
> > controlled micro-batching will be useful for Kafka Streams, but that
> > expands the scope a little more than I'd want. We'd have to rethink the
> > implications of suppression as well as emitting and how to make it
> > deterministic with regards to time (which is important for EOS and even
> > ALOS). The proposal here is a halfway point that I think is easy enough to
> > reason about within the existing semantics of Kafka Streams and gets us 80%
> > of the benefit. (See the second part below since these questions are
> > related)
> >
> > L2/B1. "why are we specifically emitting based on stream-time?"
> > / "Wouldn't it make more sense to have similarly sized batches?"
> >
> > There are two motivations for this: (a) most of the use cases for this that
> > we've encountered are still time sensitive in that they don't want to batch
> > N records, but rather batch over N seconds (there may be an uneven
> > distribution of keys so that some keys hit N records very quickly and
> > others are one and done, we'll never see that key again). (b) this is much
> > easier to implement, and is a step forward, given this fits really well
> > into the existing windowing semantics.
> >
> > Ultimately, as you suggest, I'd want to be able to control these "batches"
> > with multiple emit strategies. It would be nice to specify a condition like
> > "emit after N records, or N elapsed seconds, or N bytes accumulated" - but
> > again (as mentioned above) modeling this with existing Kafka Streams
> > semantics/operators is a big stretch and I don't want to expand the scope
> > of this KIP too much for that.
> >
> > Let me know if that makes sense! Basically I view this as a good middle
> > ground that doesn't compromise semantics but probably addresses most real
> > (or near-real) time streaming use cases.
> >
> > - Almog
> >
> > On Tue, Jan 28, 2025 at 7:15 AM Bruno Cadonna <cado...@apache.org> wrote:
> >
> >> Hi Almog,
> >>
> >> I had similar thoughts as Lucas. When I read the KIP, I asked myself why
> >> are the windows not specified on number of records instead of time if we
> >> do not care about whether the event time of the records is in the time
> >> range of the window?
> >>
> >> In your motivation, you write that users might collect small batches of
> >> records to be passed to a consumer that can handle batched messages more
> >> effectively than individual messages. Wouldn't it make more sense to
> >> have similarly sized batches?
> >> You could also consider to do something like the Kafka producer that has
> >> a batch size and a linger time.
> >>
> >> Best,
> >> Bruno
> >>
> >> On 28.01.25 15:44, Lucas Brutschy wrote:
> >>> Hi Almog,
> >>>
> >>> this seems useful to me. I don't see anything wrong with the details
> >>> of the proposal.
> >>>
> >>> More generally, I'd like to hear your thoughts on this vs. batching.
> >>> Given the motivation section, it sounds we actually want something
> >>> that I'd call "batching" rather than "windowing". If you do not really
> >>> care about the including events that fall into different time windows,
> >>> and the order of events does not matter much, why are we specifically
> >>> emitting based on stream-time? Would you expect this mechanism to
> >>> extend also to emitting batches based on number of records, byte-size
> >>> of batch, wall-clock time?
> >>>
> >>> Cheers,
> >>> Lucas
> >>>
> >>> On Thu, Jan 23, 2025 at 7:59 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>
> >>>> Thanks, Almog.
> >>>>
> >>>> Good call out about `TimeWindows` vs `TimeWindow` (yes, I am aware and
> >>>> was actually re-reading my previous email before sending it a few times
> >>>> to make sure I use the right one; it's very subtle.)
> >>>>
> >>>> For `TimeWindows` semantics are certainly well defined, and there is
> >>>> nothing to be discussed.
> >>>>
> >>>> For `TimeWindow`, I am not sure as I said, and your interpretation as
> >>>> "just a container" might be fine, too. I agree to the problem, that if
> >>>> we add something new, we might just leak it again, and thus not gain
> >>>> much, so the lesser evil might be to just re-use `TimeWindow` as you
> >>>> propose. I just wanted to point out this question to sanity check, and
> >>>> collect feedback about it.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 1/23/25 8:58 AM, Almog Gavra wrote:
> >>>>> Thanks Matthias for the quick and detailed feedback!
> >>>>>
> >>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late" and
> >>>>>> using them as synonymous, what we usually not do.
> >>>>>
> >>>>> M1.
> >>>>> Ah, in my mind "late arriving" was after the window closed but
> >>>>> potentially before grace (and "out of order" was just anything that is out
> >>>>> of order). Do we have a specific word for "after the window closes but
> >>>>> before grace"? Maybe we should have "fashionably late" and "too late"
> >>>>> (haha, just kidding).
> >>>>>
> >>>>> I'll clear up the terminology in the KIP.
> >>>>>
> >>>>>>> Today the only way to implement this semantic is using a punctuation
> >>>>>>> and manually storing events in an aggregation (which is inefficient for
> >>>>>>> many reasons and can cause severe bottlenecks)
> >>>>>> Not sure if I follow this argument? I don't see any relationship to
> >>>>>> punctuations. Can you elaborate?
> >>>>>
> >>>>> M2. The way we've seen people implement the desired behavior in this KIP is
> >>>>> to use a non-windowed aggregation on a stream, and then use a punctuation
> >>>>> every N seconds to scan the table and emit and delete the records to
> >>>>> simulate a window closing. It's certainly suboptimal, but it gets you very
> >>>>> similar semantics to what I describe in the KIP.
> >>>>>
> >>>>> M3. Re: extends Windows<TimeWindow>
> >>>>>
> >>>>> (note for other readers, I think you get this Matthias) TimeWindow is
> >>>>> distinct from TimeWindows (with an s at the end). TimeWindows (with an s)
> >>>>> implies exactly what you suggest. I do think, however, that TimeWindow
> >>>>> (without an s) is a perfect abstraction to leverage here. That class
> >>>>> doesn't do anything except to indicate the bounds of a window (it has a
> >>>>> start time, an end time and a definition of whether two windows overlap).
> >>>>> The javadoc for TimeWindow (without an s) doesn't even need to change to be
> >>>>> used in the way this KIP suggests.
> >>>>>
> >>>>> As suggested, the StreamTimeWindow still uses time as its bounds - it just
> >>>>> doesn't use the event time to determine which time window the event is
> >>>>> placed in.
> >>>>>
> >>>>> M4. Re: I just realized that `Window` is in an `internal` package but
> >>>>> `TimeWindows#windowsFor` does return `Map<Long, TimeWindow>`
> >>>>>
> >>>>> Yeah, I was wondering about that as well. I also agree its a bit orthogonal
> >>>>> to the KIP and I think if we introduce a new "extends Window" type we'll
> >>>>> just leak that type as well, so I don't really see a benefit from that. If
> >>>>> we do decide to tackle the leaky abstraction we might as well just have
> >>>>> TimeWindow to handle :)
> >>>>>
> >>>>> M5. Specifying abstract methods
> >>>>>
> >>>>> Can do!
> >>>>>
> >>>>> M6. Naming
> >>>>>
> >>>>> I like "OffsetOrderedWindows", I will wait for some other feedback and if
> >>>>> no one has something better I'll update the KIP to that.
> >>>>>
> >>>>> Re: `extends Window<TimeWindow>` I do think the fact that it defines the
> >>>>> window size with time is important, we're not saying the window closes
> >>>>> after N offsets have passed but rather the stream time has passed N seconds
> >>>>> (or ms or whatever). I agree that it doesn't need to be part of the main
> >>>>> name, however.
> >>>>>
> >>>>> On Thu, Jan 23, 2025 at 12:14 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>
> >>>>>> Interesting KIP. It's a known problem, and the proposed solution make
> >>>>>> sense to me.
> >>>>>>
> >>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late" and
> >>>>>> using them as synonymous, what we usually not do.
> >>>>>>
> >>>>>> "Out-of-order" is the more generic term, while "late" means after the
> >>>>>> grace period (hence, late arriving data is still out-of-order, but it's
> >>>>>> a subset of out-of-order records...
> >>>>>> all late records are out-of-order,
> >>>>>> but not the other way around). I think the KIP could gain clarity and
> >>>>>> avoid potential confusion to use both terms not a synonymous.
> >>>>>>
> >>>>>> In the end, "late" data cannot be handled at all right now, based on the
> >>>>>> definition of "late". "late" data is always dropped.
> >>>>>>
> >>>>>>> Today the only way to implement this semantic is using a punctuation and
> >>>>>>> manually storing events in an aggregation (which is inefficient for many
> >>>>>>> reasons and can cause severe bottlenecks)
> >>>>>>
> >>>>>> Not sure if I follow this argument? I don't see any relationship to
> >>>>>> punctuations. Can you elaborate?
> >>>>>>
> >>>>>> It's also not clear to me, why "storing events in an aggregation" is
> >>>>>> problematic, and/or how this new window type would avoid it?
> >>>>>>
> >>>>>>> extends Windows<TimeWindow>
> >>>>>>
> >>>>>> Not sure if this is the right thing to do it? I understand your intent,
> >>>>>> but to me, `TimeWindow` has certain semantics, implying we only put data
> >>>>>> with record-ts between the window bounds into the window. Not sure if my
> >>>>>> interpretation is off, and re-using could be ok? It's just something we
> >>>>>> should figure out I guess.
> >>>>>>
> >>>>>> To be fair: it's not 100% clearly specific if these semantics apply or
> >>>>>> not. If we believe they don't apply, happy to pivot on this, but we
> >>>>>> might want to update the JavaDocs, if we only want to use it as
> >>>>>> "container class" (for the lack of better word).
> >>>>>>
> >>>>>> What actually raises a different issue: I just realized that `Window` is
> >>>>>> in an `internal` package, but `TimeWindows#windowsFor` does return
> >>>>>> `Map<Long, TimeWindow>`, thus making it effectively kinda public... It
> >>>>>> seems we should we do something about this?
> >>>>>> End users do not really need
> >>>>>> to use `TimeWindow` -- it's really internal, and it seems `windowsFor`
> >>>>>> is effective a leaky abstraction...
> >>>>>>
> >>>>>> If we keep `TimeWindow` internal, I think we can easily be more relaxed
> >>>>>> with applied semantics. But it's kinda leaking right now, what does not
> >>>>>> make me happy... (Of course, I also don't want to derail and hijack this
> >>>>>> KIP for some related, but orthogonal issue in the API...)
> >>>>>>
> >>>>>> On other nit I am wondering about is, if it might be good to specify the
> >>>>>> abstract methods inherited from `Windows` on the KIP for clarity? Even
> >>>>>> they are implicitly well specified it might be beneficial to spell it
> >>>>>> out. After all, `StreamTimeWindow` needs to implement there methods.
> >>>>>>
> >>>>>> And of course
> >>>>>>
> >>>>>>> Note: I really don't like the name I cam up with StreamTimeWindows... if
> >>>>>>> anyone has a better name in mind please suggest one!
> >>>>>>
> >>>>>> I am not a fan of the name either. But not really sure what a better
> >>>>>> name could be. In the end, it's more of an "offset ordered window"
> >>>>>> rather than a "time window", because it does not really take the record
> >>>>>> timestamps into account to figure out into what window a record goes
> >>>>>> into, but it really uses the offset order.
> >>>>>>
> >>>>>> I believe that the size of window itself does not matter too much for
> >>>>>> naming, so even if the size is defined in time, I don't think that the
> >>>>>> term "time" must be part of the window name. (That's also why I am
> >>>>>> wondering if we should go with `extends<TimeWindow>` or add something
> >>>>>> new, which we hopefully can keep internal...)
> >>>>>>
> >>>>>> Not sure if we ever intent to make `StreamTimeWindows` overlapping?
> >>>>>> Given the described use-cases it seems very unlikely?
> >>>>>> So effectively
> >>>>>> it's a "tumbling window". Of course, we should not just overload this
> >>>>>> term, but there is no `TumblingWindows` in the API -- only the term
> >>>>>> Tumbling Window in the docs to describe the special case of
> >>>>>> non-overlapping `TimeWindows`.
> >>>>>>
> >>>>>> So maybe `OffsetTumblingWindows` could work? Or `OffsetOrderedWindows`?
> >>>>>> Just spitballing here, to get a discussion going...
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 1/22/25 8:26 PM, Almog Gavra wrote:
> >>>>>>> Hello!
> >>>>>>>
> >>>>>>> I'd like to initiate a discussion thread on KIP-1127:
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1127+Flexible+Windows+for+Late+Arriving+Data
> >>>>>>>
> >>>>>>> This KIP aims to make it easier to specify windowing semantics that are
> >>>>>>> more tolerable to late arriving data, particularly with suppression.
> >>>>>>>
> >>>>>>> Would appreciate any and all feedback.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Almog
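
For readers following the thread, the stream-time batching idea discussed above (a record is placed by offset order into whichever window contains the current stream time, not by its own timestamp, and there is no grace period) can be illustrated with a small Java sketch. This is only a plain-Java simulation and does not use Kafka Streams. The class name `StreamTimeBatchingSketch`, the method `closedBatches`, and the choice to align windows to multiples of the window size are assumptions made for illustration; the KIP's actual implementation may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not part of Kafka Streams or of the KIP. */
class StreamTimeBatchingSketch {

    /**
     * Takes record timestamps in offset order and groups them into tumbling
     * batches of sizeMs. Each record joins the window that contains the
     * current stream time (the highest timestamp seen so far), so an
     * out-of-order record lands in the currently open batch instead of being
     * dropped. Only closed batches are returned; the last batch is still open.
     */
    static List<List<Long>> closedBatches(long[] timestampsInOffsetOrder, long sizeMs) {
        List<List<Long>> closed = new ArrayList<>();
        List<Long> open = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        long openWindowStart = Long.MIN_VALUE; // sentinel: no window opened yet

        for (long ts : timestampsInOffsetOrder) {
            // Stream time only moves forward.
            streamTime = Math.max(streamTime, ts);
            // Assumption: windows are aligned to multiples of sizeMs.
            long windowStart = Math.floorDiv(streamTime, sizeMs) * sizeMs;
            if (windowStart != openWindowStart) {
                // Stream time crossed a window boundary: close the open batch.
                if (!open.isEmpty()) {
                    closed.add(open);
                    open = new ArrayList<>();
                }
                openWindowStart = windowStart;
            }
            open.add(ts);
        }
        return closed;
    }
}
```

With a window size of 10 and timestamps 1, 5, 3, 12, 7, 25 in offset order, the closed batches are [1, 5, 3] and [12, 7]: the record with timestamp 7 arrives after stream time has reached 12, so it joins the second batch, and the record with timestamp 25 stays in the batch that is still open.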