[
https://issues.apache.org/jira/browse/FLINK-18053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sergii Mikhtoniuk updated FLINK-18053:
--------------------------------------
Description:
Flink produces invalid result when streaming SQL aggregation is stopped and
resumed from a savepoint.
*Steps to reproduce:*
1) Create an assembly from the attached file.
This job will be reading CSV files as a stream. Files contain fake stock
tickers which will be aggregated with following tumbling window query:
{code:java}
SELECT
TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time,
symbol as symbol,
min(price) as `min`,
max(price) as `max`
FROM Tickers
GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol
{code}
Stream uses punctuated watermarks with max lateness of 1 day
2) Create two CSV files with fake stock tickers:
{{1.csv}}:
{code:java}
2000-01-01 01:00:00.0,A,10
2000-01-01 01:00:00.0,B,20
2000-01-01 02:00:00.0,A,10
2000-01-01 02:00:00.0,B,21
2000-01-02 01:00:00.0,A,12
2000-01-02 01:00:00.0,B,22
2000-01-02 02:00:00.0,A,13
2000-01-02 02:00:00.0,B,23
2000-01-01 03:00:00.0,A,11 // Late arrival - still above watermark
2000-01-03 01:00:00.0,A,14
2000-01-03 01:00:00.0,B,24
2000-01-03 02:00:00.0,A,15
2000-01-03 02:00:00.0,B,25
{code}
{{2.csv}}:
{code:java}
2000-01-01 04:00:00.0,A,12 // Late arrival - under watermark
2000-01-04 01:00:00.0,A,16 // Next values won't be visible in the result, they
only push watermark up
2000-01-04 01:00:00.0,B,26
2000-01-04 02:00:00.0,A,17
2000-01-04 02:00:00.0,B,27
2000-01-05 01:00:00.0,A,18
2000-01-05 01:00:00.0,B,28
{code}
3) Run the job on the folder containing both files. Observed result is as
expected:
{code:java}
2000-01-01,A,10,11
2000-01-01,B,20,21
2000-01-02,A,12,13
2000-01-02,B,22,23
2000-01-03,A,14,15
2000-01-03,B,24,25
{code}
4) Now run the job with only {{1.csv}} in the directory. Produces still correct:
{code:java}
2000-01-01,A,10,11
2000-01-01,B,20,21
{code}
5) Cancel job with savepoint, move {{2.csv}} into the directory. Restart job
from savepoint. Produces incorrect result:
{code:java}
2000-01-01,A,12,12
2000-01-02,A,12,13
2000-01-02,B,22,23
2000-01-03,A,14,15
2000-01-03,B,24,25
{code}
*Expectation:*
We were not supposed to see {{2000-01-01,A,12,12}} record, as it should not
have passed the watermark check. This tells me that Flink did not save the
watermark in the savepoint.
was:
Flink produces invalid result when streaming SQL aggregation is stopped and
resumed from a savepoint.
*Steps to reproduce:*
1) Create an assembly from the attached file.
This job will be reading CSV files as a stream. Files contain fake stock
tickers which will be aggregated with following tumbling window query:
{code:java}
SELECT
TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time,
symbol as symbol,
min(price) as `min`,
max(price) as `max`
FROM Tickers
GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol
{code}
Stream uses punctuated watermarks with max lateness of 1 day
2) Create two CSV files with fake stock tickers:
{{1.csv}}:
{code:java}
2000-01-01 01:00:00.0,A,10
2000-01-01 01:00:00.0,B,20
2000-01-01 02:00:00.0,A,10
2000-01-01 02:00:00.0,B,21
2000-01-02 01:00:00.0,A,12
2000-01-02 01:00:00.0,B,22
2000-01-02 02:00:00.0,A,13
2000-01-02 02:00:00.0,B,23
2000-01-01 03:00:00.0,A,11 // Late arrival - still above watermark
2000-01-03 01:00:00.0,A,14
2000-01-03 01:00:00.0,B,24
2000-01-03 02:00:00.0,A,15
2000-01-03 02:00:00.0,B,25
{code}
{{2.csv}}:
{code:java}
2000-01-01 04:00:00.0,A,12 // Late arrival - under watermark
2000-01-04 01:00:00.0,A,16 // Next values won't be visible in the result, they
only push watermark up
2000-01-04 01:00:00.0,B,26
2000-01-04 02:00:00.0,A,17
2000-01-04 02:00:00.0,B,27
2000-01-05 01:00:00.0,A,18
2000-01-05 01:00:00.0,B,28
{code}
3) Run the job on the folder containing both files. Observed result is as
expected:
{code:java}
2000-01-01,A,10,11
2000-01-01,B,20,21
2000-01-02,A,12,13
2000-01-02,B,22,23
2000-01-03,A,14,15
2000-01-03,B,24,25
{code}
4) Now run the job with only {{1.csv}} in the directory. Produces still correct:
{code:java}
2000-01-01,A,10,11
2000-01-01,B,20,21
{code}
5) Cancel job with savepoint, move {{2.csv}} into the directory. Restart job
from savepoint. Produces incorrect result:
{code:java}
2000-01-01,A,12,12
2000-01-02,A,12,13
2000-01-02,B,22,23
2000-01-03,A,14,15
2000-01-03,B,24,25
{code}
*Expectation:*
We were not supposed to see {{20[^MyApp.scala][^1.csv]00-01-01,A,12,12}}
record, as it should not have passed the watermark check. This tells me that
Flink did not save the watermark in the savepoint.
> Savepoints do not preserve watermarks
> -------------------------------------
>
> Key: FLINK-18053
> URL: https://issues.apache.org/jira/browse/FLINK-18053
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Table SQL / Runtime
> Affects Versions: 1.10.1
> Reporter: Sergii Mikhtoniuk
> Priority: Major
> Attachments: 1.csv, 2.csv, MyApp.scala
>
>
> Flink produces invalid result when streaming SQL aggregation is stopped and
> resumed from a savepoint.
>
> *Steps to reproduce:*
> 1) Create an assembly from the attached file.
> This job will be reading CSV files as a stream. Files contain fake stock
> tickers which will be aggregated with following tumbling window query:
> {code:java}
> SELECT
> TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time,
> symbol as symbol,
> min(price) as `min`,
> max(price) as `max`
> FROM Tickers
> GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol
> {code}
> Stream uses punctuated watermarks with max lateness of 1 day
> 2) Create two CSV files with fake stock tickers:
> {{1.csv}}:
> {code:java}
> 2000-01-01 01:00:00.0,A,10
> 2000-01-01 01:00:00.0,B,20
> 2000-01-01 02:00:00.0,A,10
> 2000-01-01 02:00:00.0,B,21
> 2000-01-02 01:00:00.0,A,12
> 2000-01-02 01:00:00.0,B,22
> 2000-01-02 02:00:00.0,A,13
> 2000-01-02 02:00:00.0,B,23
> 2000-01-01 03:00:00.0,A,11 // Late arrival - still above watermark
> 2000-01-03 01:00:00.0,A,14
> 2000-01-03 01:00:00.0,B,24
> 2000-01-03 02:00:00.0,A,15
> 2000-01-03 02:00:00.0,B,25
> {code}
> {{2.csv}}:
> {code:java}
> 2000-01-01 04:00:00.0,A,12 // Late arrival - under watermark
> 2000-01-04 01:00:00.0,A,16 // Next values won't be visible in the result,
> they only push watermark up
> 2000-01-04 01:00:00.0,B,26
> 2000-01-04 02:00:00.0,A,17
> 2000-01-04 02:00:00.0,B,27
> 2000-01-05 01:00:00.0,A,18
> 2000-01-05 01:00:00.0,B,28
> {code}
> 3) Run the job on the folder containing both files. Observed result is as
> expected:
> {code:java}
> 2000-01-01,A,10,11
> 2000-01-01,B,20,21
> 2000-01-02,A,12,13
> 2000-01-02,B,22,23
> 2000-01-03,A,14,15
> 2000-01-03,B,24,25
> {code}
> 4) Now run the job with only {{1.csv}} in the directory. Produces still
> correct:
> {code:java}
> 2000-01-01,A,10,11
> 2000-01-01,B,20,21
> {code}
> 5) Cancel job with savepoint, move {{2.csv}} into the directory. Restart job
> from savepoint. Produces incorrect result:
> {code:java}
> 2000-01-01,A,12,12
> 2000-01-02,A,12,13
> 2000-01-02,B,22,23
> 2000-01-03,A,14,15
> 2000-01-03,B,24,25
> {code}
>
> *Expectation:*
> We were not supposed to see {{2000-01-01,A,12,12}} record, as it should not
> have passed the watermark check. This tells me that Flink did not save the
> watermark in the savepoint.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)