Following review feedback on the PR, I'm revising the PIP-484 design. The
earlier [DISCUSS] message described a new IncrementalWindowFunction interface;
that approach is dropped. The updated proposal exposes the public Window
interface and adds a default process(Window, WindowContext) overload on
WindowFunction.

The PR title and PIP text have been updated accordingly:
https://github.com/apache/pulsar/pull/25967

Right now
WindowFunction.process(Collection<Record<X>>, WindowContext) gets every message
in the window on each trigger. The runtime already computes more than that. On
each activation, WindowManager splits events into three lists: everything
currently in the window, what was added since the last trigger, and what
expired. Those lists go into WindowImpl, but WindowFunctionExecutor only passes
window.get() through to user code. getNew() and getExpired() are built and then
dropped.

If you're doing incremental work on a sliding window (a running sum, a count,
anything that only needs the delta), you either rescan the full collection
every time or keep your own diff logic on the side.

The fix is mostly wiring up data the runtime already has. First, promote the
internal Window<T> interface to org.apache.pulsar.functions.api.Window and
delete the copy under org.apache.pulsar.functions.windowing. The public
interface exposes get(), getNew(), getExpired(), and getStartTimestamp() /
getEndTimestamp() for the current activation. Javadoc will say the Window
object is only valid for one process() call, and that the lists are snapshots
you should not modify or hold onto across triggers.
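To make the shape concrete, here is a rough sketch of what the promoted interface could look like. The method names come from the proposal; the return types (List<T>, Long) and the WindowSketch / WindowSnapshot names are my assumptions for illustration, not the final API. The snapshot class only shows one way the "do not modify" contract could be enforced with unmodifiable copies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the proposed org.apache.pulsar.functions.api.Window.
// Return types are assumed, not taken from the PIP.
interface WindowSketch<T> {
    List<T> get();            // everything currently in the window
    List<T> getNew();         // events added since the last trigger
    List<T> getExpired();     // events that expired since the last trigger
    Long getStartTimestamp(); // window start for this activation
    Long getEndTimestamp();   // window end for this activation
}

// Hypothetical per-activation snapshot backed by unmodifiable defensive copies.
final class WindowSnapshot<T> implements WindowSketch<T> {
    private final List<T> all;
    private final List<T> added;
    private final List<T> expired;
    private final Long start;
    private final Long end;

    WindowSnapshot(List<T> all, List<T> added, List<T> expired, Long start, Long end) {
        this.all = Collections.unmodifiableList(new ArrayList<>(all));
        this.added = Collections.unmodifiableList(new ArrayList<>(added));
        this.expired = Collections.unmodifiableList(new ArrayList<>(expired));
        this.start = start;
        this.end = end;
    }

    @Override public List<T> get() { return all; }
    @Override public List<T> getNew() { return added; }
    @Override public List<T> getExpired() { return expired; }
    @Override public Long getStartTimestamp() { return start; }
    @Override public Long getEndTimestamp() { return end; }
}
```

With this sketch, a call such as get().add(...) throws UnsupportedOperationException, which is one possible way to back up the Javadoc wording.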
Second, add a default overload on WindowFunction:

    default T process(Window<Record<X>> window, WindowContext context)
            throws Exception {
        return process(window.get(), context);
    }

WindowFunction stays a @FunctionalInterface. The collection-based method is
still the single abstract method, so existing lambdas keep today's behavior.

Third, change WindowFunctionExecutor to call
windowFunction.process(inputWindow, context) instead of passing only
inputWindow.get(). If you implement WindowFunction as a class and override
process(Window, ...), that's what the runtime calls. Your process(Collection,
...) override won't run unless the window overload delegates to it (for example,
with return process(window.get(), context)).
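The dispatch rule can be sketched with simplified stand-in types. Everything named Mini* below, plus NewOnlyCounter and DispatchSketch, is hypothetical, and Record<X> and WindowContext are left out to keep it short. The point is only that the executor always calls the window overload: a lambda lands on the collection method through the default, while a class that overrides the window overload bypasses it.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical, trimmed-down window: only the two lists this sketch needs.
interface MiniWindow<T> {
    List<T> get();
    List<T> getNew();
}

@FunctionalInterface
interface MiniWindowFunction<X, T> {
    // Still the single abstract method, so lambdas bind here.
    T process(Collection<X> input) throws Exception;

    // Default overload: delegates to the collection path unless overridden.
    default T process(MiniWindow<X> window) throws Exception {
        return process(window.get());
    }
}

// Class-based implementation that overrides the window overload.
class NewOnlyCounter implements MiniWindowFunction<String, Integer> {
    @Override
    public Integer process(Collection<String> input) {
        return input.size(); // not reached when the executor passes a window
    }

    @Override
    public Integer process(MiniWindow<String> window) {
        return window.getNew().size(); // sees only the delta
    }
}

final class DispatchSketch {
    // Builds a fixed window for illustration.
    static MiniWindow<String> window(List<String> all, List<String> added) {
        return new MiniWindow<String>() {
            @Override public List<String> get() { return all; }
            @Override public List<String> getNew() { return added; }
        };
    }

    // Mirrors the proposed executor change: always call the window overload.
    static <X, T> T execute(MiniWindowFunction<X, T> fn, MiniWindow<X> w) throws Exception {
        return fn.process(w);
    }
}
```

For a window holding three events of which one is new, a lambda such as input -> input.size() returns 3 through the default, while NewOnlyCounter returns 1.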
A sliding-window sum is the straightforward case: add values from getNew(),
subtract values from getExpired(), and skip a full rescan.
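A minimal sketch of that sum, again with hypothetical stand-in types (DeltaWindow, SlidingSum, Deltas) rather than the real Pulsar classes. It assumes the same function instance is reused across triggers so the running total can live in a field; whether that holds in a given deployment is worth checking against the PIP.

```java
import java.util.List;

// Hypothetical stand-in exposing only the two delta lists.
interface DeltaWindow<T> {
    List<T> getNew();
    List<T> getExpired();
}

// Maintains a running sum without rescanning the full window.
class SlidingSum {
    private long sum = 0; // state carried across triggers (assumes instance reuse)

    long process(DeltaWindow<Long> window) {
        for (long v : window.getNew()) {
            sum += v; // values that entered since the last trigger
        }
        for (long v : window.getExpired()) {
            sum -= v; // values that left since the last trigger
        }
        return sum;
    }
}

// Helper for building a delta by hand when trying the sketch out.
final class Deltas {
    static DeltaWindow<Long> of(List<Long> added, List<Long> expired) {
        return new DeltaWindow<Long>() {
            @Override public List<Long> getNew() { return added; }
            @Override public List<Long> getExpired() { return expired; }
        };
    }
}
```

Feeding it new values 1, 2, 3 on the first trigger gives 6; a second trigger with 4 added and 1 expired gives 9, which matches the sum of the resulting window contents 2, 3, 4.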
One limitation worth calling out: lambdas can't override default methods, so
they stay on the collection path and can't see getNew() / getExpired().
Incremental processing needs a class-based implementation. We looked at putting
Window on WindowContext instead, which would cover lambdas, but the overload on
WindowFunction looked like the smaller change. If you think the lambda case
matters more, I'd like to hear it.

Nothing else changes: no new config, REST endpoints, protocol, or metrics.
Deployment validation stays the same because users still implement
WindowFunction. Python and Go are out of scope here.

An earlier draft added a separate IncrementalWindowFunction interface. Review
pushed us toward extending WindowFunction with a default overload instead,
mainly to avoid extra submit-time checks and dispatch-order rules when a class
implements more than one function interface.

Motivation, API contracts, backward-compat notes, alternatives, and a worked
example are in the PIP on the PR. Comments here or on the PR are welcome.

Thanks,
Dream95

At 2026-06-08 18:50:53, "Dream95" <[email protected]> wrote:
>Hi all,
>I'd like to start a discussion on PIP-484: Expose Incremental Window Events
>via IncrementalWindowFunction.
>This proposal adds an IncrementalWindowFunction API so users can process
>window events incrementally (onEvict, onTrigger) instead of only receiving
>the full window batch on each fire.
>
>
>PIP PR: https://github.com/apache/pulsar/pull/25967
>
>Thanks.
>