[
https://issues.apache.org/jira/browse/FLINK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331694#comment-15331694
]
ASF GitHub Bot commented on FLINK-3714:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2093#discussion_r67156546
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
---
@@ -450,17 +436,140 @@ public final void trigger(long time) throws
Exception {
context.key = timer.key;
context.window = timer.window;
setKeyContext(timer.key);
+
+ AppendingState<IN, ACC> windowState;
+ MergingWindowSet<W> mergingWindows = null;
+
+ if (windowAssigner instanceof
MergingWindowAssigner) {
+ mergingWindows = getMergingWindowSet();
+ W stateWindow =
mergingWindows.getStateWindow(context.window);
+ windowState =
getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ } else {
+ windowState =
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+ }
+
TriggerResult triggerResult =
context.onProcessingTime(timer.timestamp);
- processTriggerResult(triggerResult,
context.window);
+ fireOrContinue(triggerResult, context.window,
windowState);
+
+ if (triggerResult.isPurge() ||
(!windowAssigner.isEventTime() && isCleanupTime(timer.window,
timer.timestamp))) {
+ cleanup(timer.window, windowState,
mergingWindows);
+ }
+
} else {
fire = false;
}
} while (fire);
+ }
+
+ /**
+ * Cleans up the window state if the provided {@link TriggerResult}
requires so, or if it
+ * is time to do so (see {@link #isCleanupTime(Window, long)}). The
caller must ensure that the
+ * correct key is set in the state backend and the context object.
+ */
+ private void cleanup(W window,
+ AppendingState<IN, ACC>
windowState,
+ MergingWindowSet<W>
mergingWindows) throws Exception {
+ windowState.clear();
+ if (mergingWindows != null) {
+ mergingWindows.retireWindow(window);
+ }
+ context.clear();
+ deleteCleanupTimer(window);
+ }
+
+ /**
+ * Triggers the window computation if the provided {@link
TriggerResult} requires so.
+ * The caller must ensure that the correct key is set in the state
backend and the context object.
+ */
+ @SuppressWarnings("unchecked")
+ private void fireOrContinue(TriggerResult triggerResult,
+ W window,
+
AppendingState<IN, ACC> windowState) throws Exception {
+ if (!triggerResult.isFire()) {
+ return;
+ }
+
+
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
+ ACC contents = windowState.get();
+ userFunction.apply(context.key, context.window, contents,
timestampedCollector);
+ }
+
+ /**
+ * Retrieves the {@link MergingWindowSet} for the currently active key.
+ * The caller must ensure that the correct key is set in the state
backend.
+ */
+ @SuppressWarnings("unchecked")
+ protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
+ MergingWindowSet<W> mergingWindows =
mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
+ if (mergingWindows == null) {
+ // try to retrieve from state
- // Also check any watermark timers. We might have some in here
since
- // Context.registerEventTimeTimer sets a trigger if an
event-time trigger is registered
- // that is already behind the watermark.
- processTriggersFor(new Watermark(currentWatermark));
+ TupleSerializer<Tuple2<W, W>> tupleSerializer = new
TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer,
windowSerializer} );
+ ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor
= new ListStateDescriptor<>("merging-window-set", tupleSerializer);
+ ListState<Tuple2<W, W>> mergeState =
getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE,
mergeStateDescriptor);
+
+ mergingWindows = new
MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner,
mergeState);
+ mergeState.clear();
+
+ mergingWindowsByKey.put((K)
getStateBackend().getCurrentKey(), mergingWindows);
+ }
+ return mergingWindows;
+ }
+
+ /**
+ * This method decides if a window is currently active, or not, based
on the current
--- End diff --
I would prefer this to start with "Decides if a window is late or not,
based on ...". It is not clear what `active` means here, and `late` describes
pretty well what the method does.
> Add Support for "Allowed Lateness"
> ----------------------------------
>
> Key: FLINK-3714
> URL: https://issues.apache.org/jira/browse/FLINK-3714
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Aljoscha Krettek
> Assignee: Kostas Kloudas
>
> As mentioned in
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#
> we should add support for an allowed lateness setting.
> This includes several things:
> - API for setting allowed lateness
> - Dropping of late elements
> - Garbage collection of window state/timers
> Depending on whether the {{WindowAssigner}} assigns windows based on event
> time or processing time we have to adjust the GC behavior. For event-time
> windows "allowed lateness" makes sense and we should garbage collect after
> this expires. For processing-time windows "allowed lateness" does not make
> sense and we should always GC window state/timers at the end timestamp of a
> processing-time window. I think that we need a method for this on
> {{WindowAssigner}} that allows us to differentiate between event-time windows
> and processing-time windows: {{boolean WindowAssigner.isEventTime()}}.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)