[ 
https://issues.apache.org/jira/browse/FLINK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331694#comment-15331694
 ] 

ASF GitHub Bot commented on FLINK-3714:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r67156546
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
    @@ -450,17 +436,140 @@ public final void trigger(long time) throws 
Exception {
                                context.key = timer.key;
                                context.window = timer.window;
                                setKeyContext(timer.key);
    +
    +                           AppendingState<IN, ACC> windowState;
    +                           MergingWindowSet<W> mergingWindows = null;
    +
    +                           if (windowAssigner instanceof 
MergingWindowAssigner) {
    +                                   mergingWindows = getMergingWindowSet();
    +                                   W stateWindow = 
mergingWindows.getStateWindow(context.window);
    +                                   windowState = 
getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
    +                           } else {
    +                                   windowState = 
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
    +                           }
    +
                                TriggerResult triggerResult = 
context.onProcessingTime(timer.timestamp);
    -                           processTriggerResult(triggerResult, 
context.window);
    +                           fireOrContinue(triggerResult, context.window, 
windowState);
    +
    +                           if (triggerResult.isPurge() || 
(!windowAssigner.isEventTime() && isCleanupTime(timer.window, 
timer.timestamp))) {
    +                                   cleanup(timer.window, windowState, 
mergingWindows);
    +                           }
    +
                        } else {
                                fire = false;
                        }
                } while (fire);
    +   }
    +
    +   /**
    +    * Cleans up the window state if the provided {@link TriggerResult} 
requires so, or if it
    +    * is time to do so (see {@link #isCleanupTime(Window, long)}). The 
caller must ensure that the
    +    * correct key is set in the state backend and the context object.
    +    */
    +   private void cleanup(W window,
    +                                           AppendingState<IN, ACC> 
windowState,
    +                                           MergingWindowSet<W> 
mergingWindows) throws Exception {
    +           windowState.clear();
    +           if (mergingWindows != null) {
    +                   mergingWindows.retireWindow(window);
    +           }
    +           context.clear();
    +           deleteCleanupTimer(window);
    +   }
    +
    +   /**
    +    * Triggers the window computation if the provided {@link 
TriggerResult} requires so.
    +    * The caller must ensure that the correct key is set in the state 
backend and the context object.
    +    */
    +   @SuppressWarnings("unchecked")
    +   private void fireOrContinue(TriggerResult triggerResult,
    +                                                           W window,
    +                                                           
AppendingState<IN, ACC> windowState) throws Exception {
    +           if (!triggerResult.isFire()) {
    +                   return;
    +           }
    +
    +           
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    +           ACC contents = windowState.get();
    +           userFunction.apply(context.key, context.window, contents, 
timestampedCollector);
    +   }
    +
    +   /**
    +    * Retrieves the {@link MergingWindowSet} for the currently active key.
    +    * The caller must ensure that the correct key is set in the state 
backend.
    +    */
    +   @SuppressWarnings("unchecked")
    +   protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
    +           MergingWindowSet<W> mergingWindows = 
mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
    +           if (mergingWindows == null) {
    +                   // try to retrieve from state
     
    -           // Also check any watermark timers. We might have some in here 
since
    -           // Context.registerEventTimeTimer sets a trigger if an 
event-time trigger is registered
    -           // that is already behind the watermark.
    -           processTriggersFor(new Watermark(currentWatermark));
    +                   TupleSerializer<Tuple2<W, W>> tupleSerializer = new 
TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, 
windowSerializer} );
    +                   ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor 
= new ListStateDescriptor<>("merging-window-set", tupleSerializer);
    +                   ListState<Tuple2<W, W>> mergeState = 
getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, 
mergeStateDescriptor);
    +
    +                   mergingWindows = new 
MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, 
mergeState);
    +                   mergeState.clear();
    +
    +                   mergingWindowsByKey.put((K) 
getStateBackend().getCurrentKey(), mergingWindows);
    +           }
    +           return mergingWindows;
    +   }
    +
    +   /**
    +    * This method decides if a window is currently active, or not, based 
on the current
    --- End diff --
    
    I would prefer this to start with "Decides if a window is late or not, 
based on ...". It is not clear what `active` means here and `late` describes 
pretty well what the method does.


> Add Support for "Allowed Lateness"
> ----------------------------------
>
>                 Key: FLINK-3714
>                 URL: https://issues.apache.org/jira/browse/FLINK-3714
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>
> As mentioned in 
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#
>  we should add support for an allowed lateness setting.
> This includes several things:
>  - API for setting allowed lateness
>  - Dropping of late elements 
>  - Garbage collection of window state/timers
> Depending on whether the {{WindowAssigner}} assigns windows based on event 
> time or processing time we have to adjust the GC behavior. For event-time 
> windows "allowed lateness" makes sense and we should garbage collect after 
> this expires. For processing-time windows "allowed lateness" does not make 
> sense and we should always GC window state/timers at the end timestamp of a 
> processing-time window. I think that we need a method for this on 
> {{WindowAssigner}} that allows us to differentiate between event-time windows 
> and processing-time windows: {{boolean WindowAssigner.isEventTime()}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to