[ 
https://issues.apache.org/jira/browse/FLINK-18053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergii Mikhtoniuk updated FLINK-18053:
--------------------------------------
    Description: 
Flink produces an invalid result when a streaming SQL aggregation is stopped and 
resumed from a savepoint.

 

*Steps to reproduce:*

1) Create an assembly from the attached file.

This job reads CSV files as a stream. The files contain fake stock tickers, 
which are aggregated with the following tumbling-window query:
{code:java}
SELECT
  TUMBLE_START(event_time, INTERVAL '1' DAY) as event_time,
  symbol as symbol,
  min(price) as `min`,
  max(price) as `max`
FROM Tickers
GROUP BY TUMBLE(event_time, INTERVAL '1' DAY), symbol
{code}
The stream uses punctuated watermarks with a maximum lateness of 1 day.
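For clarity, here is a minimal sketch of the watermark convention this reproduction assumes (hypothetical, not taken from the attached {{MyApp.scala}}; shown in Java for brevity):

```java
// Hypothetical sketch of the punctuated-watermark behavior assumed here:
// the watermark trails the highest event time seen so far by the 1-day
// maximum lateness; records at or below it are considered late.
public class WatermarkSketch {
    static final long MAX_LATENESS_MS = 24L * 60 * 60 * 1000; // 1 day

    static long watermark(long maxSeenEventTimeMs) {
        return maxSeenEventTimeMs - MAX_LATENESS_MS;
    }

    // One plausible convention: a record is late if its event time is at
    // or below the current watermark.
    static boolean isLate(long eventTimeMs, long maxSeenEventTimeMs) {
        return eventTimeMs <= watermark(maxSeenEventTimeMs);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // After 2000-01-03 02:00, a record at 2000-01-01 04:00 is under the
        // watermark (2000-01-02 02:00) and should be dropped.
        System.out.println(isLate(4 * hour, 2 * MAX_LATENESS_MS + 2 * hour)); // true
    }
}
```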

2) Create two CSV files with fake stock tickers:

{{1.csv}}:
{code:java}
2000-01-01 01:00:00.0,A,10
2000-01-01 01:00:00.0,B,20
2000-01-01 02:00:00.0,A,10
2000-01-01 02:00:00.0,B,21
2000-01-02 01:00:00.0,A,12
2000-01-02 01:00:00.0,B,22
2000-01-02 02:00:00.0,A,13
2000-01-02 02:00:00.0,B,23
2000-01-01 03:00:00.0,A,11 // Late arrival - still above watermark
2000-01-03 01:00:00.0,A,14
2000-01-03 01:00:00.0,B,24
2000-01-03 02:00:00.0,A,15
2000-01-03 02:00:00.0,B,25
{code}
{{2.csv}}:
{code:java}
2000-01-01 04:00:00.0,A,12 // Late arrival - under watermark
2000-01-04 01:00:00.0,A,16 // Subsequent values won't appear in the result; they only push the watermark forward
2000-01-04 01:00:00.0,B,26
2000-01-04 02:00:00.0,A,17
2000-01-04 02:00:00.0,B,27
2000-01-05 01:00:00.0,A,18
2000-01-05 01:00:00.0,B,28
{code}
3) Run the job on the directory containing both files. The observed result is 
as expected:
{code:java}
2000-01-01,A,10,11
2000-01-01,B,20,21
2000-01-02,A,12,13
2000-01-02,B,22,23
2000-01-03,A,14,15
2000-01-03,B,24,25
{code}
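As a sanity check, the expected output above can be re-derived outside Flink. The sketch below (a hypothetical helper, not part of the attached job) replays the rows of both files in arrival order, drops rows at or below the running watermark, and aggregates min/max per (day, symbol):

```java
import java.util.*;

// Hypothetical re-derivation of the expected result: replay all rows in
// arrival order, drop rows at or below the running watermark (max event
// time seen minus 1 day), and track per-(day, symbol) min/max prices.
// Event times are encoded as "day:hour" and converted to hours.
public class ExpectedResult {
    static List<String> compute() {
        String[] rows = {
            "1:1,A,10", "1:1,B,20", "1:2,A,10", "1:2,B,21",
            "2:1,A,12", "2:1,B,22", "2:2,A,13", "2:2,B,23",
            "1:3,A,11",                 // late, but still above the watermark
            "3:1,A,14", "3:1,B,24", "3:2,A,15", "3:2,B,25",
            "1:4,A,12",                 // late, under the watermark: dropped
            "4:1,A,16", "4:1,B,26", "4:2,A,17", "4:2,B,27",
            "5:1,A,18", "5:1,B,28",
        };
        long maxSeen = Long.MIN_VALUE;
        Map<String, int[]> agg = new TreeMap<>(); // "date,symbol" -> {min, max}
        for (String row : rows) {
            String[] f = row.split(",");
            String[] dh = f[0].split(":");
            long t = Long.parseLong(dh[0]) * 24 + Long.parseLong(dh[1]);
            maxSeen = Math.max(maxSeen, t);
            if (t <= maxSeen - 24) continue; // at or below watermark: drop
            int p = Integer.parseInt(f[2]);
            agg.merge("2000-01-0" + dh[0] + "," + f[1], new int[]{p, p},
                (a, b) -> new int[]{Math.min(a[0], b[0]), Math.max(a[1], b[1])});
        }
        List<String> out = new ArrayList<>();
        // The day-4 and day-5 windows never close (the watermark never passes
        // their end), so the job only emits days 1 through 3.
        agg.forEach((k, v) -> {
            if (k.compareTo("2000-01-04") < 0) out.add(k + "," + v[0] + "," + v[1]);
        });
        return out;
    }

    public static void main(String[] args) {
        compute().forEach(System.out::println);
    }
}
```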
4) Now run the job with only {{1.csv}} in the directory. The output is still correct:
{code:java}
2000-01-01,A,10,11
2000-01-01,B,20,21
{code}
5) Cancel the job with a savepoint, move {{2.csv}} into the directory, and 
restart the job from the savepoint. It produces an incorrect result:
{code:java}
2000-01-01,A,12,12
2000-01-02,A,12,13
2000-01-02,B,22,23
2000-01-03,A,14,15
2000-01-03,B,24,25
{code}
 

*Expectation:*
 We were not supposed to see the {{2000-01-01,A,12,12}} record, as it should 
not have passed the watermark check. This suggests that Flink does not save 
the watermark in the savepoint.
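To illustrate the hypothesis: if the restored job resumes with the watermark reset to its initial minimum instead of the saved value, the first row of {{2.csv}} is no longer classified as late. This is a hypothetical sketch (assuming a record is late when its event time is at or below the watermark), not the actual restore logic:

```java
// Hypothetical illustration of the reporter's hypothesis: a restored job
// whose watermark was reset to Long.MIN_VALUE (instead of the value held
// at the savepoint) would accept the late 2000-01-01 04:00 row.
public class RestoreSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;
    static final long HOUR_MS = 60L * 60 * 1000;

    static boolean isLate(long eventTimeMs, long watermarkMs) {
        return eventTimeMs <= watermarkMs;
    }

    public static void main(String[] args) {
        long rowTime = 4 * HOUR_MS;               // 2000-01-01 04:00, first row of 2.csv
        long savedWatermark = DAY_MS + 2 * HOUR_MS; // 2000-01-02 02:00, after 1.csv
        long resetWatermark = Long.MIN_VALUE;     // watermark lost on restore

        System.out.println(isLate(rowTime, savedWatermark)); // true: dropped
        System.out.println(isLate(rowTime, resetWatermark)); // false: accepted,
        // re-opening the 2000-01-01 window with only price 12 -> "A,12,12"
    }
}
```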



> Savepoints do not preserve watermarks
> -------------------------------------
>
>                 Key: FLINK-18053
>                 URL: https://issues.apache.org/jira/browse/FLINK-18053
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Table SQL / Runtime
>    Affects Versions: 1.10.1
>            Reporter: Sergii Mikhtoniuk
>            Priority: Major
>         Attachments: 1.csv, 2.csv, MyApp.scala
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
