[ 
https://issues.apache.org/jira/browse/FLINK-23919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuval Itzchakov updated FLINK-23919:
------------------------------------
    Description: 
Given the following Window TVF:
{code:java}
SELECT window_time, 
       MIN(alert_timestamp) as start_time, 
       MAX(alert_timestamp) as end_time 
FROM TABLE(TUMBLE(TABLE alert_table, DESCRIPTOR(alert_timestamp), INTERVAL '3' 
MINUTE)) 
WHERE service_source = 'source' GROUP BY window_start, window_end, window_time
{code}
Where the schema of alert_table is:
{code:java}
alert_timestamp: TIMESTAMP(3) ROWTIME INDICATOR
service_source: VARCHAR{code}
The following generates an invalid RowType:
{code:java}
Error while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule, 
args [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS start_time, 
MAX(alert_timestamp) AS end_time, start('w$) AS window_start, end('w$) AS 
window_end, rowtime('w$) AS window_time), 
rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 
0.[NONE].[NONE](input=RelSubset#355,distribution=single), 
rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, 
window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, 
_UTF-16LE'my source':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 
rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], 
size=[3 min]))]Error while applying rule 
PullUpWindowTableFunctionIntoWindowAggregateRule, args 
[rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS start_time, 
MAX(alert_timestamp) AS end_time, start('w$) AS window_start, end('w$) AS 
window_end, rowtime('w$) AS window_time), 
rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 
0.[NONE].[NONE](input=RelSubset#355,distribution=single), 
rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, 
window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, 
_UTF-16LE'Microsoft Defender for Identity':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE")), 
rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], 
size=[3 min]))] at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
 at 
org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
 at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
 at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scala:943) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
 at 
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
java.lang.RuntimeException: Error occurred while applying rule 
PullUpWindowTableFunctionIntoWindowAggregateRule at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161)
 at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268) 
at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283) 
at 
org.apache.flink.table.planner.plan.rules.physical.stream.PullUpWindowTableFunctionIntoWindowAggregateRule.onMatch(PullUpWindowTableFunctionIntoWindowAggregateRule.scala:143)
 at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
 ... 31 moreCaused by: org.apache.flink.table.api.ValidationException: Field 
names must be unique. Found duplicates: [alert_timestamp] at 
org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272) 
at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157) at 
org.apache.flink.table.types.logical.RowType.of(RowType.java:297) at 
org.apache.flink.table.types.logical.RowType.of(RowType.java:289) at 
org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:657)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList$lzycompute(StreamPhysicalWindowAggregate.scala:60)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList(StreamPhysicalWindowAggregate.scala:59)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.explainTerms(StreamPhysicalWindowAggregate.scala:86)
 at 
org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409) 
at 
org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391) 
at 
org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443)
 at java.base/java.util.HashMap.hash(HashMap.java:339) at 
java.base/java.util.HashMap.get(HashMap.java:552) at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150)
 at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
 at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
 at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
{code}
Looking at the code, it seems that when 
PullUpWindowTableFunctionIntoWindowAggregateRule is building the new Calc in 
WindowUtil.buildNewProgramWithoutWindowColumns, it is adding the rowtime column 
from the input row to the new calc without checking to see if there are any 
name collisions. Also, TBH I'm not entirely sure yet why the rowtime column of 
the input table is being added to the projected output row like that?

 

[~jark] would appreciate your help with this.

  was:
Given the following Window TVF:
{code:java}
SELECT window_time, 
       MIN(alert_timestamp) as start_time, 
       MAX(alert_timestamp) as end_time 
FROM TABLE(TUMBLE(TABLE alert_table, DESCRIPTOR(alert_timestamp), INTERVAL '3' 
MINUTE)) 
WHERE service_source = 'source' GROUP BY window_start, window_end, window_time
{code}
Where the schema of alert_table is:
{code:java}
alert_timestamp: TIMESTAMP(3) ROWTIME INDICATOR
service_source: VARCHAR{code}
The following generates an invalid RowType:
{code:java}
Error while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule, 
args [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS start_time, 
MAX(alert_timestamp) AS end_time, start('w$) AS window_start, end('w$) AS 
window_end, rowtime('w$) AS window_time), 
rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 
0.[NONE].[NONE](input=RelSubset#355,distribution=single), 
rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, 
window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, 
_UTF-16LE'my source':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 
rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], 
size=[3 min]))]Error while applying rule 
PullUpWindowTableFunctionIntoWindowAggregateRule, args 
[rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS start_time, 
MAX(alert_timestamp) AS end_time, start('w$) AS window_start, end('w$) AS 
window_end, rowtime('w$) AS window_time), 
rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 
0.[NONE].[NONE](input=RelSubset#355,distribution=single), 
rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, 
window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, 
_UTF-16LE'Microsoft Defender for Identity':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE")), 
rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], 
size=[3 min]))] at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
 at 
org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
 at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
 at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scala:943) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
 at 
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
java.lang.RuntimeException: Error occurred while applying rule 
PullUpWindowTableFunctionIntoWindowAggregateRule at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161)
 at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268) 
at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283) 
at 
org.apache.flink.table.planner.plan.rules.physical.stream.PullUpWindowTableFunctionIntoWindowAggregateRule.onMatch(PullUpWindowTableFunctionIntoWindowAggregateRule.scala:143)
 at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
 ... 31 moreCaused by: org.apache.flink.table.api.ValidationException: Field 
names must be unique. Found duplicates: [alert_timestamp] at 
org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272) 
at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157) at 
org.apache.flink.table.types.logical.RowType.of(RowType.java:297) at 
org.apache.flink.table.types.logical.RowType.of(RowType.java:289) at 
org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:657)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList$lzycompute(StreamPhysicalWindowAggregate.scala:60)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList(StreamPhysicalWindowAggregate.scala:59)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.explainTerms(StreamPhysicalWindowAggregate.scala:86)
 at 
org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409) 
at 
org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391) 
at 
org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443)
 at java.base/java.util.HashMap.hash(HashMap.java:339) at 
java.base/java.util.HashMap.get(HashMap.java:552) at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150)
 at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
 at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
 at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
{code}
Looking at the code, it seems that when 
PullUpWindowTableFunctionIntoWindowAggregateRule is building the new Calc in 
WindowUtil.buildNewProgramWithoutWindowColumns, it is adding the rowtime column 
from the input row to the new calc without checking to see if there are any 
name collisions. Also, TBH I'm not entirely sure yet why the rowtime column of 
the input table is being added to the projected output row like that?


> PullUpWindowTableFunctionIntoWindowAggregateRule generates invalid Calc for 
> Window TVF
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-23919
>                 URL: https://issues.apache.org/jira/browse/FLINK-23919
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.2
>            Reporter: Yuval Itzchakov
>            Priority: Major
>
> Given the following Window TVF:
> {code:java}
> SELECT window_time, 
>        MIN(alert_timestamp) as start_time, 
>        MAX(alert_timestamp) as end_time 
> FROM TABLE(TUMBLE(TABLE alert_table, DESCRIPTOR(alert_timestamp), INTERVAL 
> '3' MINUTE)) 
> WHERE service_source = 'source' GROUP BY window_start, window_end, window_time
> {code}
> Where the schema of alert_table is:
> {code:java}
> alert_timestamp: TIMESTAMP(3) ROWTIME INDICATOR
> service_source: VARCHAR{code}
> The following generates an invalid RowType:
> {code:java}
> Error while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule, 
> args [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 
> 0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], 
> win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS 
> start_time, MAX(alert_timestamp) AS end_time, start('w$) AS window_start, 
> end('w$) AS window_end, rowtime('w$) AS window_time), 
> rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 
> 0.[NONE].[NONE](input=RelSubset#355,distribution=single), 
> rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 
> 0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, 
> window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, 
> _UTF-16LE'my source':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 
> rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 
> 0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], 
> size=[3 min]))]Error while applying rule 
> PullUpWindowTableFunctionIntoWindowAggregateRule, args 
> [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 
> 0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], 
> win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS 
> start_time, MAX(alert_timestamp) AS end_time, start('w$) AS window_start, 
> end('w$) AS window_end, rowtime('w$) AS window_time), 
> rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 
> 0.[NONE].[NONE](input=RelSubset#355,distribution=single), 
> rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 
> 0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, 
> window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, 
> _UTF-16LE'Microsoft Defender for Identity':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE")), 
> rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 
> 0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], 
> size=[3 min]))] at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>  at 
> org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>  at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) 
> at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>  at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at 
> scala.collection.Iterator.foreach(Iterator.scala:943) at 
> scala.collection.Iterator.foreach$(Iterator.scala:943) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>  at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
>  at 
> org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
>  
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> java.lang.RuntimeException: Error occurred while applying rule 
> PullUpWindowTableFunctionIntoWindowAggregateRule at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161)
>  at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268) 
> at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283) 
> at 
> org.apache.flink.table.planner.plan.rules.physical.stream.PullUpWindowTableFunctionIntoWindowAggregateRule.onMatch(PullUpWindowTableFunctionIntoWindowAggregateRule.scala:143)
>  at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
>  ... 31 moreCaused by: org.apache.flink.table.api.ValidationException: Field 
> names must be unique. Found duplicates: [alert_timestamp] at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272) 
> at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157) at 
> org.apache.flink.table.types.logical.RowType.of(RowType.java:297) at 
> org.apache.flink.table.types.logical.RowType.of(RowType.java:289) at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:657)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList$lzycompute(StreamPhysicalWindowAggregate.scala:60)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList(StreamPhysicalWindowAggregate.scala:59)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.explainTerms(StreamPhysicalWindowAggregate.scala:86)
>  at 
> org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409)
>  at 
> org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391) 
> at 
> org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443)
>  at java.base/java.util.HashMap.hash(HashMap.java:339) at 
> java.base/java.util.HashMap.get(HashMap.java:552) at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150)
>  at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>  at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>  at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
> {code}
> Looking at the code, it seems that when 
> PullUpWindowTableFunctionIntoWindowAggregateRule is building the new Calc in 
> WindowUtil.buildNewProgramWithoutWindowColumns, it is adding the rowtime 
> column from the input row to the new calc without checking to see if there 
> are any name collisions. Also, TBH I'm not entirely sure yet why the rowtime 
> column of the input table is being added to the projected output row like 
> that?
>  
> [~jark] would appreciate your help with this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to