[
https://issues.apache.org/jira/browse/FLINK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15327334#comment-15327334
]
ASF GitHub Bot commented on FLINK-3714:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/2093#discussion_r66789406
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
---
@@ -121,6 +124,23 @@ public WindowedStream(KeyedStream<T, K> input,
}
/**
+ * Sets the allowed lateness. If the {@link WindowAssigner} used
+ * is in processing time, then the allowed lateness is set to 0.
+ */
+ @PublicEvolving
+ public WindowedStream<T, K, W> setAllowedLateness(Time lateness) {
+ long millis = lateness.toMilliseconds();
+ if (allowedLateness < 0) {
+ throw new IllegalArgumentException("The allowed
lateness cannot be negative.");
+ } else if (allowedLateness != 0 &&
!windowAssigner.isEventTime()) {
+ this.allowedLateness = 0;
--- End diff --
LOG is more natural. There is no reason to stop the job for the user (as it
is not an error). We should just tell him that
it does not make that much sense, so that he knows how to interpret the
results he gets. Throwing an exception is somehow more drastic.
> Add Support for "Allowed Lateness"
> ----------------------------------
>
> Key: FLINK-3714
> URL: https://issues.apache.org/jira/browse/FLINK-3714
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Aljoscha Krettek
> Assignee: Kostas Kloudas
>
> As mentioned in
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#
> we should add support for an allowed lateness setting.
> This includes several things:
> - API for setting allowed lateness
> - Dropping of late elements
> - Garbage collection of windows state/timers
> Depending on whether the {{WindowAssigner}} assigns windows based on event
> time or processing time we have to adjust the GC behavior. For event-time
> windows "allowed lateness" makes sense and we should garbage collect after
> this expires. For processing-time windows "allowed lateness" does not make
> sense and we should always GC window state/timers at the end timestamp of a
> processing-time window. I think that we need a method for this on
> {{WindowAssigner}} that allows to differentiate between event-time windows
> and processing-time windows: {{boolean WindowAssigner.isEventTime()}}.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)