[ https://issues.apache.org/jira/browse/FLINK-23749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414826#comment-17414826 ]

Martijn Visser commented on FLINK-23749:
----------------------------------------

I've run multiple tests successfully. I used the following statements and 
validated the output:

{code:sql}
CREATE TABLE LeftTable (
    row_time   TIMESTAMP(3),
    num INT,
    id STRING,
    WATERMARK FOR row_time AS row_time - INTERVAL '1' SECOND
) WITH (
  'connector' = 'faker',
  'fields.row_time.expression' = '#{date.past ''1'',''SECONDS''}',
  'fields.num.expression' = '#{regexify ''(1|2|3)''}',
  'fields.id.expression' = '#{regexify ''(L1|L2|L3)''}',
  'rows-per-second' = '1'
);

CREATE TABLE RightTable (
    row_time   TIMESTAMP(3),
    num INT,
    id STRING,
    WATERMARK FOR row_time AS row_time - INTERVAL '1' SECOND
) WITH (
  'connector' = 'faker',
  'fields.row_time.expression' = '#{date.past ''1'',''SECONDS''}',
  'fields.num.expression' = '#{regexify ''(1|2|3)''}',
  'fields.id.expression' = '#{regexify ''(R1|R2|R3)''}',
  'rows-per-second' = '1'
);
{code}

Full outer join (I also tested inner, left, and right joins):
{code:sql}
SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id,
       L.window_start, L.window_end
FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) L
FULL JOIN (
    SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) R
ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
{code}
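To make the join condition concrete, here is a minimal Python model (not Flink code) of what the full outer window join above should return. It assumes epoch-millisecond timestamps, epoch-aligned 5-second tumbling windows, and that the complete input is evaluated at once rather than as watermarks advance; `Row`, `tumble_start` and `window_full_outer_join` are hypothetical names.

{code:python}
from collections import defaultdict
from typing import List, NamedTuple, Optional, Tuple

WINDOW_SIZE_MS = 5_000  # mirrors INTERVAL '5' SECONDS in the query

class Row(NamedTuple):
    row_time: int  # epoch milliseconds, a simplification of TIMESTAMP(3)
    num: int
    id: str

Joined = Tuple[Optional[int], Optional[str], Optional[int], Optional[str],
               Optional[int], Optional[int]]

def tumble_start(ts_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    """Start of the epoch-aligned tumbling window that contains ts_ms."""
    return ts_ms - ts_ms % size_ms

def window_full_outer_join(left: List[Row], right: List[Row],
                           size_ms: int = WINDOW_SIZE_MS) -> List[Joined]:
    """Pair rows with equal num whose window_start AND window_end are equal.

    Output columns: L_Num, L_Id, R_Num, R_Id, L.window_start, L.window_end.
    Unmatched rows are padded with None, as FULL JOIN pads with NULL.
    """
    def bucket(rows: List[Row]):
        groups = defaultdict(list)
        for r in rows:
            start = tumble_start(r.row_time, size_ms)
            # Join key = equi-join column plus BOTH window bounds.
            groups[(start, start + size_ms, r.num)].append(r)
        return groups

    lb, rb = bucket(left), bucket(right)
    out: List[Joined] = []
    for key in sorted(set(lb) | set(rb)):
        start, end, _ = key
        ls, rs = lb.get(key, []), rb.get(key, [])
        if ls and rs:
            out.extend((lrow.num, lrow.id, rrow.num, rrow.id, start, end)
                       for lrow in ls for rrow in rs)
        elif ls:
            out.extend((lrow.num, lrow.id, None, None, start, end) for lrow in ls)
        else:
            # Right-only rows: the query selects L.window_*, which are NULL here.
            out.extend((None, None, rrow.num, rrow.id, None, None) for rrow in rs)
    return out
{code}

Because the query selects {{L.window_start}} and {{L.window_end}}, rows that exist only on the right side come out with NULL window bounds, which the sketch mirrors with {{None}}. Two rows one millisecond apart (4999 and 5000) land in different windows and therefore never join.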

I also tried a window semi join and a window anti join, but in 1.14 RC0 both 
suffer from https://issues.apache.org/jira/browse/FLINK-24186. 

{code:sql}
SELECT *
FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) L WHERE L.num IN (
    SELECT num FROM (
        SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
    ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end
);
{code}

{noformat}
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Found more than one rowtime field: [row_time, window_time] in the query when insert into 'default_catalog.default_database.Unregistered_Collect_Sink_35'.
Please select the rowtime field that should be used as event-time timestamp for the DataStream by casting all other fields to TIMESTAMP.
{noformat}

I've pinged [~twalthr], and he'll also merge this fix into the release-1.14 
branch. These queries currently throw an error, but that should be resolved in 
the next RC and in the final release.
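Independent of FLINK-24186, the results the window semi join above (and its anti join counterpart) should produce can be modelled in Python as below. This is a hypothetical batch sketch, assuming epoch-millisecond timestamps and epoch-aligned 5-second tumbling windows; `Row`, `window_key`, `window_semi_join` and `window_anti_join` are illustrative names, not Flink APIs.

{code:python}
from typing import List, NamedTuple, Set, Tuple

WINDOW_SIZE_MS = 5_000  # INTERVAL '5' SECONDS

class Row(NamedTuple):
    row_time: int  # epoch milliseconds
    num: int
    id: str

def window_key(row: Row, size_ms: int) -> Tuple[int, int, int]:
    """(window_start, window_end, num) for an epoch-aligned tumbling window."""
    start = row.row_time - row.row_time % size_ms
    return (start, start + size_ms, row.num)

def window_semi_join(left: List[Row], right: List[Row],
                     size_ms: int = WINDOW_SIZE_MS) -> List[Row]:
    """Left rows with at least one right row of equal num in the same window.

    Each left row is emitted at most once, however many right rows match.
    """
    right_keys: Set[Tuple[int, int, int]] = {window_key(r, size_ms) for r in right}
    return [row for row in left if window_key(row, size_ms) in right_keys]

def window_anti_join(left: List[Row], right: List[Row],
                     size_ms: int = WINDOW_SIZE_MS) -> List[Row]:
    """Left rows with no such match (NOT EXISTS semantics; SQL NOT IN differs
    from this when the subquery can return NULL)."""
    right_keys = {window_key(r, size_ms) for r in right}
    return [row for row in left if window_key(row, size_ms) not in right_keys]
{code}

The semi join keeps a left row once even when two right rows match it, which is what distinguishes it from an inner window join followed by a projection.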

Last but not least, I also tried the following, and it works too:

{code:sql}
CREATE TABLE orders ( 
    item_id STRING,
    pay_time TIMESTAMP(3),
    price DOUBLE, 
    item STRING,
    user_id STRING,
    seller_id STRING,
    WATERMARK FOR pay_time AS pay_time
) WITH (
  'connector' = 'faker',
  'fields.item_id.expression' = '#{regexify ''(Apples|Pears|Bananas|Oranges|Strawberries)''}',
  'fields.pay_time.expression' = '#{date.past ''1'',''SECONDS''}',
  'fields.price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
  'fields.item.expression' = '#{Commerce.productName}',
  'fields.user_id.expression' = '#{Address.firstName}',
  'fields.seller_id.expression' = '#{regexify ''(Alice|Bob|Carol|Alex|Joe|James|Jane|Jack)''}',
  'rows-per-second' = '100'
);

SELECT item_id, a.seller_id, a.window_start, a.window_end, sales,
       item_buyer_cnt, slr_buyer_cnt
FROM (
    SELECT item_id, seller_id, window_start, window_end, SUM(price) as sales,
           COUNT(DISTINCT user_id) as item_buyer_cnt
    FROM TABLE(
        CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' SECONDS, INTERVAL '1' DAY))
    GROUP BY item_id, seller_id, window_start, window_end
) a
LEFT JOIN ( -- use a window join to enrich with the shop's buyer count from 00:00 up to the current window end
    SELECT seller_id, window_start, window_end, COUNT(DISTINCT user_id) as slr_buyer_cnt
    FROM TABLE(
        CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' SECONDS, INTERVAL '1' DAY))
    GROUP BY seller_id, window_start, window_end
) b
ON a.seller_id = b.seller_id AND a.window_start = b.window_start AND a.window_end = b.window_end;
{code}
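A rough Python model of what the CUMULATE query computes: each order is assigned to every cumulative window of its period whose end lies after {{pay_time}}, subqueries a and b aggregate per window, and the two are left-joined on {{seller_id}} plus both window bounds. Assumptions: epoch-millisecond timestamps with windows aligned to the epoch (time zones ignored), the complete input evaluated at once rather than as watermarks advance, and hypothetical names ({{Order}}, {{cumulate_windows}}, {{enriched_sales}}). The check that the max size is an integral multiple of the step mirrors the requirement Flink documents for CUMULATE.

{code:python}
from collections import defaultdict
from typing import Dict, List, NamedTuple, Optional, Set, Tuple

class Order(NamedTuple):
    pay_time: int  # epoch milliseconds
    item_id: str
    price: float
    user_id: str
    seller_id: str

def cumulate_windows(ts_ms: int, step_ms: int, max_size_ms: int) -> List[Tuple[int, int]]:
    """Every [window_start, window_end) CUMULATE window that contains ts_ms."""
    if max_size_ms % step_ms != 0:
        raise ValueError("max window size must be an integral multiple of the step")
    start = ts_ms - ts_ms % max_size_ms       # all windows of a period share this start
    first_k = (ts_ms - start) // step_ms + 1  # smallest k with start + k*step > ts_ms
    return [(start, start + k * step_ms)
            for k in range(first_k, max_size_ms // step_ms + 1)]

def enriched_sales(orders: List[Order], step_ms: int, max_size_ms: int):
    """Rows of (item_id, seller_id, window_start, window_end, sales,
    item_buyer_cnt, slr_buyer_cnt), mirroring subqueries a and b plus the LEFT JOIN."""
    sales: Dict[tuple, float] = defaultdict(float)
    item_buyers: Dict[tuple, Set[str]] = defaultdict(set)
    seller_buyers: Dict[tuple, Set[str]] = defaultdict(set)
    for o in orders:
        for start, end in cumulate_windows(o.pay_time, step_ms, max_size_ms):
            a_key = (o.item_id, o.seller_id, start, end)  # GROUP BY of subquery a
            sales[a_key] += o.price
            item_buyers[a_key].add(o.user_id)
            seller_buyers[(o.seller_id, start, end)].add(o.user_id)  # subquery b

    rows = []
    for a_key in sorted(sales):
        item_id, seller_id, start, end = a_key
        # LEFT JOIN on seller_id, window_start AND window_end.
        b = seller_buyers.get((seller_id, start, end))
        slr_cnt: Optional[int] = len(b) if b is not None else None
        rows.append((item_id, seller_id, start, end, sales[a_key],
                     len(item_buyers[a_key]), slr_cnt))
    return rows
{code}

Since every window of the same period shares {{window_start}}, it is the {{window_end}} equality in the ON clause that pairs each item-level snapshot with the seller-level snapshot for the same cumulative span. With the real settings (10-second step, 1-day max size) an order at 00:00:25 belongs to 8638 windows.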

Conclusion: the test was successful, and we should be good to go once 
https://issues.apache.org/jira/browse/FLINK-24186 is also merged into 
release-1.14.

> Testing Window Join
> -------------------
>
>                 Key: FLINK-23749
>                 URL: https://issues.apache.org/jira/browse/FLINK-23749
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>            Reporter: JING ZHANG
>            Assignee: Martijn Visser
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.14.0
>
>
> A window join requires the join ON condition to contain an equality on the 
> window starts of both input tables and an equality on their window ends. The 
> semantics of the window join are the same as those of the [DataStream window join|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/joining.html#window-join].
> {code:sql}
> SELECT ...
> FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations with a windowing TVF applied
> ON L.window_start = R.window_start AND L.window_end = R.window_end AND ...
> {code}
> In the future, we could also simplify the join ON clause to include only the 
> window start equality if the windowing TVF is {{TUMBLE}} or {{HOP}}. 
> Currently, the left and right inputs must use the same windowing TVF. This 
> could be extended in the future, for example, to join tumbling windows with 
> sliding windows of the same window size.
> Currently, Flink supports Window Join not only after a [Window Aggregation|https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/]
> but also directly after a [Windowing TVF|https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/].
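
The description's remark that the ON clause could be reduced to window start equality for {{TUMBLE}} and {{HOP}} follows from those TVFs always producing window_end = window_start + size, whereas {{CUMULATE}} (used in the last test above) emits several windows with the same start. The Python sketch below checks that property on sample timestamps; the window assigners are simplified, epoch-aligned models with hypothetical names, not Flink's implementation.

{code:python}
from collections import defaultdict
from typing import Callable, Iterable, List, Tuple

Window = Tuple[int, int]  # (window_start, window_end) in epoch ms

def tumble(ts: int, size: int) -> List[Window]:
    start = ts - ts % size
    return [(start, start + size)]

def hop(ts: int, slide: int, size: int) -> List[Window]:
    # Every window whose start is a multiple of `slide` and lies in (ts - size, ts].
    last_start = ts - ts % slide
    return [(s, s + size) for s in range(last_start, ts - size, -slide)]

def cumulate(ts: int, step: int, max_size: int) -> List[Window]:
    # All windows of a period share one start; ends grow by `step` up to max_size.
    start = ts - ts % max_size
    first_k = (ts - start) // step + 1
    return [(start, start + k * step) for k in range(first_k, max_size // step + 1)]

def start_determines_end(assign: Callable[[int], List[Window]],
                         timestamps: Iterable[int]) -> bool:
    """True if no two windows produced by `assign` share a start but differ in end."""
    ends_by_start = defaultdict(set)
    for ts in timestamps:
        for start, end in assign(ts):
            ends_by_start[start].add(end)
    return all(len(ends) == 1 for ends in ends_by_start.values())
{code}

For tumbling and hopping windows the check holds, so start equality would imply end equality; for cumulative windows it fails, which is why the end equality has to stay in the ON clause there.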



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
