[
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886419#comment-15886419
]
Shannon Carey commented on FLINK-5929:
--------------------------------------
If I understand correctly, I agree this would be useful. Currently we are
working around this limitation in order to achieve communication between the
Trigger (per-pane state) and the WindowFunction (per-operator state) by a hack
within the WindowFunction that looks like this (we're not on 1.2 yet so we
haven't looked at new ways to do this yet):
{code}
def apply(key: String, window: TimeWindow, input, out): Unit = {
val fireTimestampState: ValueState[java.lang.Long] =
getRuntimeContext.getState[java.lang.Long](fireTimestampStateDescriptor)
if (fireTimestampState.isInstanceOf[MemValueState[String, TimeWindow,
java.lang.Long]]) {
fireTimestampState.asInstanceOf[MemValueState[String, TimeWindow,
java.lang.Long]].setCurrentNamespace(window)
} else if (fireTimestampState.isInstanceOf[RocksDBValueState[String,
TimeWindow, java.lang.Long]]) {
fireTimestampState.asInstanceOf[RocksDBValueState[String, TimeWindow,
java.lang.Long]].setCurrentNamespace(window)
} else if (fireTimestampState.isInstanceOf[FsValueState[String, TimeWindow,
java.lang.Long]]) {
fireTimestampState.asInstanceOf[FsValueState[String, TimeWindow,
java.lang.Long]].setCurrentNamespace(window)
}
fireTimestampState.value()
...
{code}
> Allow Access to Per-Window State in ProcessWindowFunction
> ---------------------------------------------------------
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}}
> can access is scoped to the key of the window but not the window itself. That
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For
> example, if you expect to have several {{Trigger}} firings (due to early and
> late firings) a user can keep state per window to keep some information
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two
> options:
> - Keep track of all state that a user uses and clean up when we reach the
> window GC horizon.
> - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called
> when we reach the window GC horizon that users can/should use to clean up
> their state.
> On the API side, we can add a method {{windowState()}} on
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and
> {{globalState()}} that would allow access to the (already available) global
> state. The {{Context}} would then look like this:
> {code}
> /**
> * The context holding window metadata
> */
> public abstract class Context {
> /**
> * @return The window that is being evaluated.
> */
> public abstract W window();
> /**
> * State accessor for per-key and per-window state.
> */
> KeyedStateStore windowState();
> /**
> * State accessor for per-key global state.
> */
> KeyedStateStore globalState();
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)