[
https://issues.apache.org/jira/browse/FLINK-25662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Thomas Weise updated FLINK-25662:
---------------------------------
Fix Version/s: (was: 1.14.3)
> Access KeyedStateStore in WindowAssigner
> ----------------------------------------
>
> Key: FLINK-25662
> URL: https://issues.apache.org/jira/browse/FLINK-25662
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.14.2
> Reporter: Sun Sheng
> Priority: Major
>
> Currently, when implement our own WindowAssigner, by the
> WindowAssignerContext, we can call only one function of
> getCurrentProcessingTime(), but there's no function to access the
> KeyedStateStore, in some cases I think it's necessary.
>
> In a stock trading, the events are controlled by it's status, such as
> non-trading, continuous-trading, e.g. and the status is in separated events,
> and we want to merge the windows from non-trading to continuous-trading as
> one window, by assigning a long-window for non-trading and then merge them
> for continuous-trading we can do it(using MergingWindowAssigner).
> But for recovery, we have to save the status to the KeyedStateStore, because
> the status is in separated events.
> At present, we have to add a KeyedProcessFunction before windowing, to save
> the status and send the status to the window assigner, it works but the code
> become more bloat:
> {code:java}
> stream.keyBy(...)
> .process(MyProcessFunction)
> .keyBy(...)
> .window(MyWindowAssigner)
> ... {code}
>
> I have tried to modify the Flink source code locally, by adding a function to
> WindowAssignerContext and implement it in WindowOperator:
> WindowAssigner.java
> {code:java}
> public abstract static class WindowAssignerContext {
> /** Returns the current processing time. */
> public abstract long getCurrentProcessingTime();
> public abstract KeyedStateStore globalState();
> }{code}
> WindowOperator.java
> {code:java}
> ...
> windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
> @Override
> public long getCurrentProcessingTime() {
> return internalTimerService.currentProcessingTime();
> } @Override
> public KeyedStateStore globalState() {
> return WindowOperator.this.getKeyedStateStore();
> }
> };
> ...{code}
> The implementation is same as the WindowContext, very simple.
>
> By testing locally everything works fine, and my code becomes more clean.
> If it's useful I'm glad to work on it. :)
--
This message was sent by Atlassian Jira
(v8.20.1#820001)