Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2093#discussion_r67156546
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
---
@@ -450,17 +436,140 @@ public final void trigger(long time) throws
Exception {
context.key = timer.key;
context.window = timer.window;
setKeyContext(timer.key);
+
+ AppendingState<IN, ACC> windowState;
+ MergingWindowSet<W> mergingWindows = null;
+
+ if (windowAssigner instanceof
MergingWindowAssigner) {
+ mergingWindows = getMergingWindowSet();
+ W stateWindow =
mergingWindows.getStateWindow(context.window);
+ windowState =
getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ } else {
+ windowState =
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+ }
+
TriggerResult triggerResult =
context.onProcessingTime(timer.timestamp);
- processTriggerResult(triggerResult,
context.window);
+ fireOrContinue(triggerResult, context.window,
windowState);
+
+ if (triggerResult.isPurge() ||
(!windowAssigner.isEventTime() && isCleanupTime(timer.window,
timer.timestamp))) {
+ cleanup(timer.window, windowState,
mergingWindows);
+ }
+
} else {
fire = false;
}
} while (fire);
+ }
+
+ /**
+ * Cleans up the window state if the provided {@link TriggerResult}
requires so, or if it
+ * is time to do so (see {@link #isCleanupTime(Window, long)}). The
caller must ensure that the
+ * correct key is set in the state backend and the context object.
+ */
+ private void cleanup(W window,
+ AppendingState<IN, ACC>
windowState,
+ MergingWindowSet<W>
mergingWindows) throws Exception {
+ windowState.clear();
+ if (mergingWindows != null) {
+ mergingWindows.retireWindow(window);
+ }
+ context.clear();
+ deleteCleanupTimer(window);
+ }
+
+ /**
+ * Triggers the window computation if the provided {@link
TriggerResult} requires so.
+ * The caller must ensure that the correct key is set in the state
backend and the context object.
+ */
+ @SuppressWarnings("unchecked")
+ private void fireOrContinue(TriggerResult triggerResult,
+ W window,
+
AppendingState<IN, ACC> windowState) throws Exception {
+ if (!triggerResult.isFire()) {
+ return;
+ }
+
+
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
+ ACC contents = windowState.get();
+ userFunction.apply(context.key, context.window, contents,
timestampedCollector);
+ }
+
+ /**
+ * Retrieves the {@link MergingWindowSet} for the currently active key.
+ * The caller must ensure that the correct key is set in the state
backend.
+ */
+ @SuppressWarnings("unchecked")
+ protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
+ MergingWindowSet<W> mergingWindows =
mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
+ if (mergingWindows == null) {
+ // try to retrieve from state
- // Also check any watermark timers. We might have some in here
since
- // Context.registerEventTimeTimer sets a trigger if an
event-time trigger is registered
- // that is already behind the watermark.
- processTriggersFor(new Watermark(currentWatermark));
+ TupleSerializer<Tuple2<W, W>> tupleSerializer = new
TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer,
windowSerializer} );
+ ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor
= new ListStateDescriptor<>("merging-window-set", tupleSerializer);
+ ListState<Tuple2<W, W>> mergeState =
getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE,
mergeStateDescriptor);
+
+ mergingWindows = new
MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner,
mergeState);
+ mergeState.clear();
+
+ mergingWindowsByKey.put((K)
getStateBackend().getCurrentKey(), mergingWindows);
+ }
+ return mergingWindows;
+ }
+
+ /**
+ * This method decides if a window is currently active, or not, based
on the current
--- End diff --
I would prefer this to start with "Decides if a window is late or not,
based on ...". It is not clear what `active` means here and `late` describes
pretty well what the method does.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---