[ https://issues.apache.org/jira/browse/FLINK-6205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15948653#comment-15948653 ]

ASF GitHub Bot commented on FLINK-6205:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3644#discussion_r108866698
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
    @@ -196,19 +213,45 @@ public void processElement(StreamRecord<IN> element) throws Exception {
                        updateNFA(nfa);
     
                } else {
    -                   getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
     
    -                   PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
    +                   // In event-time processing we assume correctness of the watermark.
    +                   // Events with timestamp smaller than the last seen watermark are considered late.
    +                   // Late events are put in a dedicated side output, if the user has specified one.
    +
    +                   if (element.getTimestamp() >= lastWatermark) {
     
    -                   // event time processing
    -                   // we have to buffer the elements until we receive the proper watermark
    -                   if (getExecutionConfig().isObjectReuseEnabled()) {
    -                           // copy the StreamRecord so that it cannot be changed
    -                           priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
    +                           // we have an event with a valid timestamp, so
    +                           // we buffer it until we receive the proper watermark.
    +
    +                           getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
    +
    +                           PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
    +                           if (getExecutionConfig().isObjectReuseEnabled()) {
    +                                   // copy the StreamRecord so that it cannot be changed
    +                                   priorityQueue.offer(new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
    +                           } else {
    +                                   priorityQueue.offer(element);
    +                           }
    +                           updatePriorityQueue(priorityQueue);
                        } else {
    -                           priorityQueue.offer(element);
    +                           sideOutputLateElement(element);
                        }
    -                   updatePriorityQueue(priorityQueue);
    +           }
    +   }
    +
    +   private void updateLastSeenWatermark(Watermark watermark) {
    +           this.lastWatermark = watermark.getTimestamp();
    --- End diff --
    
    Anyway, it would be nice to see a test case such as:
    watermark(10), restoreFromCheckpoint, event(7)
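
The scenario in that comment can be sketched without Flink at all. The class below is a hypothetical stand-in (`LatenessTracker`, `snapshot` and `restore` are illustrative names, not part of the operator or of Flink's test harness). It only shows why the sequence matters: if the last seen watermark is part of the restored state, event(7) is late; if it is lost on restore, the same event is buffered as if it were on time.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the operator's lateness check; not Flink code. */
class LatenessTracker {
    // Mirrors the lastWatermark field in the diff; Long.MIN_VALUE means "no watermark seen yet".
    private long lastWatermark = Long.MIN_VALUE;
    final List<Long> buffered = new ArrayList<>();
    final List<Long> late = new ArrayList<>();

    void processWatermark(long timestamp) {
        lastWatermark = timestamp;
    }

    void processElement(long timestamp) {
        // Same comparison as in the diff: timestamps below the watermark are late.
        if (timestamp >= lastWatermark) {
            buffered.add(timestamp);
        } else {
            late.add(timestamp);
        }
    }

    /** Assumption of this sketch: the watermark is included in the snapshot. */
    long snapshot() {
        return lastWatermark;
    }

    static LatenessTracker restore(long snapshot) {
        LatenessTracker tracker = new LatenessTracker();
        tracker.lastWatermark = snapshot;
        return tracker;
    }

    public static void main(String[] args) {
        // watermark(10), restoreFromCheckpoint, event(7)
        LatenessTracker before = new LatenessTracker();
        before.processWatermark(10);
        LatenessTracker restored = LatenessTracker.restore(before.snapshot());
        restored.processElement(7);
        System.out.println("late after restore: " + restored.late);

        // Same sequence, but the watermark did not survive the restore.
        LatenessTracker fresh = new LatenessTracker();
        fresh.processElement(7);
        System.out.println("buffered when watermark is lost: " + fresh.buffered);
    }
}
```

A real test would drive the operator through a checkpoint and restore rather than this toy class; the sketch only pins down the expected outcome of each variant.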


> Put late elements in side output.
> ---------------------------------
>
>                 Key: FLINK-6205
>                 URL: https://issues.apache.org/jira/browse/FLINK-6205
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.3.0
>
>
> Currently the CEP library has a somewhat fuzzy way of handling late events. 
> Essentially:
> 1) it accepts all events (late and early)
> 2) it sorts them based on event time
> 3) whenever a watermark arrives, it feeds them into the NFA.
> This does not respect event time, as late events are still processed.
> In addition, given that the order in which elements are processed matters, 
> this could lead to wrong results as events may be processed by the NFA 
> out-of-order with respect to their timestamps.
> This issue proposes to assume correctness of the watermark and to consider 
> as late any event that arrives with a timestamp smaller than that of the 
> last seen watermark. In addition, late events are not silently dropped; 
> the user can specify that they be sent to a side output, as is done in 
> the {{WindowOperator}}.
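
The proposed behaviour can be illustrated with a small, Flink-free sketch. Everything here is an assumption made for illustration: `EventTimeBuffer` and its methods are hypothetical names, plain `long` timestamps stand in for `StreamRecord`s, the `processed` list stands in for feeding the NFA, and the rule that a watermark releases all buffered events with a timestamp less than or equal to it is assumed rather than taken from the issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical simplification of the proposed event-time handling; not Flink code. */
class EventTimeBuffer {
    private long lastWatermark = Long.MIN_VALUE;
    // Buffered on-time events, ordered by timestamp.
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    final List<Long> processed = new ArrayList<>();      // stands in for the NFA input
    final List<Long> lateSideOutput = new ArrayList<>(); // stands in for the side output

    void onElement(long timestamp) {
        if (timestamp >= lastWatermark) {
            queue.offer(timestamp);        // on time: buffer until a watermark arrives
        } else {
            lateSideOutput.add(timestamp); // late: route to the side output, not the NFA
        }
    }

    void onWatermark(long watermark) {
        // Assumed release rule: drain, in timestamp order, everything up to the watermark.
        while (!queue.isEmpty() && queue.peek() <= watermark) {
            processed.add(queue.poll());
        }
        lastWatermark = watermark;
    }

    public static void main(String[] args) {
        EventTimeBuffer buffer = new EventTimeBuffer();
        buffer.onElement(5);
        buffer.onElement(3);
        buffer.onWatermark(4);  // releases 3, keeps 5 buffered
        buffer.onElement(2);    // 2 < 4, so it is late
        buffer.onElement(8);
        buffer.onWatermark(10); // releases 5 and 8 in order
        System.out.println("processed: " + buffer.processed);
        System.out.println("late: " + buffer.lateSideOutput);
    }
}
```

With this rule the late event never reaches the stand-in for the NFA, so the processed sequence stays ordered by timestamp.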



