[ 
https://issues.apache.org/jira/browse/FLINK-27539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hjw updated FLINK-27539:
------------------------
    Description: 
custom_kafka is a cdc table

sql:
{code:java}
select DATE_FORMAT(window_end,'yyyy-MM-dd') as date_str,sum(money) as total,name
from TABLE(CUMULATE(TABLE custom_kafka,descriptor(createtime),interval '1' 
MINUTES,interval '1' DAY ))
where status='1'
group by name,window_start,window_end;
{code}

Error

{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: 
StreamPhysicalWindowAggregate doesn't support consuming update and delete 
changes which is produced by node TableSourceScan(table=[[default_catalog, 
default_database, custom_kafka, watermark=[-(createtime, 5000:INTERVAL 
SECOND)]]], fields=[name, money, status, createtime, operation_ts])
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:396)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:315)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:353)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:342)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:341)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
 at scala.collection.immutable.Range.foreach(Range.scala:155)
 at scala.collection.TraversableLike.map(TraversableLike.scala:233)
 at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:341)
{code}


But I found Group Window Aggregation is works when use cdc table
{code:java}
select DATE_FORMAT(TUMBLE_END(createtime,interval '10' MINUTES),'yyyy-MM-dd') 
as date_str,sum(money) as total,name
from custom_kafka
where status='1'
group by name,TUMBLE(createtime,interval '10' MINUTES)
{code}


  was:
custom_kafka is a cdc table

sql:
{code:java}
select DATE_FORMAT(window_end,'yyyy-MM-dd') as date_str,sum(money) as total,name
from TABLE(CUMULATE(TABLE custom_kafka,descriptor(createtime),interval '1' 
MINUTES,interval '1' DAY ))
where status='1'
group by name,window_start,window_end;
{code}

Error

{code:java}
org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate 
doesn't support consuming update and delete changes which is produced by node 
TableSourceScan(table=[[default_catalog, default_database,custom_kafka]], 
fields=[name, money, status,createtime,operation_ts])
{code}


But I found Group Window Aggregation is works when use cdc table
{code:java}
select DATE_FORMAT(TUMBLE_END(createtime,interval '10' MINUTES),'yyyy-MM-dd') 
as date_str,sum(money) as total,name
from custom_kafka
where status='1'
group by name,TUMBLE(createtime,interval '10' MINUTES)
{code}



> support consuming update and delete changes In Windowing TVFs
> -------------------------------------------------------------
>
>                 Key: FLINK-27539
>                 URL: https://issues.apache.org/jira/browse/FLINK-27539
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / API
>    Affects Versions: 1.15.0
>            Reporter: hjw
>            Priority: Major
>
> custom_kafka is a cdc table
> sql:
> {code:java}
> select DATE_FORMAT(window_end,'yyyy-MM-dd') as date_str,sum(money) as 
> total,name
> from TABLE(CUMULATE(TABLE custom_kafka,descriptor(createtime),interval '1' 
> MINUTES,interval '1' DAY ))
> where status='1'
> group by name,window_start,window_end;
> {code}
> Error
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> StreamPhysicalWindowAggregate doesn't support consuming update and delete 
> changes which is produced by node TableSourceScan(table=[[default_catalog, 
> default_database, custom_kafka, watermark=[-(createtime, 5000:INTERVAL 
> SECOND)]]], fields=[name, money, status, createtime, operation_ts])
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:396)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:315)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:353)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:342)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:341)
>  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>  at scala.collection.immutable.Range.foreach(Range.scala:155)
>  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:341)
> {code}
> But I found Group Window Aggregation is works when use cdc table
> {code:java}
> select DATE_FORMAT(TUMBLE_END(createtime,interval '10' MINUTES),'yyyy-MM-dd') 
> as date_str,sum(money) as total,name
> from custom_kafka
> where status='1'
> group by name,TUMBLE(createtime,interval '10' MINUTES)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to