[
https://issues.apache.org/jira/browse/FLINK-23749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414826#comment-17414826
]
Martijn Visser commented on FLINK-23749:
----------------------------------------
I've performed multiple tests successfully. I used the following statements and
successfully validated the output:
{code:sql}
CREATE TABLE LeftTable (
row_time TIMESTAMP(3),
num INT,
id STRING,
WATERMARK FOR row_time AS row_time - INTERVAL '1' SECOND
) WITH (
'connector' = 'faker',
'fields.row_time.expression' = '#{date.past ''1'',''SECONDS''}',
'fields.num.expression' = '#{regexify ''(1|2|3)''}',
'fields.id.expression' = '#{regexify ''(L1|L2|L3)''}',
'rows-per-second' = '1'
);
CREATE TABLE RightTable (
row_time TIMESTAMP(3),
num INT,
id STRING,
WATERMARK FOR row_time AS row_time - INTERVAL '1' SECOND
) WITH (
'connector' = 'faker',
'fields.row_time.expression' = '#{date.past ''1'',''SECONDS''}',
'fields.num.expression' = '#{regexify ''(1|2|3)''}',
'fields.id.expression' = '#{regexify ''(R1|R2|R3)''}',
'rows-per-second' = '1'
);
{code}
Full outer join (I also tested inner, left and right joins):
{code:sql}
SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id,
L.window_start, L.window_end
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable,
DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) L
FULL JOIN (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable,
DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) R
ON L.num = R.num AND L.window_start = R.window_start AND
L.window_end = R.window_end;
{code}
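The inner, left and right variants are mentioned above but not listed. As a rough sketch, assuming the same LeftTable and RightTable definitions, the inner variant would presumably differ only in the join keyword:
{code:sql}
-- Sketch only: inner window join over the same 5-second tumbling windows.
-- Swapping JOIN for LEFT JOIN or RIGHT JOIN gives the outer variants.
SELECT L.num AS L_Num, L.id AS L_Id, R.num AS R_Num, R.id AS R_Id,
       L.window_start, L.window_end
FROM (
  SELECT * FROM TABLE(TUMBLE(TABLE LeftTable,
    DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) L
JOIN (
  SELECT * FROM TABLE(TUMBLE(TABLE RightTable,
    DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) R
ON L.num = R.num AND L.window_start = R.window_start AND
   L.window_end = R.window_end;
{code}
For the right and full outer variants, rows without a match on the left side carry NULL in L.window_start and L.window_end. Selecting COALESCE(L.window_start, R.window_start) and COALESCE(L.window_end, R.window_end) instead keeps the window bounds populated in those rows.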
I also tried a semi window join and an anti window join, but in 1.14 RC0 both
fail because of https://issues.apache.org/jira/browse/FLINK-24186.
{code:sql}
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable,
DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) L WHERE L.num IN (
SELECT num FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable,
DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) R WHERE L.window_start = R.window_start AND L.window_end =
R.window_end);
{code}
{noformat}
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Found more than one rowtime field:
[row_time, window_time] in the query when insert into
'default_catalog.default_database.Unregistered_Collect_Sink_35'.
Please select the rowtime field that should be used as event-time timestamp for
the DataStream by casting all other fields to TIMESTAMP.
{noformat}
I've pinged [~twalthr] and he will also merge the fix for FLINK-24186 into the
release-1.14 branch. The query currently throws the error above, but that
should be resolved in the next RC and in the final release.
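Until the fix is available, the error message itself hints at a possible workaround: make sure at most one rowtime field reaches the result. The following is an untested sketch under that assumption. It was not part of the test run above, and it simply avoids SELECT * in the outer query so that neither row_time nor window_time is returned:
{code:sql}
-- Sketch only, not part of the reported test run.
-- Explicit columns instead of SELECT *, so neither row_time nor window_time
-- (the two rowtime fields named in the error) is part of the result.
SELECT L.num, L.id, L.window_start, L.window_end
FROM (
  SELECT * FROM TABLE(TUMBLE(TABLE LeftTable,
    DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) L WHERE L.num IN (
  SELECT num FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE RightTable,
      DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
  ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
{code}
Replacing IN with NOT IN turns this into the corresponding anti window join.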
Last but not least, I also tried out this one and it works too:
{code:sql}
CREATE TABLE orders (
item_id STRING,
pay_time TIMESTAMP(3),
price DOUBLE,
item STRING,
user_id STRING,
seller_id STRING,
WATERMARK FOR pay_time AS pay_time
) WITH (
'connector' = 'faker',
'fields.item_id.expression' = '#{regexify ''(Apples|Pears|Bananas|Oranges|Strawberries)''}',
'fields.pay_time.expression' = '#{date.past ''1'',''SECONDS''}',
'fields.price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
'fields.item.expression' = '#{Commerce.productName}',
'fields.user_id.expression' = '#{Address.firstName}',
'fields.seller_id.expression' = '#{regexify ''(Alice|Bob|Carol|Alex|Joe|James|Jane|Jack)''}',
'rows-per-second' = '100'
);
SELECT item_id, a.seller_id, a.window_start, a.window_end, sales,
item_buyer_cnt, slr_buyer_cnt
FROM (
SELECT item_id, seller_id, window_start, window_end, SUM(price) as sales,
COUNT(DISTINCT user_id) as item_buyer_cnt
FROM TABLE(
CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' SECONDS,
INTERVAL '1' DAY))
GROUP BY item_id, seller_id, window_start, window_end
) a
LEFT JOIN (
-- using window join to enrich the buyer count of the shop from 00:00 to
-- current minute
SELECT seller_id, window_start, window_end, COUNT(DISTINCT user_id) as
slr_buyer_cnt
FROM TABLE(
CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' SECONDS,
INTERVAL '1' DAY))
GROUP BY seller_id, window_start, window_end
) b
ON a.seller_id = b.seller_id AND a.window_start = b.window_start AND
a.window_end = b.window_end;
{code}
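To inspect the CUMULATE windows on their own before joining, a smaller query along these lines may help. It is only a sketch that assumes the same orders table. The alias order_cnt is introduced here for illustration and does not appear in the test above:
{code:sql}
-- Sketch only: count orders per cumulating window, without the join.
-- Within one day, every window shares the same window_start, while
-- window_end advances in 10-second steps up to the 1-day maximum size.
SELECT window_start, window_end, COUNT(*) AS order_cnt
FROM TABLE(
  CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' SECONDS,
    INTERVAL '1' DAY))
GROUP BY window_start, window_end;
{code}
Because both sides of the join above use the same CUMULATE arguments, their window_start and window_end values line up, which is what the equality conditions in the ON clause rely on.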
Conclusion: test was successful and we should be good to go after
https://issues.apache.org/jira/browse/FLINK-24186 is also merged into
release-1.14.
> Testing Window Join
> -------------------
>
> Key: FLINK-23749
> URL: https://issues.apache.org/jira/browse/FLINK-23749
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Reporter: JING ZHANG
> Assignee: Martijn Visser
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.14.0
>
>
> The window join requires the join ON condition to contain equality of the
> input tables' window starts and equality of their window ends. The
> semantics of the window join are the same as those of the [DataStream window
> join|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/joining.html#window-join].
> {code:java}
> SELECT ...
> FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations applied windowing TVF
> ON L.window_start = R.window_start AND L.window_end = R.window_end AND ...
> {code}
> In the future, we can also simplify the join ON clause to only include the
> window start equality if the windowing TVF is {{TUMBLE}} or {{HOP}}.
> Currently, the windowing TVFs of the left and right inputs must be the same.
> This can be extended in the future, for example to join tumbling windows with
> sliding windows of the same window size.
> Currently, Flink supports not only a Window Join that follows a [Window
> Aggregation|https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/]
> but also a Window Join that follows a [Windowing
> TVF|https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/].