[
https://issues.apache.org/jira/browse/FLINK-18053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121485#comment-17121485
]
Jiayi Liao commented on FLINK-18053:
------------------------------------
Yes. Flink doesn't save watermark in savepoint. Duplicate of FLINK-5601.
> Savepoints do not preserve watermarks
> -------------------------------------
>
> Key: FLINK-18053
> URL: https://issues.apache.org/jira/browse/FLINK-18053
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Table SQL / Runtime
> Affects Versions: 1.10.1
> Reporter: Sergii Mikhtoniuk
> Priority: Major
> Attachments: 1.csv, 2.csv, MyApp.scala
>
>
> Flink produces invalid result when streaming SQL aggregation is stopped and
> resumed from a savepoint.
>
> *Steps to reproduce:*
> 1) Create an assembly from the attached file.
> This job will be reading CSV files as a stream. Files contain fake stock
> tickers which will be aggregated with following tumbling window query:
> {code:java}
> SELECT
> TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time,
> symbol as symbol,
> min(price) as `min`,
> max(price) as `max`
> FROM Tickers
> GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol
> {code}
> Stream uses punctuated watermarks with max lateness of 1 day
> 2) Create two CSV files with fake stock tickers:
> {{1.csv}}:
> {code:java}
> 2000-01-01 01:00:00.0,A,10
> 2000-01-01 01:00:00.0,B,20
> 2000-01-01 02:00:00.0,A,10
> 2000-01-01 02:00:00.0,B,21
> 2000-01-02 01:00:00.0,A,12
> 2000-01-02 01:00:00.0,B,22
> 2000-01-02 02:00:00.0,A,13
> 2000-01-02 02:00:00.0,B,23
> 2000-01-01 03:00:00.0,A,11 // Late arrival - still above watermark
> 2000-01-03 01:00:00.0,A,14
> 2000-01-03 01:00:00.0,B,24
> 2000-01-03 02:00:00.0,A,15
> 2000-01-03 02:00:00.0,B,25
> {code}
> {{2.csv}}:
> {code:java}
> 2000-01-01 04:00:00.0,A,12 // Late arrival - under watermark
> 2000-01-04 01:00:00.0,A,16 // Next values won't be visible in the result,
> they only push watermark up
> 2000-01-04 01:00:00.0,B,26
> 2000-01-04 02:00:00.0,A,17
> 2000-01-04 02:00:00.0,B,27
> 2000-01-05 01:00:00.0,A,18
> 2000-01-05 01:00:00.0,B,28
> {code}
> 3) Run the job on the folder containing both files. Observed result is as
> expected:
> {code:java}
> 2000-01-01,A,10,11
> 2000-01-01,B,20,21
> 2000-01-02,A,12,13
> 2000-01-02,B,22,23
> 2000-01-03,A,14,15
> 2000-01-03,B,24,25
> {code}
> 4) Now run the job with only {{1.csv}} in the directory. Produces still
> correct:
> {code:java}
> 2000-01-01,A,10,11
> 2000-01-01,B,20,21
> {code}
> 5) Cancel job with savepoint, move {{2.csv}} into the directory. Restart job
> from savepoint. Produces incorrect result:
> {code:java}
> 2000-01-01,A,12,12
> 2000-01-02,A,12,13
> 2000-01-02,B,22,23
> 2000-01-03,A,14,15
> 2000-01-03,B,24,25
> {code}
>
> *Expectation:*
> We were not supposed to see {{2000-01-01,A,12,12}} record, as it should not
> have passed the watermark check. This tells me that Flink did not save the
> watermark in the savepoint.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)