Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r67156546
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
    @@ -450,17 +436,140 @@ public final void trigger(long time) throws 
Exception {
                                context.key = timer.key;
                                context.window = timer.window;
                                setKeyContext(timer.key);
    +
    +                           AppendingState<IN, ACC> windowState;
    +                           MergingWindowSet<W> mergingWindows = null;
    +
    +                           if (windowAssigner instanceof 
MergingWindowAssigner) {
    +                                   mergingWindows = getMergingWindowSet();
    +                                   W stateWindow = 
mergingWindows.getStateWindow(context.window);
    +                                   windowState = 
getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
    +                           } else {
    +                                   windowState = 
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
    +                           }
    +
                                TriggerResult triggerResult = 
context.onProcessingTime(timer.timestamp);
    -                           processTriggerResult(triggerResult, 
context.window);
    +                           fireOrContinue(triggerResult, context.window, 
windowState);
    +
    +                           if (triggerResult.isPurge() || 
(!windowAssigner.isEventTime() && isCleanupTime(timer.window, 
timer.timestamp))) {
    +                                   cleanup(timer.window, windowState, 
mergingWindows);
    +                           }
    +
                        } else {
                                fire = false;
                        }
                } while (fire);
    +   }
    +
    +   /**
    +    * Cleans up the window state if the provided {@link TriggerResult} 
requires so, or if it
    +    * is time to do so (see {@link #isCleanupTime(Window, long)}). The 
caller must ensure that the
    +    * correct key is set in the state backend and the context object.
    +    */
    +   private void cleanup(W window,
    +                                           AppendingState<IN, ACC> 
windowState,
    +                                           MergingWindowSet<W> 
mergingWindows) throws Exception {
    +           windowState.clear();
    +           if (mergingWindows != null) {
    +                   mergingWindows.retireWindow(window);
    +           }
    +           context.clear();
    +           deleteCleanupTimer(window);
    +   }
    +
    +   /**
    +    * Triggers the window computation if the provided {@link 
TriggerResult} requires so.
    +    * The caller must ensure that the correct key is set in the state 
backend and the context object.
    +    */
    +   @SuppressWarnings("unchecked")
    +   private void fireOrContinue(TriggerResult triggerResult,
    +                                                           W window,
    +                                                           
AppendingState<IN, ACC> windowState) throws Exception {
    +           if (!triggerResult.isFire()) {
    +                   return;
    +           }
    +
    +           
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    +           ACC contents = windowState.get();
    +           userFunction.apply(context.key, context.window, contents, 
timestampedCollector);
    +   }
    +
    +   /**
    +    * Retrieves the {@link MergingWindowSet} for the currently active key.
    +    * The caller must ensure that the correct key is set in the state 
backend.
    +    */
    +   @SuppressWarnings("unchecked")
    +   protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
    +           MergingWindowSet<W> mergingWindows = 
mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
    +           if (mergingWindows == null) {
    +                   // try to retrieve from state
     
    -           // Also check any watermark timers. We might have some in here 
since
    -           // Context.registerEventTimeTimer sets a trigger if an 
event-time trigger is registered
    -           // that is already behind the watermark.
    -           processTriggersFor(new Watermark(currentWatermark));
    +                   TupleSerializer<Tuple2<W, W>> tupleSerializer = new 
TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, 
windowSerializer} );
    +                   ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor 
= new ListStateDescriptor<>("merging-window-set", tupleSerializer);
    +                   ListState<Tuple2<W, W>> mergeState = 
getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, 
mergeStateDescriptor);
    +
    +                   mergingWindows = new 
MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, 
mergeState);
    +                   mergeState.clear();
    +
    +                   mergingWindowsByKey.put((K) 
getStateBackend().getCurrentKey(), mergingWindows);
    +           }
    +           return mergingWindows;
    +   }
    +
    +   /**
    +    * This method decides if a window is currently active, or not, based 
on the current
    --- End diff --
    
    I would prefer this to start with "Decides if a window is late or not, 
based on ...". It is not clear what `active` means here and `late` describes 
pretty well what the method does.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to