[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897448#comment-15897448
 ] 

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r104432787
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
    @@ -629,6 +645,135 @@ protected final boolean isCleanupTime(W window, long 
time) {
        }
     
        /**
    +    * For now keyed state is not allowed in ProcessWindowFunctions
    +    */
    +   public class MergingKeyStore implements KeyedStateStore {
    +
    +           protected W window;
    +
    +           @Override
    +           public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
    +                   throw new RuntimeException("keyedState is not allowed 
in merging windows");
    +           }
    +
    +           @Override
    +           public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
    +                   throw new RuntimeException("keyedState is not allowed 
in merging windows");
    +           }
    +
    +           @Override
    +           public <T> ReducingState<T> 
getReducingState(ReducingStateDescriptor<T> stateProperties) {
    +                   throw new RuntimeException("keyedState is not allowed 
in merging windows");
    +           }
    +
    +           @Override
    +           public <T, A> FoldingState<T, A> 
getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
    +                   throw new RuntimeException("keyedState is not allowed 
in merging windows");
    +           }
    +
    +           @Override
    +           public <UK, UV> MapState<UK, UV> 
getMapState(MapStateDescriptor<UK, UV> stateProperties) {
    +                   throw new RuntimeException("keyedState is not allowed 
in merging windows");
    +           }
    +   }
    +
    +   public class WindowPaneKeyStore implements KeyedStateStore {
    +
    +           protected W window;
    +
    +           @Override
    +           public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
    +                   try {
    +                           return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
    +                   } catch (Exception e) {
    +                           throw new RuntimeException("Could not retrieve 
state", e);
    +                   }
    +           }
    +
    +           @Override
    +           public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
    +                   try {
    +                           return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
    +                   } catch (Exception e) {
    +                           throw new RuntimeException("Could not retrieve 
state", e);
    +                   }
    +           }
    +
    +           @Override
    +           public <T> ReducingState<T> 
getReducingState(ReducingStateDescriptor<T> stateProperties) {
    +                   try {
    +                           return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
    +                   } catch (Exception e) {
    +                           throw new RuntimeException("Could not retrieve 
state", e);
    +                   }
    +           }
    +
    +           @Override
    +           public <T, ACC> FoldingState<T, ACC> 
getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
    +                   try {
    +                           return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
    +                   } catch (Exception e) {
    +                           throw new RuntimeException("Could not retrieve 
state", e);
    +                   }
    +           }
    +
    +           @Override
    +           public <UK, UV> MapState<UK, UV> 
getMapState(MapStateDescriptor<UK, UV> stateProperties) {
    +                   try {
    +                           return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
    +                   } catch (Exception e) {
    +                           throw new RuntimeException("Could not retrieve 
state", e);
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * {@code WindowContext} is a utility for handling {@code 
ProcessWindowFunction} invocations. It can be reused
    +    * by setting the {@code key} and {@code window} fields. No internal 
state must be kept in
    +    * the {@code WindowContext}
    +    */
    +   public class WindowContext implements 
InternalWindowFunction.InternalWindowContext {
    +           protected W window;
    +
    +           protected WindowPaneKeyStore windowPaneKeyStore;
    +           protected MergingKeyStore mergingKeyStore;
    +
    +           public WindowContext(W window) {
    +                   this.window = window;
    +                   this.windowPaneKeyStore = new WindowPaneKeyStore();
    +                   this.mergingKeyStore = new MergingKeyStore();
    +           }
    +
    +           @Override
    +           public String toString() {
    +                   return "WindowContext{Window = " + window.toString() + 
"}";
    +           }
    +
    +           public void clear() throws Exception {
    +                   userFunction.clear(window, this);
    +           }
    +
    +           @Override
    +           public KeyedStateStore windowState() {
    +                   if (windowAssigner instanceof MergingWindowAssigner) {
    +                           return mergingKeyStore;
    +                   } else {
    +                           this.windowPaneKeyStore.window = window;
    +                           return this.windowPaneKeyStore;
    +                   }
    +           }
    +
    +           @Override
    +           public KeyedStateStore globalState() {
    +                   if (windowAssigner instanceof MergingWindowAssigner) {
    --- End diff --
    
    I think access to global state is fine for merging windows.


> Allow Access to Per-Window State in ProcessWindowFunction
> ---------------------------------------------------------
>
>                 Key: FLINK-5929
>                 URL: https://issues.apache.org/jira/browse/FLINK-5929
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if a user expects several {{Trigger}} firings (due to early and 
> late firings), they can keep per-window state to retain some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
>     /**
>      * @return The window that is being evaluated.
>      */
>     public abstract W window();
>     /**
>      * State accessor for per-key and per-window state.
>      */
>     KeyedStateStore windowState();
>     /**
>      * State accessor for per-key global state.
>      */
>     KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to