[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-9717:
----------------------------------
    Description: 
Whenever one side of a join receives {{MAX_WATERMARK}}, the join operator (for 
both normal and versioned joins) could flush the state it keeps for the other 
side, since no more rows can arrive on the side that has finished.

This is a highly useful optimisation that would speed up versioned joins and 
would allow normal joins of large unbounded streams with bounded tables (for 
example, some static data).
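A minimal sketch of the idea for a normal inner equi-join, written as a toy plain-Java model rather than against Flink's operator API. The class BoundedSideJoin, its methods, and the per-key maps standing in for keyed state are all hypothetical; MAX_WATERMARK is assumed to be Long.MAX_VALUE, matching Flink's {{MAX_WATERMARK}}. Versioned joins and outer joins would need additional handling that is not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a two-input inner equi-join; NOT Flink's operator API.
class BoundedSideJoin {
    // Assumed to mirror the value of Flink's MAX_WATERMARK.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // Per-key buffered rows of each input, standing in for keyed state.
    final Map<String, List<String>> leftState = new HashMap<>();
    final Map<String, List<String>> rightState = new HashMap<>();
    boolean leftFinished = false;
    boolean rightFinished = false;
    final List<String> output = new ArrayList<>();

    void processLeft(String key, String value) {
        // Probe the buffered right rows for matches.
        for (String r : rightState.getOrDefault(key, List.of())) {
            output.add(value + "|" + r);
        }
        // Buffer this row only if more right rows can still arrive.
        if (!rightFinished) {
            leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    void processRight(String key, String value) {
        // Probe the buffered left rows for matches.
        for (String l : leftState.getOrDefault(key, List.of())) {
            output.add(l + "|" + value);
        }
        // Buffer this row only if more left rows can still arrive.
        if (!leftFinished) {
            rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    void processLeftWatermark(long watermark) {
        if (watermark == MAX_WATERMARK) {
            leftFinished = true;
            // No more left rows: buffered right rows can never match again.
            rightState.clear();
        }
    }

    void processRightWatermark(long watermark) {
        if (watermark == MAX_WATERMARK) {
            rightFinished = true;
            // No more right rows: buffered left rows can never match again.
            leftState.clear();
        }
    }
}
```

With a bounded table on the left and an unbounded stream on the right, the right-side state stays empty once the table has been fully read, so its size no longer grows with the stream. The bounded side's own state must still be kept, because every new stream row has to be probed against it.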

edit:
Currently the problem is that Flink doesn't keep and restore the last emitted 
watermark after restoring from a checkpoint, and this is hard to work around.

In other words, we can now easily "flush" one side of the join when we receive 
MAX_WATERMARK, but what should happen after restoring from a checkpoint? There 
is no easy way to store the information that MAX_WATERMARK was previously 
reached. As far as I have thought about this, it cannot be stored in the state 
of the Join operator, and even if it could be done this way, it's probably not 
the proper/elegant solution. The correct solution is probably to store 
MAX_WATERMARK in the state of the watermark emitter/source operator and to 
re-emit the last previously emitted watermark when the job is restored.
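A minimal sketch of the proposed direction, assuming the watermark emitter/source can include one extra long in its checkpointed state. This is a toy plain-Java model, not Flink's checkpointing API; CheckpointedWatermarkEmitter, snapshot(), restore() and the LongConsumer standing in for the downstream operator are all hypothetical names.

```java
import java.util.function.LongConsumer;

// Hypothetical watermark emitter whose checkpointed state includes the last
// emitted watermark; NOT Flink's checkpointing API.
class CheckpointedWatermarkEmitter {
    // Assumed to mirror the value of Flink's MAX_WATERMARK.
    static final long MAX_WATERMARK = Long.MAX_VALUE;
    // Marker for "nothing emitted yet".
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final LongConsumer downstream;
    private long lastEmitted = NO_WATERMARK;

    CheckpointedWatermarkEmitter(LongConsumer downstream) {
        this.downstream = downstream;
    }

    void emitWatermark(long watermark) {
        // Watermarks never go backwards; ignore non-advancing values.
        if (watermark > lastEmitted) {
            lastEmitted = watermark;
            downstream.accept(watermark);
        }
    }

    // Taken on checkpoint: the last emitted watermark becomes part of the snapshot.
    long snapshot() {
        return lastEmitted;
    }

    // Called on recovery: restore the value and re-emit it, so downstream
    // operators (e.g. a join) learn again that MAX_WATERMARK was already reached.
    void restore(long snapshot) {
        lastEmitted = snapshot;
        if (snapshot != NO_WATERMARK) {
            downstream.accept(snapshot);
        }
    }
}
```

In this model the join itself keeps nothing extra: after recovery it simply receives MAX_WATERMARK again from upstream and can flush the other side's state exactly as it would have before the failure.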

  was:
Whenever one side of a join receives {{MAX_WATERMARK}}, the join operator (for 
both normal and versioned joins) could flush the state it keeps for the other 
side.

This is a highly useful optimisation that would speed up versioned joins and 
would allow normal joins of large unbounded streams with bounded tables (for 
example, some static data).


> Flush state of one side of the join if other side is bounded
> ------------------------------------------------------------
>
>                 Key: FLINK-9717
>                 URL: https://issues.apache.org/jira/browse/FLINK-9717
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)