[
https://issues.apache.org/jira/browse/FLINK-27539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
hjw updated FLINK-27539:
------------------------
Description:
custom_kafka is a cdc table
sql:
{code:java}
select DATE_FORMAT(window_end,'yyyy-MM-dd') as date_str,sum(money) as total,name
from TABLE(CUMULATE(TABLE custom_kafka,descriptor(createtime),interval '1'
MINUTES,interval '1' DAY ))
where status='1'
group by name,window_start,window_end;
{code}
Error
{code:java}
org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate
doesn't support consuming update and delete changes which is produced by node
TableSourceScan(table=[[default_catalog, default_database,custom_kafka]],
fields=[name, money, status,createtime,operation_ts])
{code}
But I found Group Window Aggregation is works when use cdc table
{code:java}
select DATE_FORMAT(TUMBLE_END(createtime,interval '10' MINUTES),'yyyy-MM-dd')
as date_str,sum(money) as total,name
from custom_kafka
where status='1'
group by name,TUMBLE(createtime,interval '10' MINUTES)
{code}
was:
custom_kafka is a cdc table
sql:
{code:java}
select DATE_FORMAT(window_end,'yyyy-MM-dd') as date_str,sum(money) as total,name
from TABLE(CUMULATE(TABLE custom_kafka,descriptor(createtime),interval '1'
MINUTES,interval '1' DAY ))
where status='1'
group by name,window_start,window_end;
{code}
Error
org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate
doesn't support consuming update and delete changes which is produced by node
TableSourceScan(table=[[default_catalog, default_database,custom_kafka]],
fields=[name, money, status,createtime,operation_ts])
But I found Group Window Aggregation is works when use cdc table
select DATE_FORMAT(TUMBLE_END(createtime,interval '10' MINUTES),'yyyy-MM-dd')
as date_str,sum(money) as total,name
from custom_kafka
where status='1'
group by name,TUMBLE(createtime,interval '10' MINUTES)
> support consuming update and delete changes In Windowing TVFs
> -------------------------------------------------------------
>
> Key: FLINK-27539
> URL: https://issues.apache.org/jira/browse/FLINK-27539
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / API
> Affects Versions: 1.15.0
> Reporter: hjw
> Priority: Major
>
> custom_kafka is a cdc table
> sql:
> {code:java}
> select DATE_FORMAT(window_end,'yyyy-MM-dd') as date_str,sum(money) as
> total,name
> from TABLE(CUMULATE(TABLE custom_kafka,descriptor(createtime),interval '1'
> MINUTES,interval '1' DAY ))
> where status='1'
> group by name,window_start,window_end;
> {code}
> Error
> {code:java}
> org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate
> doesn't support consuming update and delete changes which is produced by node
> TableSourceScan(table=[[default_catalog, default_database,custom_kafka]],
> fields=[name, money, status,createtime,operation_ts])
> {code}
> But I found Group Window Aggregation is works when use cdc table
> {code:java}
> select DATE_FORMAT(TUMBLE_END(createtime,interval '10' MINUTES),'yyyy-MM-dd')
> as date_str,sum(money) as total,name
> from custom_kafka
> where status='1'
> group by name,TUMBLE(createtime,interval '10' MINUTES)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)