This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a5223c49e5d0a3b221b5f59238dc765be90088bc Author: Sergey Nuyanzin <[email protected]> AuthorDate: Sun Feb 22 11:19:11 2026 +0100 [FLINK-38624][table] Type Mismatch Exception in StreamPhysicalOverAggregateRule This closes #27616. --- .../planner/calcite/RelTimeIndicatorConverter.java | 53 ++++++- .../nodes/logical/FlinkLogicalOverAggregate.scala | 13 +- .../plan/stream/sql/agg/OverAggregateTest.xml | 159 +++++++++++++++++++++ .../plan/stream/sql/agg/OverAggregateTest.scala | 138 ++++++++++++++++++ 4 files changed, 359 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java index 6403708c216..168d4df4fb0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java @@ -65,6 +65,7 @@ import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.core.SetOp; import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.Window; import org.apache.calcite.rel.logical.LogicalCalc; import org.apache.calcite.rel.logical.LogicalTableModify; import org.apache.calcite.rel.type.RelDataType; @@ -78,6 +79,7 @@ import org.apache.calcite.rex.RexPatternFieldRef; import org.apache.calcite.rex.RexProgram; import org.apache.calcite.rex.RexProgramBuilder; import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.SqlTypeName; @@ -143,10 +145,11 @@ public final class RelTimeIndicatorConverter extends RelHomogeneousShuttle { || node 
instanceof FlinkLogicalDistribution || node instanceof FlinkLogicalWatermarkAssigner || node instanceof FlinkLogicalSort - || node instanceof FlinkLogicalOverAggregate || node instanceof FlinkLogicalExpand || node instanceof FlinkLogicalScriptTransform) { return visitSimpleRel(node); + } else if (node instanceof FlinkLogicalOverAggregate) { + return visitLogicalOverAggregate((FlinkLogicalOverAggregate) node); } else if (node instanceof FlinkLogicalWindowAggregate) { return visitWindowAggregate((FlinkLogicalWindowAggregate) node); } else if (node instanceof FlinkLogicalWindowTableAggregate) { @@ -236,6 +239,54 @@ public final class RelTimeIndicatorConverter extends RelHomogeneousShuttle { newInterval); } + private RelNode visitLogicalOverAggregate(FlinkLogicalOverAggregate logical) { + final RelNode newInput = logical.getInput().accept(this); + final List<RelDataType> newDataTypeList = + new ArrayList<>( + newInput.getRowType().getFieldList().stream() + .map(RelDataTypeField::getType) + .collect(Collectors.toList())); + + final List<Window.Group> windowGroups = new ArrayList<>(); + for (Window.Group group : logical.groups) { + final List<Window.RexWinAggCall> winCalls = new ArrayList<>(); + for (Window.RexWinAggCall call : group.aggCalls) { + RelDataType callType; + if (isTimeIndicatorType(call.getType())) { + callType = + timestamp( + call.getType().isNullable(), + isTimestampLtzType(call.getType())); + } else { + callType = call.getType(); + } + winCalls.add( + new Window.RexWinAggCall( + (SqlAggFunction) call.op, + callType, + call.getOperands(), + call.ordinal, + call.distinct, + call.ignoreNulls)); + newDataTypeList.add(callType); + } + windowGroups.add( + new Window.Group( + group.keys, + group.isRows, + group.lowerBound, + group.upperBound, + group.orderKeys, + winCalls)); + } + + final RelDataType newType = + logical.getCluster() + .getTypeFactory() + .createStructType(newDataTypeList, logical.getRowType().getFieldNames()); + return 
logical.copy(logical.getTraitSet(), List.of(newInput), newType, windowGroups); + } + private RelNode visitCalc(FlinkLogicalCalc calc) { // visit children and update inputs RelNode newInput = calc.getInput().accept(this); diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala index 899a1719094..61a35b2fb00 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala @@ -52,15 +52,22 @@ class FlinkLogicalOverAggregate( with FlinkLogicalRel { override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = { + copy(traitSet, inputs, rowType, windowGroups) + } + + def copy( + traitSet: RelTraitSet, + inputs: JList[RelNode], + rowType: RelDataType, + groups: JList[Window.Group]): RelNode = { new FlinkLogicalOverAggregate( cluster, traitSet, inputs.get(0), windowConstants, - getRowType, - windowGroups) + rowType, + groups) } - } class FlinkLogicalOverAggregateConverter(config: Config) extends ConverterRule(config) { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml index 40f7b8bf809..dc0185da0ec 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml @@ -523,6 +523,165 @@ Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 
0), w0$o1, null:INTEGER) AS cnt2]) +- Exchange(distribution=[hash[c]]) +- Calc(select=[a, c, rowtime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testTemporalJoinWithWatermarks"> + <Resource name="sql"> + <![CDATA[ +SELECT count(o.amount) OVER (PARTITION BY o.product_id) AS amount_count +FROM orders AS o +LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p +ON o.product_id = p.product_id +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(amount_count=[COUNT($1) OVER (PARTITION BY $0)]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 2}]) + :- LogicalWatermarkAssigner(rowtime=[order_ts], watermark=[-($2, 5000:INTERVAL SECOND)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, orders]]) + +- LogicalFilter(condition=[=($cor0.product_id, $0)]) + +- LogicalSnapshot(period=[$cor0.order_ts]) + +- LogicalWatermarkAssigner(rowtime=[mod_record_ts], watermark=[-($2, 60000:INTERVAL SECOND)]) + +- LogicalProject(product_id=[$0], record_ts=[$1], mod_record_ts=[TO_TIMESTAMP($1)]) + +- LogicalTableScan(table=[[default_catalog, default_database, products]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[w0$o0 AS $0]) ++- OverAggregate(partitionBy=[product_id], orderBy=[], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts, COUNT(amount) AS w0$o0]) + +- Exchange(distribution=[hash[product_id]]) + +- Calc(select=[product_id, amount, order_ts, product_id0, record_ts, CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts]) + +- TemporalJoin(joinType=[LeftOuterJoin], where=[((product_id = product_id0) AND __TEMPORAL_JOIN_CONDITION(order_ts, mod_record_ts, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(product_id0), __TEMPORAL_JOIN_LEFT_KEY(product_id), 
__TEMPORAL_JOIN_RIGHT_KEY(product_id0)))], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts]) + :- Exchange(distribution=[hash[product_id]]) + : +- WatermarkAssigner(rowtime=[order_ts], watermark=[(order_ts - 5000:INTERVAL SECOND)]) + : +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[product_id, amount, order_ts]) + +- Exchange(distribution=[hash[product_id]]) + +- WatermarkAssigner(rowtime=[mod_record_ts], watermark=[(mod_record_ts - 60000:INTERVAL SECOND)]) + +- Calc(select=[product_id, record_ts, TO_TIMESTAMP(record_ts) AS mod_record_ts]) + +- TableSourceScan(table=[[default_catalog, default_database, products]], fields=[product_id, record_ts]) +]]> + </Resource> + </TestCase> + <TestCase name="testTemporalJoinWithWatermarksMix"> + <Resource name="sql"> + <![CDATA[ +SELECT first_value(o.order_ts) OVER (PARTITION BY o.product_id) AS first_order_ts, + min(o.order_ts) OVER (PARTITION BY o.product_id) AS min_order_ts, + max(o.amount) OVER (PARTITION BY o.product_id) AS max_amount +FROM orders AS o +LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p +ON o.product_id = p.product_id +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(first_order_ts=[FIRST_VALUE($2) OVER (PARTITION BY $0)], min_order_ts=[MIN($2) OVER (PARTITION BY $0)], max_amount=[MAX($1) OVER (PARTITION BY $0)]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 2}]) + :- LogicalWatermarkAssigner(rowtime=[order_ts], watermark=[-($2, 5000:INTERVAL SECOND)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, orders]]) + +- LogicalFilter(condition=[=($cor0.product_id, $0)]) + +- LogicalSnapshot(period=[$cor0.order_ts]) + +- LogicalWatermarkAssigner(rowtime=[mod_record_ts], watermark=[-($2, 60000:INTERVAL SECOND)]) + +- LogicalProject(product_id=[$0], record_ts=[$1], mod_record_ts=[TO_TIMESTAMP($1)]) + +- LogicalTableScan(table=[[default_catalog, default_database, products]]) 
+]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[w0$o0 AS $0, w0$o1 AS $1, w0$o2 AS $2]) ++- OverAggregate(partitionBy=[product_id], orderBy=[], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts, FIRST_VALUE(order_ts) AS w0$o0, MIN(order_ts) AS w0$o1, MAX(amount) AS w0$o2]) + +- Exchange(distribution=[hash[product_id]]) + +- Calc(select=[product_id, amount, order_ts, product_id0, record_ts, CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts]) + +- TemporalJoin(joinType=[LeftOuterJoin], where=[((product_id = product_id0) AND __TEMPORAL_JOIN_CONDITION(order_ts, mod_record_ts, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(product_id0), __TEMPORAL_JOIN_LEFT_KEY(product_id), __TEMPORAL_JOIN_RIGHT_KEY(product_id0)))], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts]) + :- Exchange(distribution=[hash[product_id]]) + : +- WatermarkAssigner(rowtime=[order_ts], watermark=[(order_ts - 5000:INTERVAL SECOND)]) + : +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[product_id, amount, order_ts]) + +- Exchange(distribution=[hash[product_id]]) + +- WatermarkAssigner(rowtime=[mod_record_ts], watermark=[(mod_record_ts - 60000:INTERVAL SECOND)]) + +- Calc(select=[product_id, record_ts, TO_TIMESTAMP(record_ts) AS mod_record_ts]) + +- TableSourceScan(table=[[default_catalog, default_database, products]], fields=[product_id, record_ts]) +]]> + </Resource> + </TestCase> + <TestCase name="testTemporalJoinWithWatermarksSeveralFunctions"> + <Resource name="sql"> + <![CDATA[ +SELECT last_value(o.amount) OVER (PARTITION BY o.product_id ORDER BY o.order_ts) AS last_amount, + lag(o.amount) OVER (PARTITION BY o.product_id ORDER BY o.order_ts) AS prev_amount +FROM orders AS o +LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p +ON o.product_id = p.product_id +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ 
+LogicalProject(last_amount=[LAST_VALUE($1) OVER (PARTITION BY $0 ORDER BY $2 NULLS FIRST)], prev_amount=[LAG($1) OVER (PARTITION BY $0 ORDER BY $2 NULLS FIRST)]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 2}]) + :- LogicalWatermarkAssigner(rowtime=[order_ts], watermark=[-($2, 5000:INTERVAL SECOND)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, orders]]) + +- LogicalFilter(condition=[=($cor0.product_id, $0)]) + +- LogicalSnapshot(period=[$cor0.order_ts]) + +- LogicalWatermarkAssigner(rowtime=[mod_record_ts], watermark=[-($2, 60000:INTERVAL SECOND)]) + +- LogicalProject(product_id=[$0], record_ts=[$1], mod_record_ts=[TO_TIMESTAMP($1)]) + +- LogicalTableScan(table=[[default_catalog, default_database, products]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[w0$o0 AS $0, w0$o1 AS $1]) ++- OverAggregate(partitionBy=[product_id], orderBy=[order_ts ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts, LAST_VALUE(amount) AS w0$o0, LAG(amount) AS w0$o1]) + +- Exchange(distribution=[hash[product_id]]) + +- Calc(select=[product_id, amount, order_ts, product_id0, record_ts, CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts]) + +- TemporalJoin(joinType=[LeftOuterJoin], where=[((product_id = product_id0) AND __TEMPORAL_JOIN_CONDITION(order_ts, mod_record_ts, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(product_id0), __TEMPORAL_JOIN_LEFT_KEY(product_id), __TEMPORAL_JOIN_RIGHT_KEY(product_id0)))], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts]) + :- Exchange(distribution=[hash[product_id]]) + : +- WatermarkAssigner(rowtime=[order_ts], watermark=[(order_ts - 5000:INTERVAL SECOND)]) + : +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[product_id, amount, order_ts]) + +- Exchange(distribution=[hash[product_id]]) + +- 
WatermarkAssigner(rowtime=[mod_record_ts], watermark=[(mod_record_ts - 60000:INTERVAL SECOND)]) + +- Calc(select=[product_id, record_ts, TO_TIMESTAMP(record_ts) AS mod_record_ts]) + +- TableSourceScan(table=[[default_catalog, default_database, products]], fields=[product_id, record_ts]) +]]> + </Resource> + </TestCase> + <TestCase name="testTemporalJoinWithWatermarksWithMaterializedTimeArg"> + <Resource name="sql"> + <![CDATA[ +SELECT count(o.order_ts) OVER (PARTITION BY o.product_id) AS total_order_ts +FROM orders AS o +LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p +ON o.product_id = p.product_id +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(total_order_ts=[COUNT($2) OVER (PARTITION BY $0)]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 2}]) + :- LogicalWatermarkAssigner(rowtime=[order_ts], watermark=[-($2, 5000:INTERVAL SECOND)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, orders]]) + +- LogicalFilter(condition=[=($cor0.product_id, $0)]) + +- LogicalSnapshot(period=[$cor0.order_ts]) + +- LogicalWatermarkAssigner(rowtime=[mod_record_ts], watermark=[-($2, 60000:INTERVAL SECOND)]) + +- LogicalProject(product_id=[$0], record_ts=[$1], mod_record_ts=[TO_TIMESTAMP($1)]) + +- LogicalTableScan(table=[[default_catalog, default_database, products]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[w0$o0 AS $0]) ++- OverAggregate(partitionBy=[product_id], orderBy=[], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts, COUNT(order_ts) AS w0$o0]) + +- Exchange(distribution=[hash[product_id]]) + +- Calc(select=[product_id, amount, order_ts, product_id0, record_ts, CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts]) + +- TemporalJoin(joinType=[LeftOuterJoin], where=[((product_id = product_id0) AND __TEMPORAL_JOIN_CONDITION(order_ts, mod_record_ts, 
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(product_id0), __TEMPORAL_JOIN_LEFT_KEY(product_id), __TEMPORAL_JOIN_RIGHT_KEY(product_id0)))], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts]) + :- Exchange(distribution=[hash[product_id]]) + : +- WatermarkAssigner(rowtime=[order_ts], watermark=[(order_ts - 5000:INTERVAL SECOND)]) + : +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[product_id, amount, order_ts]) + +- Exchange(distribution=[hash[product_id]]) + +- WatermarkAssigner(rowtime=[mod_record_ts], watermark=[(mod_record_ts - 60000:INTERVAL SECOND)]) + +- Calc(select=[product_id, record_ts, TO_TIMESTAMP(record_ts) AS mod_record_ts]) + +- TableSourceScan(table=[[default_catalog, default_database, products]], fields=[product_id, record_ts]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala index 97a5dbaf1b8..b42f6617ddc 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala @@ -510,4 +510,142 @@ class OverAggregateTest extends TableTestBase { util.verifyExecPlan(sql) } + + @Test + def testTemporalJoinWithWatermarks(): Unit = { + util.addTable(s""" + |CREATE TABLE orders ( + | product_id STRING, + | amount BIGINT, + | order_ts TIMESTAMP(3), + | WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECONDS + |) WITH ( + | 'connector' = 'values' + |) + |""".stripMargin) + + util.addTable(s""" + |CREATE TABLE products ( + | product_id STRING, + | record_ts STRING, + | mod_record_ts AS TO_TIMESTAMP(record_ts), + | PRIMARY KEY (product_id) NOT ENFORCED, + | WATERMARK FOR mod_record_ts 
AS mod_record_ts - INTERVAL '60' SECONDS + |) WITH ( + | 'connector' = 'values' + |) + |""".stripMargin) + + util.verifyExecPlan(s""" + |SELECT count(o.amount) OVER (PARTITION BY o.product_id) AS amount_count + |FROM orders AS o + |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p + |ON o.product_id = p.product_id + |""".stripMargin) + } + + @Test + def testTemporalJoinWithWatermarksSeveralFunctions(): Unit = { + util.addTable(s""" + |CREATE TABLE orders ( + | product_id STRING, + | amount BIGINT, + | order_ts TIMESTAMP(3), + | WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECONDS + |) WITH ( + | 'connector' = 'values' + |) + |""".stripMargin) + + util.addTable(s""" + |CREATE TABLE products ( + | product_id STRING, + | record_ts STRING, + | mod_record_ts AS TO_TIMESTAMP(record_ts), + | PRIMARY KEY (product_id) NOT ENFORCED, + | WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS + |) WITH ( + | 'connector' = 'values' + |) + |""".stripMargin) + + util.verifyExecPlan( + s""" + |SELECT last_value(o.amount) OVER (PARTITION BY o.product_id ORDER BY o.order_ts) AS last_amount, + | lag(o.amount) OVER (PARTITION BY o.product_id ORDER BY o.order_ts) AS prev_amount + |FROM orders AS o + |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p + |ON o.product_id = p.product_id + |""".stripMargin) + } + + @Test + def testTemporalJoinWithWatermarksWithMaterializedTimeArg(): Unit = { + util.addTable(s""" + |CREATE TABLE orders ( + | product_id STRING, + | amount BIGINT, + | order_ts TIMESTAMP(3), + | WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECONDS + |) WITH ( + | 'connector' = 'values' + |) + |""".stripMargin) + + util.addTable(s""" + |CREATE TABLE products ( + | product_id STRING, + | record_ts STRING, + | mod_record_ts AS TO_TIMESTAMP(record_ts), + | PRIMARY KEY (product_id) NOT ENFORCED, + | WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS + |) WITH ( + | 'connector' = 'values' + |) + |""".stripMargin) + + 
util.verifyExecPlan( + s""" + |SELECT count(o.order_ts) OVER (PARTITION BY o.product_id) AS total_order_ts + |FROM orders AS o + |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p + |ON o.product_id = p.product_id + |""".stripMargin) + } + + @Test + def testTemporalJoinWithWatermarksMix(): Unit = { + util.addTable(s""" + |CREATE TABLE orders ( + | product_id STRING, + | amount BIGINT, + | order_ts TIMESTAMP(3), + | WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECONDS + |) WITH ( + | 'connector' = 'values' + |) + |""".stripMargin) + + util.addTable(s""" + |CREATE TABLE products ( + | product_id STRING, + | record_ts STRING, + | mod_record_ts AS TO_TIMESTAMP(record_ts), + | PRIMARY KEY (product_id) NOT ENFORCED, + | WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS + |) WITH ( + | 'connector' = 'values' + |) + |""".stripMargin) + + util.verifyExecPlan( + s""" + |SELECT first_value(o.order_ts) OVER (PARTITION BY o.product_id) AS first_order_ts, + | min(o.order_ts) OVER (PARTITION BY o.product_id) AS min_order_ts, + | max(o.amount) OVER (PARTITION BY o.product_id) AS max_amount + |FROM orders AS o + |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p + |ON o.product_id = p.product_id + |""".stripMargin) + } }
