Following review feedback on the PR, I'm revising the PIP-484 design. The 
earlier [DISCUSS] message described a new IncrementalWindowFunction interface;
that approach is dropped. The updated proposal exposes the public Window 
interface and adds a default process(Window, WindowContext) overload on 
WindowFunction.
The PR title and PIP text have been updated accordingly:
https://github.com/apache/pulsar/pull/25967


Right now
WindowFunction.process(Collection<Record<X>>, WindowContext) gets every message 
in the window on each trigger. The runtime already does more work than that. On 
each activation, WindowManager splits events into three lists: everything 
currently in the window, what was added since the last trigger, and what 
expired. Those lists go into WindowImpl, but WindowFunctionExecutor only passes 
window.get() through to user code. getNew() and getExpired() are built and then 
dropped.


If you're doing incremental work on a sliding window (a running sum, a count, 
anything that only needs the delta), you either rescan the full collection 
every time or keep your own diff logic on the side.


The fix is mostly wiring up data the runtime already has. First, promote the 
internal Window<T> interface to org.apache.pulsar.functions.api.Window and 
delete the copy under org.apache.pulsar.functions.windowing. The public 
interface exposes get(), getNew(), getExpired(), and getStartTimestamp() / 
getEndTimestamp() for the current activation. Javadoc will say the Window 
object is only valid for one process() call, and that the lists are snapshots 
you should not modify or hold onto across triggers.
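
A rough sketch of the interface shape described above, for readers who want to see it in one place. The method names come from the proposal; the return types (List<T>, Long) are my assumption, and SnapshotWindow is a hypothetical stand-in, not the runtime's WindowImpl. Copying into unmodifiable lists is just one way to make the "snapshot, do not modify" contract visible; the PIP only says the Javadoc will state it.

```java
import java.util.List;

// Stand-in for the proposed public interface; return types are assumed.
interface Window<T> {
    List<T> get();             // everything currently in the window
    List<T> getNew();          // added since the last trigger
    List<T> getExpired();      // expired since the last trigger
    Long getStartTimestamp();
    Long getEndTimestamp();
}

// Hypothetical implementation that freezes its inputs at construction time.
final class SnapshotWindow<T> implements Window<T> {
    private final List<T> all;
    private final List<T> added;
    private final List<T> expired;
    private final Long start;
    private final Long end;

    SnapshotWindow(List<T> all, List<T> added, List<T> expired, Long start, Long end) {
        // List.copyOf returns an unmodifiable copy, so later changes to the
        // source lists are not visible and callers cannot mutate the result.
        this.all = List.copyOf(all);
        this.added = List.copyOf(added);
        this.expired = List.copyOf(expired);
        this.start = start;
        this.end = end;
    }

    @Override public List<T> get() { return all; }
    @Override public List<T> getNew() { return added; }
    @Override public List<T> getExpired() { return expired; }
    @Override public Long getStartTimestamp() { return start; }
    @Override public Long getEndTimestamp() { return end; }
}
```

User code that needs a value beyond one process() call should copy that value out rather than keep a reference to the Window or its lists.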


Second, add a default overload on WindowFunction:


    default T process(Window<Record<X>> window, WindowContext context) throws Exception {
        return process(window.get(), context);
    }


WindowFunction stays a @FunctionalInterface. The collection-based method is 
still the single abstract method, so existing lambdas keep today's behavior.


Third, change WindowFunctionExecutor to call 
windowFunction.process(inputWindow, context) instead of passing only 
inputWindow.get(). If you implement WindowFunction as a class and override 
process(Window, ...), that's what the runtime calls. Your process(Collection, 
...) override won't run unless the window overload delegates to it (for example 
return process(window.get(), context)).
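
To make the dispatch rule concrete, here is a small self-contained sketch. Win, WinFn, Ctx, FixedWin, DeltaAware and Dispatch are all hypothetical stand-ins for Window, WindowFunction, WindowContext and the executor, and plain Integer values replace Record<X> to keep it short. It only illustrates how Java resolves the two overloads, not how WindowFunctionExecutor is written.

```java
import java.util.Collection;
import java.util.List;

interface Ctx {}                      // stand-in for WindowContext

interface Win<X> {                    // stand-in for the public Window
    List<X> get();
    List<X> getNew();
    List<X> getExpired();
}

@FunctionalInterface
interface WinFn<X, T> {               // stand-in for WindowFunction
    T process(Collection<X> input, Ctx context) throws Exception;

    // Default overload: falls back to the collection-based method.
    default T process(Win<X> window, Ctx context) throws Exception {
        return process(window.get(), context);
    }
}

final class FixedWin<X> implements Win<X> {
    private final List<X> all, added, expired;
    FixedWin(List<X> all, List<X> added, List<X> expired) {
        this.all = all; this.added = added; this.expired = expired;
    }
    @Override public List<X> get() { return all; }
    @Override public List<X> getNew() { return added; }
    @Override public List<X> getExpired() { return expired; }
}

// Class-based function that overrides the window overload.
final class DeltaAware implements WinFn<Integer, String> {
    @Override public String process(Collection<Integer> input, Ctx context) {
        return "collection:" + input.size();   // not reached via the window path
    }
    @Override public String process(Win<Integer> window, Ctx context) {
        return "window:+" + window.getNew().size() + "/-" + window.getExpired().size();
    }
}

final class Dispatch {
    // Mimics the executor change: always call the window overload.
    static <X, T> T fire(WinFn<X, T> fn, Win<X> window) {
        try {
            return fn.process(window, new Ctx() {});
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Win<Integer> w = new FixedWin<>(List.of(2, 3, 4), List.of(4), List.of(1));
        WinFn<Integer, String> lambda = (input, ctx) -> "collection:" + input.size();
        System.out.println(fire(lambda, w));            // lambda: default overload delegates
        System.out.println(fire(new DeltaAware(), w));  // class: window override runs
    }
}
```

The lambda can only supply the abstract collection method, so it is reached through the default overload; the class replaces that overload and its collection method is skipped.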


A sliding-window sum is the straightforward case: add values from getNew(), 
subtract values from getExpired(), and skip a full rescan.
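
A minimal sketch of that sum, assuming a window of plain Integer values rather than Record<X>. SumWindow, SimpleSumWindow and SlidingSum are hypothetical names used only here; a real function would implement WindowFunction and read values out of each Record. The worked example in the PIP is the authoritative one.

```java
import java.util.List;

// Stand-in for the three lists the proposed Window exposes.
interface SumWindow<T> {
    List<T> get();
    List<T> getNew();
    List<T> getExpired();
}

final class SimpleSumWindow<T> implements SumWindow<T> {
    private final List<T> all, added, expired;
    SimpleSumWindow(List<T> all, List<T> added, List<T> expired) {
        this.all = all; this.added = added; this.expired = expired;
    }
    @Override public List<T> get() { return all; }
    @Override public List<T> getNew() { return added; }
    @Override public List<T> getExpired() { return expired; }
}

final class SlidingSum {
    private long sum;   // running state carried across triggers

    // Incremental path: touches only the delta for this activation.
    long onTrigger(SumWindow<Integer> window) {
        for (int v : window.getNew()) {
            sum += v;
        }
        for (int v : window.getExpired()) {
            sum -= v;
        }
        return sum;
    }

    // Full rescan, kept only to check the incremental result against.
    static long rescan(SumWindow<Integer> window) {
        long total = 0;
        for (int v : window.get()) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        SlidingSum s = new SlidingSum();
        SumWindow<Integer> first =
                new SimpleSumWindow<>(List.of(1, 2, 3), List.of(1, 2, 3), List.of());
        SumWindow<Integer> second =
                new SimpleSumWindow<>(List.of(2, 3, 4), List.of(4), List.of(1));
        System.out.println(s.onTrigger(first) + " vs rescan " + rescan(first));
        System.out.println(s.onTrigger(second) + " vs rescan " + rescan(second));
    }
}
```

On the second trigger the incremental path reads two elements (one new, one expired) where the rescan reads the whole window, and both arrive at the same total.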


One limitation worth calling out: lambdas can't override default methods, so 
they stay on the collection path and can't see getNew() / getExpired(). 
Incremental processing needs a class-based implementation. We looked at putting 
Window on WindowContext instead, which would cover lambdas, but the overload on 
WindowFunction looked like the smaller change. If you think the lambda case 
matters more, I'd like to hear it.


Nothing else moves: no new config, REST endpoints, protocol, or metrics. 
Deployment validation stays the same because users still implement 
WindowFunction. Python and Go are out of scope here.


An earlier draft added a separate IncrementalWindowFunction interface. Review 
pushed us toward extending WindowFunction with a default overload instead, 
mainly to avoid extra submit-time checks and dispatch-order rules when a class 
implements more than one function interface.


Motivation, API contracts, backward-compat notes, alternatives, and a worked 
example are in the PIP on the PR. Comments here or on the PR are welcome.


Thanks,
Dream95







At 2026-06-08 18:50:53, "Dream95" <[email protected]> wrote:
>Hi all,
>I'd like to start a discussion on PIP-484: Expose Incremental Window Events
>via IncrementalWindowFunction.
>This proposal adds an IncrementalWindowFunction API so users can process
>window events incrementally (onEvict, onTrigger) instead of only receiving
>the full window batch on each fire.
>
>
>PIP PR: https://github.com/apache/pulsar/pull/25967
>
>Thanks.
>
