Piotr Nowojski created FLINK-12872:
--------------------------------------

             Summary: WindowOperator may fail with UnsupportedOperationException when merging windows
                 Key: FLINK-12872
                 URL: https://issues.apache.org/jira/browse/FLINK-12872
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.8.0, 1.7.2, 1.6.4
            Reporter: Piotr Nowojski


[Reported|http://mail-archives.apache.org/mod_mbox/flink-user/201906.mbox/%3CCALDWsfhbP6D9+pnTzYuGaP0V4nReKJ4s9VsG_Xe1hZJq4O=z...@mail.gmail.com%3E] by a user.

{noformat}
I have a job that uses processing time session window with inactivity gap of 
60ms where I intermittently run into the following exception. I'm trying to 
figure out what happened here. Haven't been able to reproduce this scenario. 
Any thoughts?

java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1560493731808 window: TimeWindow{start=1560493731654, end=1560493731778}
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
        at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

This probably happens because {{System.currentTimeMillis()}} is not monotonic 
and {{WindowOperator}} reads it at least twice: once when it creates a window 
and again when it performs the check that failed above. I would guess there 
are more places like this, and not only in {{WindowOperator}}.
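
The suspected pattern can be sketched roughly as follows. This is a simplified, hypothetical stand-in and not Flink's actual code: the class {{TwoReadsSketch}}, the method {{assignAndCheck}} and the constant {{GAP_MS}} are made up for illustration. The clock is passed in as a {{LongSupplier}} so that two disagreeing reads can be scripted.

```java
import java.util.function.LongSupplier;

/** Hypothetical, simplified stand-in for the two clock reads; not Flink's actual code. */
final class TwoReadsSketch {
    // Assumed inactivity gap, taken from the 60ms mentioned in the report.
    static final long GAP_MS = 60L;

    /** Returns the window end, or throws if the second clock read is already at or past it. */
    static long assignAndCheck(LongSupplier clock) {
        long assignedAt = clock.getAsLong();   // first read: window creation
        long windowEnd = assignedAt + GAP_MS;
        long checkedAt = clock.getAsLong();    // second read: validity check
        if (windowEnd <= checkedAt) {
            throw new UnsupportedOperationException(
                "window end " + windowEnd + " is not after current time " + checkedAt);
        }
        return windowEnd;
    }
}
```

In this sketch, the check throws whenever the two reads differ by at least {{GAP_MS}}, for example after a clock adjustment. When both reads are close together, it passes.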

Possible fixes: make sure that processing time is monotonic, read it only 
once per operator per record, or drop processing time in favour of ingestion 
time.
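
As a minimal sketch of the first option, assuming a wrapper around whatever clock the operator uses: the class {{MonotonicClock}} below is hypothetical and not an existing Flink class. It remembers the largest value returned so far and never hands out anything smaller.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical helper, not part of Flink: a clock view that never goes backwards. */
final class MonotonicClock {
    private final LongSupplier source;
    private final AtomicLong lastSeen = new AtomicLong(Long.MIN_VALUE);

    MonotonicClock(LongSupplier source) {
        this.source = source;
    }

    long currentTimeMillis() {
        long now = source.getAsLong();
        // Atomically store max(previous, now) and return it, so a backwards
        // step in the underlying clock is masked until real time catches up.
        return lastSeen.accumulateAndGet(now, Math::max);
    }
}
```

It could be constructed as {{new MonotonicClock(System::currentTimeMillis)}}. Note that this only masks backward steps. It does not make two separate reads return the same value, which is what the second option (reading the time once per record) would address.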



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
