[
https://issues.apache.org/jira/browse/FLINK-6205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947603#comment-15947603
]
ASF GitHub Bot commented on FLINK-6205:
---------------------------------------
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/3644#discussion_r108741567
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
@@ -196,19 +213,45 @@ public void processElement(StreamRecord<IN> element) throws Exception {
                 updateNFA(nfa);
             } else {
-                getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
-                PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+                // In event-time processing we assume correctness of the watermark.
+                // Events with timestamp smaller than the last seen watermark are considered late.
+                // Late events are put in a dedicated side output, if the user has specified one.
+
+                if (element.getTimestamp() >= lastWatermark) {
-                // event time processing
-                // we have to buffer the elements until we receive the proper watermark
-                if (getExecutionConfig().isObjectReuseEnabled()) {
-                    // copy the StreamRecord so that it cannot be changed
-                    priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+                    // we have an event with a valid timestamp, so
+                    // we buffer it until we receive the proper watermark.
+
+                    getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
+
+                    PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+                    if (getExecutionConfig().isObjectReuseEnabled()) {
+                        // copy the StreamRecord so that it cannot be changed
+                        priorityQueue.offer(new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+                    } else {
+                        priorityQueue.offer(element);
+                    }
+                    updatePriorityQueue(priorityQueue);
                 } else {
-                    priorityQueue.offer(element);
+                    sideOutputLateElement(element);
                 }
-                updatePriorityQueue(priorityQueue);
+            }
+        }
+
+    private void updateLastSeenWatermark(Watermark watermark) {
+        this.lastWatermark = watermark.getTimestamp();
--- End diff --
Shouldn't lastWatermark be stored in a StateBackend? How will the operator
behave after restoring from a checkpoint?
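
To make the concern concrete, here is a minimal plain-Java sketch. It is not
Flink code: WatermarkTrackingOperator, snapshotState and restoreState are
hypothetical names chosen for illustration, and the sketch assumes that a
field which is not part of the checkpoint comes back with its initial value
after a restore.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the operator; none of these names are Flink API. */
final class WatermarkTrackingOperator {
    // Plain field, as in the diff. Assumption: it is reset to its initial
    // value after a failure unless it is explicitly snapshotted and restored.
    private long lastWatermark = Long.MIN_VALUE;

    final List<Long> buffered = new ArrayList<>(); // timestamps accepted for processing
    final List<Long> late = new ArrayList<>();     // timestamps routed to the side output

    void processWatermark(long timestamp) {
        lastWatermark = timestamp;
    }

    void processElement(long timestamp) {
        // Same comparison as in the diff: older than the last watermark means late.
        if (timestamp >= lastWatermark) {
            buffered.add(timestamp);
        } else {
            late.add(timestamp);
        }
    }

    // Hypothetical checkpoint hooks: persist and re-install the watermark.
    long snapshotState() {
        return lastWatermark;
    }

    void restoreState(long restoredWatermark) {
        lastWatermark = restoredWatermark;
    }
}
```

In this model, an instance created after a failure without calling
restoreState accepts an element with timestamp 50 even though watermark 100
was seen before the failure. An instance that restores the snapshotted value
classifies the same element as late.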
> Put late elements in side output.
> ---------------------------------
>
> Key: FLINK-6205
> URL: https://issues.apache.org/jira/browse/FLINK-6205
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.3.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> Currently the CEP library has a somewhat fuzzy way of handling late events.
> Essentially:
> 1) it accepts all events (late and early)
> 2) it sorts them based on event time
> 3) whenever a watermark arrives, it feeds them into the NFA.
> This does not respect event time, as late events are still processed.
> In addition, given that the order in which elements are processed matters,
> this could lead to wrong results, as events may be processed by the NFA
> out of order with respect to their timestamps.
> This issue proposes to assume that the watermark is correct and to treat as
> late any event that arrives with a timestamp smaller than that of the last
> seen watermark. In addition, late events are not silently dropped; instead,
> the user can choose to send them to a side output, as is done in the
> {{WindowOperator}}.
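
The handling proposed in the issue description can be sketched in plain Java
roughly as follows. This is a simplified model, not the operator's actual
implementation: LateElementRouter, Event, onElement and onWatermark are
hypothetical names, a list stands in for the NFA, and the sketch assumes that
buffered events with a timestamp up to and including the watermark are
released when the watermark arrives.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical, simplified model of the proposal; none of these names are Flink API. */
final class LateElementRouter {

    /** Minimal event: a payload plus its event-time timestamp. */
    static final class Event {
        final String value;
        final long timestamp;

        Event(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private long lastWatermark = Long.MIN_VALUE;

    // Buffer ordered by event time, so events are released in timestamp order.
    private final PriorityQueue<Event> buffer =
        new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    final List<Event> processed = new ArrayList<>();      // stands in for the NFA input
    final List<Event> lateSideOutput = new ArrayList<>(); // stands in for the side output

    void onElement(Event event) {
        if (event.timestamp >= lastWatermark) {
            buffer.offer(event);       // on time: wait for the next watermark
        } else {
            lateSideOutput.add(event); // late: divert instead of processing
        }
    }

    void onWatermark(long watermark) {
        // Assumption: release everything with timestamp <= watermark, oldest first.
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            processed.add(buffer.poll());
        }
        lastWatermark = watermark;
    }
}
```

Feeding events with timestamps 20 and 10, then watermark 15, releases only the
event at 10. An event at 5 arriving afterwards goes to the side output, and a
later watermark of 30 releases the event at 20.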