Good call out, thanks Matthias! I've updated the example image in the KIP
and called out specifically that window ends are exclusive in the
corresponding javadocs.
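
For reference, here is a minimal sketch of the half-open bounds discussed below. It assumes non-negative timestamps and fixed-size tumbling windows. `HalfOpenWindows`, `windowStart`, and `contains` are hypothetical names for illustration, not Kafka Streams API. For `BatchWindows`, the timestamp passed in would be the current stream time rather than the record's event time.

```java
/**
 * Hypothetical helper (not part of Kafka Streams) modelling tumbling windows
 * with an inclusive start and an exclusive end: [start, start + size).
 * Assumes non-negative timestamps.
 */
final class HalfOpenWindows {
    private HalfOpenWindows() {}

    /** Start of the window that contains timestamp ts. */
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    /** True if ts lies in [start, start + sizeMs); the end itself is excluded. */
    static boolean contains(long start, long sizeMs, long ts) {
        return ts >= start && ts < start + sizeMs;
    }
}
```

With a size of 10, `windowStart(10, 10)` is 10, so t=10 lands in [10,20) rather than [0,10). Likewise, t=20 and t=30 each open the next window.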

- Almog

On Thu, Feb 20, 2025 at 5:45 PM Matthias J. Sax <mj...@apache.org> wrote:

> Hi,
>
> I was just re-reading the KIP in order to vote and noticed that the
> example does not seem to be correct?
>
> If we have a window of size 10, with bounds [0,10), [10,20), [20,30), the
> lower bound would be inclusive and the upper bound would be exclusive.
> At least this is how we do it for `TimeWindows`, and I think it would
> make sense to adopt the same semantics for `BatchWindows`?
>
> Thus, the event at t=10 would not go into the first window, but into the
> second, and similarly for t=20 and t=30.
>
> Overall, it seems to be a gap in the KIP that we missed: the bounds
> are not explicitly defined and called out. It might be worth adding
> a short sentence/paragraph about it.
>
>
> Thoughts?
>
>
> -Matthias
>
> On 2/6/25 4:49 PM, Matthias J. Sax wrote:
> > Sounds good. Thanks for clarifying.
> >
> >
> > -Matthias
> >
> > On 2/6/25 3:19 PM, Almog Gavra wrote:
> >> Good call on the backwards compatibility - updated the KIP.
> >>
> >> Re: the grace period for BatchWindows, I think zero makes sense (and also
> >> makes implementing things a lot easier). In my mental model, we still
> >> drop late records that come in after the window closes; they just never
> >> happen, because we use the current stream time instead of the event time
> >> when computing which window a record should be in.
> >>
> >> If we do this, the implementation is easier because: (a) we don't have to
> >> change the way retention for stored windows works and special-case that
> >> for BatchWindows (retention is just window size + 0 grace), and (b) we can
> >> actually just reuse the KStreamWindowAggregateProcessor without changing
> >> anything else.
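
Here is a minimal, framework-free simulation of the mental model above. It assumes tumbling windows with exclusive ends, zero grace, and non-negative timestamps. `AssignmentSimulation` and `aggregate` are hypothetical names, and this is not `KStreamWindowAggregateProcessor`. It only contrasts two ways of assigning a record to a window. With event-time assignment, an out-of-order record can land in an already-closed window and be dropped. With current-stream-time assignment, that cannot happen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation, not Kafka Streams internals.
final class AssignmentSimulation {
    private AssignmentSimulation() {}

    /** Returns window start -> timestamps kept, in arrival order. */
    static Map<Long, List<Long>> aggregate(long[] eventTimes, long sizeMs, boolean useStreamTime) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : eventTimes) {
            streamTime = Math.max(streamTime, ts);        // stream time never decreases
            long assignTs = useStreamTime ? streamTime : ts;
            long start = assignTs - (assignTs % sizeMs);  // assumes non-negative timestamps
            long end = start + sizeMs;                    // exclusive end
            // With grace = 0, a window is closed once stream time reaches its end.
            if (streamTime >= end) {
                continue;                                 // late record: dropped
            }
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }
}
```

Consider input timestamps 5, 12, 7 with size 10. Event-time assignment drops the 7, because its window [0,10) closed once stream time reached 12. Stream-time assignment instead puts the 7 into [10,20) alongside the 12.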
> >>
> >> LMK if that makes sense!
> >>
> >> On Thu, Feb 6, 2025 at 12:12 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>
> >>> Hit "reply" too early. Just re-read the KIP.
> >>>
> >>> For `Windows#windowsFor(...)`, even if it is not intended to be implemented
> >>> by users, it's strictly public API. Thus, we cannot just change the
> >>> method, but would need to keep the existing method and deprecate it, and
> >>> add a new overload with a default impl that calls the existing one.
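
Here is a sketch of the deprecate-and-overload pattern described here, using a hypothetical stand-in class. Several details are illustrative assumptions rather than the real `Windows` API or the KIP's final signature: the name `SketchWindows`, the extra `streamTime` parameter, and the `Map<Long, Long>` return type (window start to exclusive end).

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for an abstract, user-visible windows class. */
abstract class SketchWindows {
    /** Existing method: kept so current subclasses still compile, but deprecated. */
    @Deprecated
    public abstract Map<Long, Long> windowsFor(long timestamp);

    /**
     * New overload. The default implementation delegates to the old method,
     * so existing subclasses keep their current behavior unchanged.
     */
    @SuppressWarnings("deprecation")
    public Map<Long, Long> windowsFor(long timestamp, long streamTime) {
        return windowsFor(timestamp);
    }
}

/** A pre-existing subclass written against the old API; it needs no changes. */
final class LegacyTumbling extends SketchWindows {
    private final long sizeMs;

    LegacyTumbling(long sizeMs) { this.sizeMs = sizeMs; }

    @Override
    public Map<Long, Long> windowsFor(long timestamp) {
        long start = timestamp - (timestamp % sizeMs); // assumes timestamp >= 0
        return Collections.singletonMap(start, start + sizeMs);
    }
}

/** A new window type that overrides only the new overload and uses stream time. */
final class StreamTimeTumbling extends SketchWindows {
    private final long sizeMs;

    StreamTimeTumbling(long sizeMs) { this.sizeMs = sizeMs; }

    @Override
    @Deprecated
    public Map<Long, Long> windowsFor(long timestamp) {
        throw new UnsupportedOperationException("requires stream time");
    }

    @Override
    public Map<Long, Long> windowsFor(long timestamp, long streamTime) {
        long start = streamTime - (streamTime % sizeMs); // assumes streamTime >= 0
        return Collections.singletonMap(start, start + sizeMs);
    }
}
```

Keeping the old abstract method is what preserves source compatibility for existing subclasses. The cost is that new subclasses still have to implement (or stub out) the deprecated method until it is removed.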
> >>>
> >>> For `BatchedWindows#gracePeriod` I am wondering if `return 0;` makes
> >>> sense? In the end, zero would still mean "drop late records and do not
> >>> put them into the window", but that is exactly what we want to do. Thus,
> >>> IMHO, the new BatchedWindows don't have / don't need the notion of a
> >>> "grace period" to begin with, and we can just throw an
> >>> "UnsupportedOperationException" from this method?
> >>>
> >>> The underlying implementation of the "BatchWindowAggregationProcessor"
> >>> should never need to call `gracePeriod` anyway. Or am I missing something?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 2/6/25 12:01 PM, Matthias J. Sax wrote:
> >>>> BatchWindows works for me.
> >>>>
> >>>>
> >>>> On 2/6/25 7:34 AM, Almog Gavra wrote:
> >>>>> Happy to name it BatchWindows. Will give some people time to chime in
> >>>>> and then change the name.
> >>>>>
> >>>>> - Almog
> >>>>>
> >>>>> On Tue, Feb 4, 2025 at 11:10 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> >>>>>
> >>>>>> One minor suggestion: use BatchWindows instead of BatchedWindows. The
> >>>>>> version without the "ed" matches up with the established naming pattern
> >>>>>> and grammar used by other Windows classes, e.g. TimeWindows,
> >>>>>> SessionWindows, SlidingWindows.
> >>>>>>
> >>>>>> Not a big deal though; I won't retract my +1 on the voting thread if you
> >>>>>> prefer to keep it as BatchedWindows.
> >>>>>>
> >>>>>> On Tue, Feb 4, 2025 at 10:51 AM Almog Gavra <almog.ga...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Thanks for the discussion everyone! I've updated the Wiki with the
> >>>>>>> following changes:
> >>>>>>>
> >>>>>>> - Renamed to BatchedWindows
> >>>>>>> - Added a note in rejected alternatives about more general-purpose
> >>>>>>> (micro-)batching functionality, since the scope of that is much wider.
> >>>>>>>
> >>>>>>> Since it looks like the discussion has stabilized, I'm going to go ahead
> >>>>>>> and open up the vote! Definitely feel free to take another look and
> >>>>>>> leave any additional thoughts.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Jan 30, 2025 at 12:28 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>
> >>>>>>>>> batch window with max N records,
> >>>>>>>>> and then also specifying a BufferConfig.maxRecords()
> >>>>>>>>
> >>>>>>>> That's actually two different and independent dimensions. "N records"
> >>>>>>>> would be the number of records in the window, but `maxRecords` is the
> >>>>>>>> number of unique keys/rows in the buffer before it's flushed.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> the current proposal
> >>>>>>>>> that only includes stream time based batches is no worse than existing
> >>>>>>>>> windows which also have that potential indeterminism in this scenario
> >>>>>>>>> except for that it's more likely since grace period is always 0
> >>>>>>>>
> >>>>>>>> Guess this becomes a philosophical question. For TimeWindows, even if
> >>>>>>>> grace=0, it's still deterministic into which window each record goes --
> >>>>>>>> of course, the "cut off point" when we start to drop late records is
> >>>>>>>> subject to "noise" due to repartitioning, as stream-time advances "non
> >>>>>>>> deterministically".
> >>>>>>>>
> >>>>>>>> So if we drop different records on a "re-run" on the same input data, we
> >>>>>>>> might still get different windows (as different records would have been
> >>>>>>>> dropped). But that is exactly why we have a grace-period to begin with
> >>>>>>>> (to avoid it, and to make a re-run deterministic -- I think of a "too
> >>>>>>>> short" grace period as a misconfiguration -- or an informed decision to
> >>>>>>>> sacrifice determinism for something else)...
> >>>>>>>>
> >>>>>>>> And as the new window type, by definition, does not need/want a
> >>>>>>>> grace period, IMHO, the new window type would be "worse" (for the lack
> >>>>>>>> of a better word...); I don't think it's really worse, it's just
> >>>>>>>> inherently non-deterministic, and that's fine.
> >>>>>>>>
> >>>>>>>> Guess we are overall on the same page and share a common understanding
> >>>>>>>> of the trade-off. So I think we are good :)
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 1/30/25 9:07 AM, Almog Gavra wrote:
> >>>>>>>>> I'm not opposed to "BatchedWindows" - I think I like that the most so
> >>>>>>>>> far. I'll let that sit on the discussion thread for a while, and change
> >>>>>>>>> the KIP to match if there are no concerns.
> >>>>>>>>>
> >>>>>>>>>> What I don't understand is, why the relationship to
> >>>>>>>>>> suppress()/emitStrategy() is relevant? Can you elaborate a little bit?
> >>>>>>>>>
> >>>>>>>>> The existing proposal has no impact on suppression/emitStrategy, but I'm
> >>>>>>>>> uncertain it's unrelated if you start to introduce window constraints
> >>>>>>>>> that aren't just stream time. Imagine having a batch window with max N
> >>>>>>>>> records, and then also specifying a BufferConfig.maxRecords()
> >>>>>>>>> suppression... what happens in that case? With the current scope of the
> >>>>>>>>> KIP, we don't need to worry about any of that so it's pretty well
> >>>>>>>>> contained.
> >>>>>>>>>
> >>>>>>>>>> if data is auto-repartitioned before a `BatchWindows` step,
> >>>>>>>>>> repartitioning introduces non-deterministic order in the repartition
> >>>>>>>>>> topic
> >>>>>>>>>
> >>>>>>>>> This is a good point, I did not think about it (my original comment was
> >>>>>>>>> specifically about wall clock time). I guess, though, the current
> >>>>>>>>> proposal that only includes stream-time-based batches is no worse than
> >>>>>>>>> existing windows, which also have that potential indeterminism in this
> >>>>>>>>> scenario, except that it's more likely since the grace period is always
> >>>>>>>>> 0.
> >>>>>>>>>
> >>>>>>>>>> I don't see any relationship to EOS or ALOS. Can you explain what you
> >>>>>>>>>> mean?
> >>>>>>>>>
> >>>>>>>>> Hmm, I'm not totally sure what I was thinking... we can ignore that part.
> >>>>>>>>>
> >>>>>>>>> - Almog
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, Jan 28, 2025 at 3:40 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>> Interesting thoughts. So maybe we could go with `BatchWindows` as a
> >>>>>>>>>> name? Again, only spitballing...
> >>>>>>>>>>
> >>>>>>>>>> If we really put "(micro-)batching" in the center of this idea, I think
> >>>>>>>>>> both count-based and time-based (and time could actually be either
> >>>>>>>>>> stream-time or wall-clock-time), or any combination of these dimensions
> >>>>>>>>>> could all make sense.
> >>>>>>>>>>
> >>>>>>>>>> Having said this: I don't think we need to support all dimensions
> >>>>>>>>>> initially, but if we add something like `BatchWindows` we can extend the
> >>>>>>>>>> supported specification incrementally.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> What I don't understand is, why the relationship to
> >>>>>>>>>> suppress()/emitStrategy() is relevant? Can you elaborate a little bit?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> how to make it deterministic with regards to time
> >>>>>>>>>>
> >>>>>>>>>> I think, in any case, we are leaving determinism-land (at least to some
> >>>>>>>>>> extent). I guess, as long as the `BatchWindows` are applied directly to
> >>>>>>>>>> an input topic, we get determinism for both stream-time size, as well as
> >>>>>>>>>> count-size windows (not for wall-clock time, of course).
> >>>>>>>>>>
> >>>>>>>>>> However, if data is auto-repartitioned before a `BatchWindows` step,
> >>>>>>>>>> repartitioning introduces non-deterministic order in the repartition
> >>>>>>>>>> topic due to interleaved writes, and thus, also stream-time based
> >>>>>>>>>> windows would become non-deterministic (as there is no grace-period by
> >>>>>>>>>> design to "fix" the non-deterministic order, in contrast to full
> >>>>>>>>>> event-time based windows we support so far).
> >>>>>>>>>>
> >>>>>>>>>> So I don't think there is an actual difference between stream-time or
> >>>>>>>>>> count-based `BatchWindows` with regard to determinism.
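
Here is a small illustration of the repartitioning point, under these assumptions:

- The same three records arrive in two different interleavings, as two upstream writers to a repartition topic might produce.
- Timestamps are non-negative.
- Windows are size-10 tumbling windows.
- Event-time grouping has enough grace that nothing is dropped.

`InterleavingDemo` and `group` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not Kafka Streams code.
final class InterleavingDemo {
    private InterleavingDemo() {}

    /** Groups timestamps by window start, keyed either by stream time or by event time. */
    static Map<Long, List<Long>> group(long[] ts, long sizeMs, boolean byStreamTime) {
        Map<Long, List<Long>> out = new TreeMap<>();
        long streamTime = Long.MIN_VALUE;
        for (long t : ts) {
            streamTime = Math.max(streamTime, t);
            long key = byStreamTime ? streamTime : t;
            long start = key - (key % sizeMs); // assumes non-negative timestamps
            out.computeIfAbsent(start, k -> new ArrayList<>()).add(t);
        }
        // Sort each window's contents so that only membership is compared.
        out.values().forEach(Collections::sort);
        return out;
    }
}
```

Take the orders 3, 12, 7 and 3, 7, 12. Grouping by stream time gives {0=[3], 10=[7, 12]} for the first order and {0=[3, 7], 10=[12]} for the second. Grouping by event time gives {0=[3, 7], 10=[12]} for both.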
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> which is important for EOS and even ALOS
> >>>>>>>>>>
> >>>>>>>>>> I don't see any relationship to EOS or ALOS. Can you explain what you
> >>>>>>>>>> mean?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 1/28/25 10:43 AM, Almog Gavra wrote:
> >>>>>>>>>>> Thanks for the feedback Lucas and Bruno!
> >>>>>>>>>>>
> >>>>>>>>>>> L0. "Given the motivation section, it sounds like we actually want
> >>>>>>>>>>> something that I'd call "batching" rather than "windowing"."
> >>>>>>>>>>>
> >>>>>>>>>>> You are right here, and I think ultimately introducing more flexible
> >>>>>>>>>>> and controlled micro-batching will be useful for Kafka Streams, but
> >>>>>>>>>>> that expands the scope a little more than I'd want. We'd have to
> >>>>>>>>>>> rethink the implications of suppression as well as emitting, and how to
> >>>>>>>>>>> make it deterministic with regard to time (which is important for EOS
> >>>>>>>>>>> and even ALOS). The proposal here is a halfway point that I think is
> >>>>>>>>>>> easy enough to reason about within the existing semantics of Kafka
> >>>>>>>>>>> Streams and gets us 80% of the benefit. (See the second part below,
> >>>>>>>>>>> since these questions are related.)
> >>>>>>>>>>>
> >>>>>>>>>>> L2/B1. "why are we specifically emitting based on stream-time?" /
> >>>>>>>>>>> "Wouldn't it make more sense to have similarly sized batches?"
> >>>>>>>>>>>
> >>>>>>>>>>> There are two motivations for this: (a) most of the use cases for this
> >>>>>>>>>>> that we've encountered are still time sensitive, in that they don't want
> >>>>>>>>>>> to batch N records, but rather batch over N seconds (there may be an
> >>>>>>>>>>> uneven distribution of keys, so that some keys hit N records very
> >>>>>>>>>>> quickly and others are one and done: we'll never see that key again).
> >>>>>>>>>>> (b) This is much easier to implement, and is a step forward, given that
> >>>>>>>>>>> it fits really well into the existing windowing semantics.
> >>>>>>>>>>>
> >>>>>>>>>>> Ultimately, as you suggest, I'd want to be able to control these
> >>>>>>>>>>> "batches" with multiple emit strategies. It would be nice to specify a
> >>>>>>>>>>> condition like "emit after N records, or N elapsed seconds, or N bytes
> >>>>>>>>>>> accumulated" - but again (as mentioned above) modeling this with
> >>>>>>>>>>> existing Kafka Streams semantics/operators is a big stretch and I don't
> >>>>>>>>>>> want to expand the scope of this KIP too much for that.
> >>>>>>>>>>>
> >>>>>>>>>>> Let me know if that makes sense! Basically I view this as a good middle
> >>>>>>>>>>> ground that doesn't compromise semantics but probably addresses most
> >>>>>>>>>>> real (or near-real) time streaming use cases.
> >>>>>>>>>>>
> >>>>>>>>>>> - Almog
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Jan 28, 2025 at 7:15 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Almog,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I had similar thoughts to Lucas. When I read the KIP, I asked myself
> >>>>>>>>>>>> why the windows are not specified by number of records instead of time,
> >>>>>>>>>>>> if we do not care about whether the event time of the records is in
> >>>>>>>>>>>> the time range of the window.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In your motivation, you write that users might collect small batches
> >>>>>>>>>>>> of records to be passed to a consumer that can handle batched messages
> >>>>>>>>>>>> more effectively than individual messages. Wouldn't it make more sense
> >>>>>>>>>>>> to have similarly sized batches?
> >>>>>>>>>>>> You could also consider doing something like the Kafka producer, which
> >>>>>>>>>>>> has a batch size and a linger time.
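
Here is a sketch of the producer-style idea as a hypothetical `CountOrLingerBatcher`, which is not part of any Kafka API. One difference from the producer is deliberate. The producer's `batch.size` is measured in bytes, whereas this sketch caps a batch by record count. The clock is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical batcher: flushes when maxRecords is reached or lingerMs has elapsed. */
final class CountOrLingerBatcher<T> {
    private final int maxRecords;
    private final long lingerMs;
    private final List<T> buffer = new ArrayList<>();
    private long firstRecordTime = -1;

    CountOrLingerBatcher(int maxRecords, long lingerMs) {
        this.maxRecords = maxRecords;
        this.lingerMs = lingerMs;
    }

    /** Adds a record; returns a completed batch, or an empty list if none is ready. */
    List<T> add(T record, long nowMs) {
        if (buffer.isEmpty()) {
            firstRecordTime = nowMs; // linger clock starts with the first buffered record
        }
        buffer.add(record);
        if (ready(nowMs)) {
            return flush();
        }
        return List.of();
    }

    /** Periodic check so that a partial batch is still emitted once lingerMs elapses. */
    List<T> poll(long nowMs) {
        if (!buffer.isEmpty() && ready(nowMs)) {
            return flush();
        }
        return List.of();
    }

    private boolean ready(long nowMs) {
        return buffer.size() >= maxRecords || nowMs - firstRecordTime >= lingerMs;
    }

    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}
```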
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Bruno
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 28.01.25 15:44, Lucas Brutschy wrote:
> >>>>>>>>>>>>> Hi Almog,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> this seems useful to me. I don't see anything wrong with the details
> >>>>>>>>>>>>> of the proposal.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> More generally, I'd like to hear your thoughts on this vs. batching.
> >>>>>>>>>>>>> Given the motivation section, it sounds like we actually want something
> >>>>>>>>>>>>> that I'd call "batching" rather than "windowing". If you do not really
> >>>>>>>>>>>>> care about including events that fall into different time windows,
> >>>>>>>>>>>>> and the order of events does not matter much, why are we specifically
> >>>>>>>>>>>>> emitting based on stream-time? Would you expect this mechanism to
> >>>>>>>>>>>>> also extend to emitting batches based on number of records, byte-size
> >>>>>>>>>>>>> of batch, or wall-clock time?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>> Lucas
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, Jan 23, 2025 at 7:59 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks, Almog.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Good callout about `TimeWindows` vs `TimeWindow` (yes, I am aware and
> >>>>>>>>>>>>>> was actually re-reading my previous email a few times before sending it
> >>>>>>>>>>>>>> to make sure I used the right one; it's very subtle.)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For `TimeWindows`, the semantics are certainly well defined, and there
> >>>>>>>>>>>>>> is nothing to be discussed.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For `TimeWindow`, I am not sure, as I said, and your interpretation as
> >>>>>>>>>>>>>> "just a container" might be fine, too. I agree with the problem that if
> >>>>>>>>>>>>>> we add something new, we might just leak it again, and thus not gain
> >>>>>>>>>>>>>> much, so the lesser evil might be to just re-use `TimeWindow` as you
> >>>>>>>>>>>>>> propose. I just wanted to raise this question as a sanity check and
> >>>>>>>>>>>>>> collect feedback about it.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 1/23/25 8:58 AM, Almog Gavra wrote:
> >>>>>>>>>>>>>>> Thanks Matthias for the quick and detailed feedback!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late" and
> >>>>>>>>>>>>>>>> using them as synonyms, which we usually do not do.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> M1. Ah, in my mind "late arriving" was after the window closed but
> >>>>>>>>>>>>>>> potentially before grace (and "out of order" was just anything that is
> >>>>>>>>>>>>>>> out of order). Do we have a specific word for "after the window closes
> >>>>>>>>>>>>>>> but before grace"? Maybe we should have "fashionably late" and "too
> >>>>>>>>>>>>>>> late" (haha, just kidding).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'll clear up the terminology in the KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Today the only way to implement this semantic is using a punctuation
> >>>>>>>>>>>>>>>>> and manually storing events in an aggregation (which is inefficient for
> >>>>>>>>>>>>>>>>> many reasons and can cause severe bottlenecks)
> >>>>>>>>>>>>>>>> Not sure if I follow this argument? I don't see any relationship to
> >>>>>>>>>>>>>>>> punctuations. Can you elaborate?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> M2. The way we've seen people implement the desired behavior in this
> >>>>>>>>>>>>>>> KIP is to use a non-windowed aggregation on a stream, and then use a
> >>>>>>>>>>>>>>> punctuation every N seconds to scan the table and emit and delete the
> >>>>>>>>>>>>>>> records to simulate a window closing. It's certainly suboptimal, but it
> >>>>>>>>>>>>>>> gets you very similar semantics to what I describe in the KIP.
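
Here is a framework-free sketch of the workaround described in M2. It assumes a plain in-memory map in place of a state store, and an explicit `punctuate()` call in place of a scheduled Kafka Streams punctuation. `PunctuationBatcher`, `process`, and `punctuate` are hypothetical names. The sketch makes the cost visible: every tick scans and clears the entire store, no matter how many keys actually changed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: non-windowed per-key aggregate plus a periodic emit-and-delete. */
final class PunctuationBatcher {
    private final Map<String, List<String>> store = new LinkedHashMap<>();

    /** Called once per input record: append to the key's running aggregate. */
    void process(String key, String value) {
        store.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Called every N seconds: scan the whole store, emit everything, then clear it. */
    Map<String, List<String>> punctuate() {
        Map<String, List<String>> emitted = new LinkedHashMap<>(store);
        store.clear(); // "delete the records" to simulate the window closing
        return emitted;
    }
}
```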
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> M3. Re: extends Windows<TimeWindow>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> (note for other readers, I think you get this Matthias) TimeWindow is
> >>>>>>>>>>>>>>> distinct from TimeWindows (with an s at the end). TimeWindows (with an
> >>>>>>>>>>>>>>> s) implies exactly what you suggest. I do think, however, that
> >>>>>>>>>>>>>>> TimeWindow (without an s) is a perfect abstraction to leverage here.
> >>>>>>>>>>>>>>> That class doesn't do anything except to indicate the bounds of a
> >>>>>>>>>>>>>>> window (it has a start time, an end time and a definition of whether
> >>>>>>>>>>>>>>> two windows overlap). The javadoc for TimeWindow (without an s) doesn't
> >>>>>>>>>>>>>>> even need to change to be used in the way this KIP suggests.
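
To illustrate the "just a container" reading, here is a hypothetical value class. It is not the real `TimeWindow`. It holds only what the thread says `TimeWindow` has: a start, an end, and an overlap check. Bounds are assumed half-open, [start, end). Nothing in it decides which records belong to the window, which is why such a class could be reused for stream-time assignment.

```java
/** Hypothetical container for a half-open window [startMs, endMs). */
final class SimpleTimeWindow {
    final long startMs;
    final long endMs;

    SimpleTimeWindow(long startMs, long endMs) {
        if (endMs <= startMs) {
            throw new IllegalArgumentException("endMs must be greater than startMs");
        }
        this.startMs = startMs;
        this.endMs = endMs;
    }

    /** Two half-open windows overlap iff each one starts before the other ends. */
    boolean overlap(SimpleTimeWindow other) {
        return startMs < other.endMs && other.startMs < endMs;
    }
}
```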
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> As suggested, the StreamTimeWindow still uses time as its bounds - it
> >>>>>>>>>>>>>>> just doesn't use the event time to determine which time window the
> >>>>>>>>>>>>>>> event is placed in.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> M4. Re: I just realized that `Window` is in an `internal` package but
> >>>>>>>>>>>>>>> `TimeWindows#windowsFor` does return `Map<Long, TimeWindow>`
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Yeah, I was wondering about that as well. I also agree it's a bit
> >>>>>>>>>>>>>>> orthogonal to the KIP, and I think if we introduce a new "extends
> >>>>>>>>>>>>>>> Window" type we'll just leak that type as well, so I don't really see a
> >>>>>>>>>>>>>>> benefit from that. If we do decide to tackle the leaky abstraction, we
> >>>>>>>>>>>>>>> might as well just have TimeWindow to handle :)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> M5. Specifying abstract methods
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Can do!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> M6. Naming
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I like "OffsetOrderedWindows"; I will wait for some other feedback and
> >>>>>>>>>>>>>>> if no one has something better I'll update the KIP to that.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Re: `extends Window<TimeWindow>` I do think the fact that it defines
> >>>>>>>>>>>>>>> the window size with time is important: we're not saying the window
> >>>>>>>>>>>>>>> closes after N offsets have passed but rather the stream time has
> >>>>>>>>>>>>>>> passed N seconds (or ms or whatever). I agree that it doesn't need to
> >>>>>>>>>>>>>>> be part of the main name, however.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Jan 23, 2025 at 12:14 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Interesting KIP. It's a known problem, and the proposed solution makes
> >>>>>>>>>>>>>>>> sense to me.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late" and
> >>>>>>>>>>>>>>>> using them as synonyms, which we usually do not do.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> "Out-of-order" is the more generic term, while "late" means after the
> >>>>>>>>>>>>>>>> grace period (hence, late-arriving data is still out-of-order, but it's
> >>>>>>>>>>>>>>>> a subset of out-of-order records... all late records are out-of-order,
> >>>>>>>>>>>>>>>> but not the other way around). I think the KIP could gain clarity and
> >>>>>>>>>>>>>>>> avoid potential confusion by not using the two terms as synonyms.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> In the end, "late" data cannot be handled at all right now, based on
> >>>>>>>>>>>>>>>> the definition of "late": "late" data is always dropped.
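
Here is a hypothetical classifier that encodes this terminology. It makes the following assumptions:

- Windows are tumbling, with an inclusive start and an exclusive end.
- Timestamps are non-negative.
- A window closes once stream time reaches the window end plus grace. This is roughly how Kafka Streams decides to drop a record in windowed aggregations.

`Arrival` and `ArrivalClassifier` are illustrative names. Any record classified as late is also behind stream time, which matches "all late records are out-of-order, but not the other way around".

```java
/** How a record relates to the current stream time (illustrative only). */
enum Arrival { IN_ORDER, OUT_OF_ORDER, LATE }

final class ArrivalClassifier {
    private ArrivalClassifier() {}

    /**
     * @param ts         the record's event timestamp (assumed >= 0)
     * @param streamTime max timestamp observed so far, before this record
     */
    static Arrival classify(long ts, long streamTime, long sizeMs, long graceMs) {
        long windowEnd = ts - (ts % sizeMs) + sizeMs; // exclusive end of the record's window
        if (streamTime >= windowEnd + graceMs) {
            return Arrival.LATE;                      // window already closed: record is dropped
        }
        if (ts < streamTime) {
            return Arrival.OUT_OF_ORDER;              // behind stream time, but still accepted
        }
        return Arrival.IN_ORDER;
    }
}
```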
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Today the only way to implement this semantic is using a punctuation and
> >>>>>>>>>>>>>>>>> manually storing events in an aggregation (which is inefficient for many
> >>>>>>>>>>>>>>>>> reasons and can cause severe bottlenecks)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Not sure if I follow this argument? I don't see any relationship to
> >>>>>>>>>>>>>>>> punctuations. Can you elaborate?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It's also not clear to me why "storing events in an aggregation" is
> >>>>>>>>>>>>>>>> problematic, and/or how this new window type would avoid it?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> extends Windows<TimeWindow>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Not sure if this is the right thing to do? I understand your intent,
> >>>>>>>>>>>>>>>> but to me, `TimeWindow` has certain semantics, implying we only put data
> >>>>>>>>>>>>>>>> with record-ts between the window bounds into the window. Not sure if my
> >>>>>>>>>>>>>>>> interpretation is off, and re-using could be ok? It's just something we
> >>>>>>>>>>>>>>>> should figure out, I guess.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> To be fair: it's not 100% clearly specified whether these semantics
> >>>>>>>>>>>>>>>> apply or not. If we believe they don't apply, happy to pivot on this,
> >>>>>>>>>>>>>>>> but we might want to update the JavaDocs if we only want to use it as a
> >>>>>>>>>>>>>>>> "container class" (for lack of a better word).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Which actually raises a different issue: I just realized that `Window`
> >>>>>>>>>>>>>>>> is in an `internal` package, but `TimeWindows#windowsFor` does return
> >>>>>>>>>>>>>>>> `Map<Long, TimeWindow>`, thus making it effectively kinda public... It
> >>>>>>>>>>>>>>>> seems we should do something about this? End users do not really need
> >>>>>>>>>>>>>>>> to use `TimeWindow` -- it's really internal, and it seems `windowsFor`
> >>>>>>>>>>>>>>>> is effectively a leaky abstraction...
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> If we keep `TimeWindow` internal, I think we can easily be more relaxed
> >>>>>>>>>>>>>>>> with applied semantics. But it's kinda leaking right now, which does
> >>>>>>>>>>>>>>>> not make me happy... (Of course, I also don't want to derail and hijack
> >>>>>>>>>>>>>>>> this KIP for some related, but orthogonal issue in the API...)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> One other nit I am wondering about is whether it might be good to
> >>>>>>>>>>>>>>>> specify the abstract methods inherited from `Windows` on the KIP for
> >>>>>>>>>>>>>>>> clarity? Even if they are implicitly well specified, it might be
> >>>>>>>>>>>>>>>> beneficial to spell them out. After all, `StreamTimeWindow` needs to
> >>>>>>>>>>>>>>>> implement these methods.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> And of course
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Note: I really don't like the name I came up with StreamTimeWindows...
> >>>>>>>>>>>>>>>>> if anyone has a better name in mind please suggest one!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I am not a fan of the name either. But not really sure what a better
> >>>>>>>>>>>>>>>> name could be. In the end, it's more of an "offset ordered window"
> >>>>>>>>>>>>>>>> rather than a "time window", because it does not really take the record
> >>>>>>>>>>>>>>>> timestamps into account to figure out which window a record goes into,
> >>>>>>>>>>>>>>>> but it really uses the offset order.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I believe that the size of the window itself does not matter too much
> >>>>>>>>>>>>>>>> for naming, so even if the size is defined in time, I don't think that
> >>>>>>>>>>>>>>>> the term "time" must be part of the window name. (That's also why I am
> >>>>>>>>>>>>>>>> wondering if we should go with `extends<TimeWindow>` or add something
> >>>>>>>>>>>>>>>> new, which we hopefully can keep internal...)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Not sure if we ever intend to make `StreamTimeWindows` overlapping?
> >>>>>>>>>>>>>>>> Given the described use-cases it seems very unlikely? So effectively
> >>>>>>>>>>>>>>>> it's a "tumbling window". Of course, we should not just overload this
> >>>>>>>>>>>>>>>> term, but there is no `TumblingWindows` in the API -- only the term
> >>>>>>>>>>>>>>>> Tumbling Window in the docs to describe the special case of
> >>>>>>>>>>>>>>>> non-overlapping `TimeWindows`.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> So maybe `OffsetTumblingWindows` could work? Or `OffsetOrderedWindows`?
> >>>>>>>>>>>>>>>> Just spitballing here, to get a discussion going...
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 1/22/25 8:26 PM, Almog Gavra wrote:
> >>>>>>>>>>>>>>>>> Hello!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I'd like to initiate a discussion thread on KIP-1127:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1127+Flexible+Windows+for+Late+Arriving+Data
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This KIP aims to make it easier to specify windowing semantics that are
> >>>>>>>>>>>>>>>>> more tolerant of late-arriving data, particularly with suppression.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Would appreciate any and all feedback.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Almog
> >>>>>>>>>>>>>>>>>