[
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897448#comment-15897448
]
ASF GitHub Bot commented on FLINK-5929:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3479#discussion_r104432787
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
---
@@ -629,6 +645,135 @@ protected final boolean isCleanupTime(W window, long
time) {
}
/**
+ * For now keyed state is not allowed in ProcessWindowFunctions
+ */
+ public class MergingKeyStore implements KeyedStateStore {
+
+ protected W window;
+
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T>
stateProperties) {
+ throw new RuntimeException("keyedState is not allowed
in merging windows");
+ }
+
+ @Override
+ public <T> ListState<T> getListState(ListStateDescriptor<T>
stateProperties) {
+ throw new RuntimeException("keyedState is not allowed
in merging windows");
+ }
+
+ @Override
+ public <T> ReducingState<T>
getReducingState(ReducingStateDescriptor<T> stateProperties) {
+ throw new RuntimeException("keyedState is not allowed
in merging windows");
+ }
+
+ @Override
+ public <T, A> FoldingState<T, A>
getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
+ throw new RuntimeException("keyedState is not allowed
in merging windows");
+ }
+
+ @Override
+ public <UK, UV> MapState<UK, UV>
getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+ throw new RuntimeException("keyedState is not allowed
in merging windows");
+ }
+ }
+
+ public class WindowPaneKeyStore implements KeyedStateStore {
+
+ protected W window;
+
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T>
stateProperties) {
+ try {
+ return
WindowOperator.this.getPartitionedState(window, windowSerializer,
stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve
state", e);
+ }
+ }
+
+ @Override
+ public <T> ListState<T> getListState(ListStateDescriptor<T>
stateProperties) {
+ try {
+ return
WindowOperator.this.getPartitionedState(window, windowSerializer,
stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve
state", e);
+ }
+ }
+
+ @Override
+ public <T> ReducingState<T>
getReducingState(ReducingStateDescriptor<T> stateProperties) {
+ try {
+ return
WindowOperator.this.getPartitionedState(window, windowSerializer,
stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve
state", e);
+ }
+ }
+
+ @Override
+ public <T, ACC> FoldingState<T, ACC>
getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+ try {
+ return
WindowOperator.this.getPartitionedState(window, windowSerializer,
stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve
state", e);
+ }
+ }
+
+ @Override
+ public <UK, UV> MapState<UK, UV>
getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+ try {
+ return
WindowOperator.this.getPartitionedState(window, windowSerializer,
stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve
state", e);
+ }
+ }
+ }
+
+ /**
+ * {@code WindowContext} is a utility for handling {@code
ProcessWindowFunction} invocations. It can be reused
+ * by setting the {@code key} and {@code window} fields. No internal
state must be kept in
+ * the {@code WindowContext}
+ */
+ public class WindowContext implements
InternalWindowFunction.InternalWindowContext {
+ protected W window;
+
+ protected WindowPaneKeyStore windowPaneKeyStore;
+ protected MergingKeyStore mergingKeyStore;
+
+ public WindowContext(W window) {
+ this.window = window;
+ this.windowPaneKeyStore = new WindowPaneKeyStore();
+ this.mergingKeyStore = new MergingKeyStore();
+ }
+
+ @Override
+ public String toString() {
+ return "WindowContext{Window = " + window.toString() +
"}";
+ }
+
+ public void clear() throws Exception {
+ userFunction.clear(window, this);
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ return mergingKeyStore;
+ } else {
+ this.windowPaneKeyStore.window = window;
+ return this.windowPaneKeyStore;
+ }
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ if (windowAssigner instanceof MergingWindowAssigner) {
--- End diff --
I think access to global state is fine for merging windows.
> Allow Access to Per-Window State in ProcessWindowFunction
> ---------------------------------------------------------
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}}
> can access is scoped to the key of the window but not the window itself. That
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For
> example, if you expect to have several {{Trigger}} firings (due to early and
> late firings) a user can keep state per window to keep some information
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two
> options:
> - Keep track of all state that a user uses and clean up when we reach the
> window GC horizon.
> - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called
> when we reach the window GC horizon that users can/should use to clean up
> their state.
> On the API side, we can add a method {{windowState()}} on
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and
> {{globalState()}} that would allow access to the (already available) global
> state. The {{Context}} would then look like this:
> {code}
> /**
> * The context holding window metadata
> */
> public abstract class Context {
> /**
> * @return The window that is being evaluated.
> */
> public abstract W window();
> /**
> * State accessor for per-key and per-window state.
> */
> KeyedStateStore windowState();
> /**
> * State accessor for per-key global state.
> */
> KeyedStateStore globalState();
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)