[
https://issues.apache.org/jira/browse/FLINK-31013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jane Chan updated FLINK-31013:
------------------------------
Description:
{code:sql}
-- test against Flink 1.16.0
create catalog fscat with (
    'type' = 'table-store',
    'warehouse' = 'file:///tmp/fscat'
);
use catalog fscat;
create table events (
    `id` int,
    `type` string,
    `date` TIMESTAMP(3),
    watermark for `date` AS `date`
);
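-- Editorial note: `date` is declared as the event-time attribute with a
-- zero-delay watermark, i.e. the watermark tracks the largest `date` seen so far.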
insert into events
values (1, 'T1', to_timestamp('2018-01-24', 'yyyy-MM-dd')),
       (2, 'T1', to_timestamp('2018-01-26', 'yyyy-MM-dd')),
       (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')),
       (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd'));
-- no output
select `id`,
       `type`,
       COUNT(1) as event_cnt,
       session_start(`date`, interval '1' DAY) as ss,
       session_end(`date`, interval '1' DAY) as se
from events
group by `id`, `type`, session(`date`, interval '1' DAY);
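-- Diagnostic sketch (editorial addition, not part of the original report;
-- assumes Flink >= 1.15, which ships the CURRENT_WATERMARK function): if wm is
-- NULL for every row, the table-store source never emits a watermark, so an
-- event-time session window can never fire.
select `id`, `type`, `date`, CURRENT_WATERMARK(`date`) as wm from events;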
-- explain plan
== Abstract Syntax Tree ==
LogicalProject(id=[$0], type=[$1], event_cnt=[$3], ss=[SESSION_START($2)], se=[SESSION_END($2)])
+- LogicalAggregate(group=[{0, 1, 2}], event_cnt=[COUNT()])
   +- LogicalProject(id=[$0], type=[$1], $f2=[$SESSION($2, 86400000:INTERVAL DAY)])
      +- LogicalWatermarkAssigner(rowtime=[date], watermark=[$2])
         +- LogicalTableScan(table=[[fscat, default, events]])

== Optimized Physical Plan ==
Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
+- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
   +- Exchange(distribution=[hash[id, type]])
      +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], fields=[id, type, date])

== Optimized Execution Plan ==
Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
+- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
   +- Exchange(distribution=[hash[id, type]])
      +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], fields=[id, type, date])
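-- Editorial note: the plan itself looks correct (a GroupWindowAggregate over a
-- watermarked TableSourceScan), so the failure points at watermark emission in
-- the table-store source rather than at the planner; the filesystem-source
-- experiment below supports this.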
-- however, when switching to a filesystem source, the window fires normally
CREATE TEMPORARY TABLE `fscat`.`default`.`event_file_source` (
    `id` INT,
    `type` VARCHAR(2147483647),
    `date` TIMESTAMP(3),
    WATERMARK FOR `date` AS `date`
) WITH (
    'format' = 'csv',
    'path' = '/tmp/events.csv',
    'source.monitor-interval' = '1 min',
    'connector' = 'filesystem'
);
-- contents of /tmp/events.csv (cat events.csv)
1,T1,2018-01-24 00:00:00.000
2,T1,2018-01-26 00:00:00.000
1,T2,2018-01-28 00:00:00.000
1,T2,2018-01-28 00:00:00.000
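-- Editorial note: these rows match the values inserted into the table-store
-- table above, so both experiments consume the same data and differ only in
-- the source connector.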
-- same query using filesystem source
select `id`,
       `type`,
       COUNT(1) as event_cnt,
       session_start(`date`, interval '1' DAY) as ss,
       session_end(`date`, interval '1' DAY) as se
from event_file_source
group by `id`, `type`, session(`date`, interval '1' DAY);
-- output
id  type  event_cnt  ss                       se
 1  T1            1  2018-01-24 00:00:00.000  2018-01-25 00:00:00.000
 2  T1            1  2018-01-26 00:00:00.000  2018-01-27 00:00:00.000
{code}
> Session window aggregation cannot trigger window using event time
> -----------------------------------------------------------------
>
> Key: FLINK-31013
> URL: https://issues.apache.org/jira/browse/FLINK-31013
> Project: Flink
> Issue Type: Bug
> Components: Table Store
> Affects Versions: table-store-0.4.0
> Reporter: Jane Chan
> Priority: Major
> Fix For: table-store-0.4.0
--
This message was sent by Atlassian Jira
(v8.20.10#820010)