Sergii Mikhtoniuk created FLINK-18053:
-----------------------------------------

             Summary: Savepoints do not preserve watermarks
                 Key: FLINK-18053
                 URL: https://issues.apache.org/jira/browse/FLINK-18053
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing, Table SQL / Runtime
    Affects Versions: 1.10.1
            Reporter: Sergii Mikhtoniuk
         Attachments: 1.csv, 2.csv, MyApp.scala

Flink produces invalid result when streaming SQL aggregation is stopped and 
resumed from a savepoint.

 

*Steps to reproduce:*

1) Create an assembly from the attached file.

This job will be reading CSV files as a stream. Files contain fake stock 
tickers which will be aggregated with following tumbling window query:
{code:java}
SELECT
  TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time,
  symbol as symbol,
  min(price) as `min`,
  max(price) as `max`
FROM Tickers
GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol
{code}
Stream uses punctuated watermarks with max lateness of 1 day

2) Create two CSV files with fake stock tickers:

{{1.csv}}:
{code:java}
2000-01-01 01:00:00.0,A,10
2000-01-01 01:00:00.0,B,20
2000-01-01 02:00:00.0,A,10
2000-01-01 02:00:00.0,B,21
2000-01-02 01:00:00.0,A,12
2000-01-02 01:00:00.0,B,22
2000-01-02 02:00:00.0,A,13
2000-01-02 02:00:00.0,B,23
2000-01-01 03:00:00.0,A,11 // Late arrival - still above watermark
2000-01-03 01:00:00.0,A,14
2000-01-03 01:00:00.0,B,24
2000-01-03 02:00:00.0,A,15
2000-01-03 02:00:00.0,B,25
{code}
{{2.csv}}:
{code:java}
2000-01-01 04:00:00.0,A,12 // Late arrival - under watermark
2000-01-04 01:00:00.0,A,16 // Next values won't be visible in the result, they 
only push watermark up
2000-01-04 01:00:00.0,B,26
2000-01-04 02:00:00.0,A,17
2000-01-04 02:00:00.0,B,27
2000-01-05 01:00:00.0,A,18
2000-01-05 01:00:00.0,B,28
{code}
3) Run the job on the folder containing both files. Observed result is as 
expected:
{code:java}
2000-01-01,A,10,11
2000-01-01,B,20,21
2000-01-02,A,12,13
2000-01-02,B,22,23
2000-01-03,A,14,15
2000-01-03,B,24,25
{code}
4) Now run the job with only {{1.csv}} in the directory. Produces still correct:
{code:java}
2000-01-01,A,10,11
2000-01-01,B,20,21
{code}
5) Cancel job with savepoint, move {{2.csv}} into the directory. Restart job 
from savepoint. Produces incorrect result:
{code:java}
2000-01-01,A,12,12
2000-01-02,A,12,13
2000-01-02,B,22,23
2000-01-03,A,14,15
2000-01-03,B,24,25
{code}
 

*Expectation:*
 We were not supposed to see {{20[^MyApp.scala][^1.csv]00-01-01,A,12,12}} 
record, as it should not have passed the watermark check. This tells me that 
Flink did not save the watermark in the savepoint.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to