[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16131364#comment-16131364 ]

Fabian Hueske commented on FLINK-6233:
--------------------------------------

Hi [~xccui], sorry for the late reply.

You are right that the clean-up of a window needs to be triggered by the 
watermark of the other input. Basically, the watermark tells the operator that 
no more records are expected from that input that could join with the buffered 
records of the other input, so those buffered records can be discarded from the 
state.
A benefit of handling the watermarks of both inputs separately would be that 
records from the slower input can be joined immediately and completely with the 
state of the faster input, without putting them into state to wait for further 
records from the faster input: the faster input's higher watermark tells us 
that all of those records have already been received.
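The clean-up rule and the "join immediately instead of buffering" shortcut described above can be illustrated with a small Python simulation. This is a hypothetical sketch, not Flink's operator or API; `IntervalJoinSketch`, `process`, and `watermark` are illustrative names. It assumes the condition `left.rowtime BETWEEN right.rowtime + lower AND right.rowtime + upper` (for the query in this issue: Orders as left, Shipments as right, lower = 0, upper = 60 minutes), Flink's watermark meaning (no more rows with a rowtime <= the watermark will arrive), and no late rows.

```python
from collections import defaultdict


class IntervalJoinSketch:
    """Hypothetical simulation (not Flink code) of a rowtime inner equi-join with
    left.rowtime BETWEEN right.rowtime + lower AND right.rowtime + upper.
    Assumes no late rows arrive on either input."""

    def __init__(self, lower, upper):
        self.lower, self.upper = lower, upper
        # Buffered rows per input and join key: key -> list of (rowtime, row).
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}
        # Watermark per input: no more rows with rowtime <= watermark will arrive.
        self.wm = {"left": float("-inf"), "right": float("-inf")}

    @staticmethod
    def _other(side):
        return "right" if side == "left" else "left"

    def _matches(self, side, t, t_other):
        # Normalise to (left rowtime - right rowtime) and check the bounds.
        diff = t - t_other if side == "left" else t_other - t
        return self.lower <= diff <= self.upper

    def _partner_deadline(self, side, t):
        # Latest rowtime a row of the other input may have and still join
        # a row of `side` with rowtime t.
        return t - self.lower if side == "left" else t + self.upper

    def process(self, side, key, t, row):
        """Join a new (non-late) row with the other input's buffer."""
        other = self._other(side)
        out = [
            (row, r) if side == "left" else (r, row)
            for t_o, r in self.state[other][key]
            if self._matches(side, t, t_o)
        ]
        # Buffer the row only if partners may still arrive on the other input.
        # If that input's watermark has already passed the deadline, the row is
        # joined completely right now and never enters the state.
        if self._partner_deadline(side, t) > self.wm[other]:
            self.state[side][key].append((t, row))
        return out

    def watermark(self, side, wm):
        """Advance one input's watermark and evict rows of the OTHER input
        that can no longer find a partner on this input."""
        self.wm[side] = max(self.wm[side], wm)
        other = self._other(side)
        for rows in self.state[other].values():
            rows[:] = [(t, r) for t, r in rows
                       if self._partner_deadline(other, t) > self.wm[side]]
```

In this sketch a row is always evicted by the watermark of the *other* input: a left row with rowtime t can be dropped once the right watermark reaches t - lower, and a right row once the left watermark reaches t + upper.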

Regarding your question:
{quote}Did you mean that even for the rowtime join, the clean-up timers should 
also use ctx.registerProcessingTimeTimer() instead of 
ctx.registerEventTimeTimer()? I noticed that there's another issue (FLINK-7388) 
about the onTimer() method, but I'm not sure if it's related.{quote}

Yes, event-time operators should also implement the state retention time 
policy based on processing time. However, we don't need this for the windowed 
join operator. Windowed operators can (and must) automatically clear their complete 
state as time progresses. The state retention timers were added for operators 
that need to keep state forever to ensure correct semantics. By removing state, 
we give up correct semantics (in some cases) but ensure that the query does not 
leak state and can run for a very long time.
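The processing-time retention policy for operators that would otherwise keep state forever can be sketched as a small simulation. This is a hypothetical Python model, not Flink's API (`IdleStateRetentionSketch`, `update`, and `advance_processing_time` are illustrative names): every write to a key re-arms a clean-up timer at `now + retention`, and a timer only clears the key's state if the key has not been accessed since that timer was registered.

```python
import heapq


class IdleStateRetentionSketch:
    """Hypothetical model (not Flink's API) of keyed state that is cleared
    after `retention` units of processing time without access."""

    def __init__(self, retention):
        self.retention = retention
        self.state = {}        # key -> value, e.g. a running aggregate
        self.last_access = {}  # key -> processing time of the last access
        self.timers = []       # min-heap of (fire_time, key)

    def update(self, key, value, now):
        """Write a key's state and re-arm its clean-up timer."""
        self.state[key] = value
        self.last_access[key] = now
        # Corresponds to registering a processing-time timer at now + retention.
        heapq.heappush(self.timers, (now + self.retention, key))

    def advance_processing_time(self, now):
        """Fire all timers due at or before `now`."""
        while self.timers and self.timers[0][0] <= now:
            fire_time, key = heapq.heappop(self.timers)
            # A timer is stale if the key was accessed after it was registered.
            if (key in self.last_access
                    and self.last_access[key] + self.retention <= fire_time):
                del self.state[key]
                del self.last_access[key]
```

The trade-off from the paragraph above shows up directly: a row that arrives for a key after its state was cleared starts again from empty state, so the result can differ from what unbounded state would produce, but the state cannot grow forever.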

I'm currently on vacation, with little time for reviewing and no dev machine 
with me. I'll try to take a look at your code in the next few days but can't 
promise.

Best, Fabian

> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on rowtime 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use rowtime, which is a system attribute.
> * The time condition only supports bounded time ranges such as 
> {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, 
> not unbounded ones such as {{o.rowtime < s.rowtime}}.
> * The time condition must include the rowtime attributes of both streams; 
> {{o.rowtime between rowtime () and rowtime () + 1}} is also not supported.
> A rowtime stream join will not be able to handle late data, because inserting 
> a row into a sorted order would shift all other computations. This would be 
> too expensive to maintain. Therefore, we will throw an error if a user tries 
> to use a rowtime stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
