[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886419#comment-15886419 ]
Shannon Carey commented on FLINK-5929:
--------------------------------------

If I understand correctly, I agree this would be useful. Currently we are working around this limitation in order to achieve communication between the Trigger (per-pane state) and the WindowFunction (per-operator state) by a hack within the WindowFunction that looks like this (we're not on 1.2 yet so we haven't looked at new ways to do this yet):

{code}
def apply(key: String, window: TimeWindow, input, out): Unit = {
  val fireTimestampState: ValueState[java.lang.Long] =
    getRuntimeContext.getState[java.lang.Long](fireTimestampStateDescriptor)

  // Hack: cast to the backend-specific state class and point its namespace at
  // the current window, so that the value written by the Trigger is visible here.
  if (fireTimestampState.isInstanceOf[MemValueState[String, TimeWindow, java.lang.Long]]) {
    fireTimestampState.asInstanceOf[MemValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
  } else if (fireTimestampState.isInstanceOf[RocksDBValueState[String, TimeWindow, java.lang.Long]]) {
    fireTimestampState.asInstanceOf[RocksDBValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
  } else if (fireTimestampState.isInstanceOf[FsValueState[String, TimeWindow, java.lang.Long]]) {
    fireTimestampState.asInstanceOf[FsValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
  }

  fireTimestampState.value()
  ...
{code}

> Allow Access to Per-Window State in ProcessWindowFunction
> ---------------------------------------------------------
>
>                 Key: FLINK-5929
>                 URL: https://issues.apache.org/jira/browse/FLINK-5929
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}}
> can access is scoped to the key of the window but not the window itself. That
> is, state is global across all windows for a given key.
>
> For some use cases it is beneficial to keep state scoped to a window. For
> example, if you expect several {{Trigger}} firings (due to early and late
> firings), you can keep state per window to retain some information between
> those firings.
>
> The per-window state has to be cleaned up in some way. For this I see two
> options:
> - Keep track of all state that a user uses and clean up when we reach the
>   window GC horizon.
> - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called
>   when we reach the window GC horizon that users can/should use to clean up
>   their state.
>
> On the API side, we can add a method {{windowState()}} on
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and
> {{globalState()}} that would allow access to the (already available) global
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
>     /**
>      * @return The window that is being evaluated.
>      */
>     public abstract W window();
>
>     /**
>      * State accessor for per-key and per-window state.
>      */
>     public abstract KeyedStateStore windowState();
>
>     /**
>      * State accessor for per-key global state.
>      */
>     public abstract KeyedStateStore globalState();
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
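
The difference between the two scopes proposed in the issue description ({{windowState()}} versus {{globalState()}}), and the effect of the {{cleanup()}} option, can be illustrated with a small stand-alone model. This is only a sketch in plain Java: it does not use Flink's classes, and the class name {{WindowStateSketch}}, its methods, and the choice to identify a window by its start timestamp are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-alone model of per-window vs. global keyed state; NOT Flink's API. */
public class WindowStateSketch {

    // Per-key state shared by all windows of that key (the "globalState()" scope).
    private final Map<String, Integer> globalFirings = new HashMap<>();

    // Per-key, per-window state (the "windowState()" scope).
    // Assumption: a window is identified by its start timestamp.
    private final Map<String, Map<Long, Integer>> windowFirings = new HashMap<>();

    /** Simulates one trigger firing; returns how often this window has fired so far. */
    public int fire(String key, long windowStart) {
        globalFirings.merge(key, 1, Integer::sum);
        return windowFirings
                .computeIfAbsent(key, k -> new HashMap<>())
                .merge(windowStart, 1, Integer::sum);
    }

    /** Number of firings across all windows of the given key. */
    public int globalCount(String key) {
        return globalFirings.getOrDefault(key, 0);
    }

    /** Number of firings of one specific window of the given key. */
    public int windowCount(String key, long windowStart) {
        Map<Long, Integer> perWindow = windowFirings.get(key);
        return perWindow == null ? 0 : perWindow.getOrDefault(windowStart, 0);
    }

    /** Models cleanup at the window GC horizon: per-window state goes, global state stays. */
    public void cleanup(String key, long windowStart) {
        Map<Long, Integer> perWindow = windowFirings.get(key);
        if (perWindow != null) {
            perWindow.remove(windowStart);
            if (perWindow.isEmpty()) {
                windowFirings.remove(key);
            }
        }
    }

    public static void main(String[] args) {
        WindowStateSketch state = new WindowStateSketch();
        state.fire("a", 0L);      // early firing of window [0, ...)
        state.fire("a", 0L);      // late firing of the same window
        state.fire("a", 60_000L); // first firing of the next window
        System.out.println("window 0 firings: " + state.windowCount("a", 0L));
        System.out.println("global firings:   " + state.globalCount("a"));
        state.cleanup("a", 0L);
        System.out.println("window 0 after cleanup: " + state.windowCount("a", 0L));
    }
}
```

In this model the per-window counter is what a function would use to tell early firings from late ones, while the global counter survives the window's cleanup, which mirrors why the proposal needs an explicit cleanup step for per-window state only.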