This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.2 by this push:
new 7b9a4fe372e [FLINK-38624][table] Type Mismatch Exception in
StreamPhysicalOverAggregateRule
7b9a4fe372e is described below
commit 7b9a4fe372ed000a98007ad5f979ac5da666e1e2
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Mar 2 15:58:59 2026 +0100
[FLINK-38624][table] Type Mismatch Exception in
StreamPhysicalOverAggregateRule
---
.../planner/calcite/RelTimeIndicatorConverter.java | 53 ++++++-
.../nodes/logical/FlinkLogicalOverAggregate.scala | 13 +-
.../plan/stream/sql/agg/OverAggregateTest.xml | 159 +++++++++++++++++++++
.../plan/stream/sql/agg/OverAggregateTest.scala | 137 ++++++++++++++++++
4 files changed, 358 insertions(+), 4 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
index 6403708c216..168d4df4fb0 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
@@ -65,6 +65,7 @@ import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.logical.LogicalCalc;
import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.rel.type.RelDataType;
@@ -78,6 +79,7 @@ import org.apache.calcite.rex.RexPatternFieldRef;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -143,10 +145,11 @@ public final class RelTimeIndicatorConverter extends
RelHomogeneousShuttle {
|| node instanceof FlinkLogicalDistribution
|| node instanceof FlinkLogicalWatermarkAssigner
|| node instanceof FlinkLogicalSort
- || node instanceof FlinkLogicalOverAggregate
|| node instanceof FlinkLogicalExpand
|| node instanceof FlinkLogicalScriptTransform) {
return visitSimpleRel(node);
+ } else if (node instanceof FlinkLogicalOverAggregate) {
+ return visitLogicalOverAggregate((FlinkLogicalOverAggregate) node);
} else if (node instanceof FlinkLogicalWindowAggregate) {
return visitWindowAggregate((FlinkLogicalWindowAggregate) node);
} else if (node instanceof FlinkLogicalWindowTableAggregate) {
@@ -236,6 +239,54 @@ public final class RelTimeIndicatorConverter extends
RelHomogeneousShuttle {
newInterval);
}
+ private RelNode visitLogicalOverAggregate(FlinkLogicalOverAggregate
logical) {
+ final RelNode newInput = logical.getInput().accept(this);
+ final List<RelDataType> newDataTypeList =
+ new ArrayList<>(
+ newInput.getRowType().getFieldList().stream()
+ .map(RelDataTypeField::getType)
+ .collect(Collectors.toList()));
+
+ final List<Window.Group> windowGroups = new ArrayList<>();
+ for (Window.Group group : logical.groups) {
+ final List<Window.RexWinAggCall> winCalls = new ArrayList<>();
+ for (Window.RexWinAggCall call : group.aggCalls) {
+ RelDataType callType;
+ if (isTimeIndicatorType(call.getType())) {
+ callType =
+ timestamp(
+ call.getType().isNullable(),
+ isTimestampLtzType(call.getType()));
+ } else {
+ callType = call.getType();
+ }
+ winCalls.add(
+ new Window.RexWinAggCall(
+ (SqlAggFunction) call.op,
+ callType,
+ call.getOperands(),
+ call.ordinal,
+ call.distinct,
+ call.ignoreNulls));
+ newDataTypeList.add(callType);
+ }
+ windowGroups.add(
+ new Window.Group(
+ group.keys,
+ group.isRows,
+ group.lowerBound,
+ group.upperBound,
+ group.orderKeys,
+ winCalls));
+ }
+
+ final RelDataType newType =
+ logical.getCluster()
+ .getTypeFactory()
+ .createStructType(newDataTypeList,
logical.getRowType().getFieldNames());
+ return logical.copy(logical.getTraitSet(), List.of(newInput), newType,
windowGroups);
+ }
+
private RelNode visitCalc(FlinkLogicalCalc calc) {
// visit children and update inputs
RelNode newInput = calc.getInput().accept(this);
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
index 899a1719094..61a35b2fb00 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
@@ -52,15 +52,22 @@ class FlinkLogicalOverAggregate(
with FlinkLogicalRel {
override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+ copy(traitSet, inputs, rowType, windowGroups)
+ }
+
+ def copy(
+ traitSet: RelTraitSet,
+ inputs: JList[RelNode],
+ rowType: RelDataType,
+ groups: JList[Window.Group]): RelNode = {
new FlinkLogicalOverAggregate(
cluster,
traitSet,
inputs.get(0),
windowConstants,
- getRowType,
- windowGroups)
+ rowType,
+ groups)
}
-
}
class FlinkLogicalOverAggregateConverter(config: Config) extends
ConverterRule(config) {
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
index 40f7b8bf809..dc0185da0ec 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
@@ -523,6 +523,165 @@ Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1,
null:INTEGER) AS cnt2])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTemporalJoinWithWatermarks">
+ <Resource name="sql">
+ <![CDATA[
+SELECT count(o.amount) OVER (PARTITION BY o.product_id) AS amount_count
+FROM orders AS o
+LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+ON o.product_id = p.product_id
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(amount_count=[COUNT($1) OVER (PARTITION BY $0)])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
2}])
+ :- LogicalWatermarkAssigner(rowtime=[order_ts], watermark=[-($2,
5000:INTERVAL SECOND)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+ +- LogicalFilter(condition=[=($cor0.product_id, $0)])
+ +- LogicalSnapshot(period=[$cor0.order_ts])
+ +- LogicalWatermarkAssigner(rowtime=[mod_record_ts], watermark=[-($2,
60000:INTERVAL SECOND)])
+ +- LogicalProject(product_id=[$0], record_ts=[$1],
mod_record_ts=[TO_TIMESTAMP($1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
products]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[w0$o0 AS $0])
++- OverAggregate(partitionBy=[product_id], orderBy=[], window=[ RANGE BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[product_id, amount,
order_ts, product_id0, record_ts, mod_record_ts, COUNT(amount) AS w0$o0])
+ +- Exchange(distribution=[hash[product_id]])
+ +- Calc(select=[product_id, amount, order_ts, product_id0, record_ts,
CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts])
+ +- TemporalJoin(joinType=[LeftOuterJoin], where=[((product_id =
product_id0) AND __TEMPORAL_JOIN_CONDITION(order_ts, mod_record_ts,
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(product_id0),
__TEMPORAL_JOIN_LEFT_KEY(product_id),
__TEMPORAL_JOIN_RIGHT_KEY(product_id0)))], select=[product_id, amount,
order_ts, product_id0, record_ts, mod_record_ts])
+ :- Exchange(distribution=[hash[product_id]])
+ : +- WatermarkAssigner(rowtime=[order_ts], watermark=[(order_ts -
5000:INTERVAL SECOND)])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, orders]], fields=[product_id, amount, order_ts])
+ +- Exchange(distribution=[hash[product_id]])
+ +- WatermarkAssigner(rowtime=[mod_record_ts],
watermark=[(mod_record_ts - 60000:INTERVAL SECOND)])
+ +- Calc(select=[product_id, record_ts,
TO_TIMESTAMP(record_ts) AS mod_record_ts])
+ +- TableSourceScan(table=[[default_catalog,
default_database, products]], fields=[product_id, record_ts])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTemporalJoinWithWatermarksMix">
+ <Resource name="sql">
+ <![CDATA[
+SELECT first_value(o.order_ts) OVER (PARTITION BY o.product_id) AS
first_order_ts,
+ min(o.order_ts) OVER (PARTITION BY o.product_id) AS min_order_ts,
+ max(o.amount) OVER (PARTITION BY o.product_id) AS max_amount
+FROM orders AS o
+LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+ON o.product_id = p.product_id
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(first_order_ts=[FIRST_VALUE($2) OVER (PARTITION BY $0)],
min_order_ts=[MIN($2) OVER (PARTITION BY $0)], max_amount=[MAX($1) OVER
(PARTITION BY $0)])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
2}])
+ :- LogicalWatermarkAssigner(rowtime=[order_ts], watermark=[-($2,
5000:INTERVAL SECOND)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+ +- LogicalFilter(condition=[=($cor0.product_id, $0)])
+ +- LogicalSnapshot(period=[$cor0.order_ts])
+ +- LogicalWatermarkAssigner(rowtime=[mod_record_ts], watermark=[-($2,
60000:INTERVAL SECOND)])
+ +- LogicalProject(product_id=[$0], record_ts=[$1],
mod_record_ts=[TO_TIMESTAMP($1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
products]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[w0$o0 AS $0, w0$o1 AS $1, w0$o2 AS $2])
++- OverAggregate(partitionBy=[product_id], orderBy=[], window=[ RANGE BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[product_id, amount,
order_ts, product_id0, record_ts, mod_record_ts, FIRST_VALUE(order_ts) AS
w0$o0, MIN(order_ts) AS w0$o1, MAX(amount) AS w0$o2])
+ +- Exchange(distribution=[hash[product_id]])
+ +- Calc(select=[product_id, amount, order_ts, product_id0, record_ts,
CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts])
+ +- TemporalJoin(joinType=[LeftOuterJoin], where=[((product_id =
product_id0) AND __TEMPORAL_JOIN_CONDITION(order_ts, mod_record_ts,
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(product_id0),
__TEMPORAL_JOIN_LEFT_KEY(product_id),
__TEMPORAL_JOIN_RIGHT_KEY(product_id0)))], select=[product_id, amount,
order_ts, product_id0, record_ts, mod_record_ts])
+ :- Exchange(distribution=[hash[product_id]])
+ : +- WatermarkAssigner(rowtime=[order_ts], watermark=[(order_ts -
5000:INTERVAL SECOND)])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, orders]], fields=[product_id, amount, order_ts])
+ +- Exchange(distribution=[hash[product_id]])
+ +- WatermarkAssigner(rowtime=[mod_record_ts],
watermark=[(mod_record_ts - 60000:INTERVAL SECOND)])
+ +- Calc(select=[product_id, record_ts,
TO_TIMESTAMP(record_ts) AS mod_record_ts])
+ +- TableSourceScan(table=[[default_catalog,
default_database, products]], fields=[product_id, record_ts])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTemporalJoinWithWatermarksSeveralFunctions">
+ <Resource name="sql">
+ <![CDATA[
+SELECT last_value(o.amount) OVER (PARTITION BY o.product_id ORDER BY
o.order_ts) AS last_amount,
+ lag(o.amount) OVER (PARTITION BY o.product_id ORDER BY o.order_ts) AS
prev_amount
+FROM orders AS o
+LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+ON o.product_id = p.product_id
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(last_amount=[LAST_VALUE($1) OVER (PARTITION BY $0 ORDER BY $2
NULLS FIRST)], prev_amount=[LAG($1) OVER (PARTITION BY $0 ORDER BY $2 NULLS
FIRST)])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
2}])
+ :- LogicalWatermarkAssigner(rowtime=[order_ts], watermark=[-($2,
5000:INTERVAL SECOND)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+ +- LogicalFilter(condition=[=($cor0.product_id, $0)])
+ +- LogicalSnapshot(period=[$cor0.order_ts])
+ +- LogicalWatermarkAssigner(rowtime=[mod_record_ts], watermark=[-($2,
60000:INTERVAL SECOND)])
+ +- LogicalProject(product_id=[$0], record_ts=[$1],
mod_record_ts=[TO_TIMESTAMP($1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
products]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[w0$o0 AS $0, w0$o1 AS $1])
++- OverAggregate(partitionBy=[product_id], orderBy=[order_ts ASC], window=[
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[product_id, amount,
order_ts, product_id0, record_ts, mod_record_ts, LAST_VALUE(amount) AS w0$o0,
LAG(amount) AS w0$o1])
+ +- Exchange(distribution=[hash[product_id]])
+ +- Calc(select=[product_id, amount, order_ts, product_id0, record_ts,
CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts])
+ +- TemporalJoin(joinType=[LeftOuterJoin], where=[((product_id =
product_id0) AND __TEMPORAL_JOIN_CONDITION(order_ts, mod_record_ts,
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(product_id0),
__TEMPORAL_JOIN_LEFT_KEY(product_id),
__TEMPORAL_JOIN_RIGHT_KEY(product_id0)))], select=[product_id, amount,
order_ts, product_id0, record_ts, mod_record_ts])
+ :- Exchange(distribution=[hash[product_id]])
+ : +- WatermarkAssigner(rowtime=[order_ts], watermark=[(order_ts -
5000:INTERVAL SECOND)])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, orders]], fields=[product_id, amount, order_ts])
+ +- Exchange(distribution=[hash[product_id]])
+ +- WatermarkAssigner(rowtime=[mod_record_ts],
watermark=[(mod_record_ts - 60000:INTERVAL SECOND)])
+ +- Calc(select=[product_id, record_ts,
TO_TIMESTAMP(record_ts) AS mod_record_ts])
+ +- TableSourceScan(table=[[default_catalog,
default_database, products]], fields=[product_id, record_ts])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTemporalJoinWithWatermarksWithMaterializedTimeArg">
+ <Resource name="sql">
+ <![CDATA[
+SELECT count(o.order_ts) OVER (PARTITION BY o.product_id) AS total_order_ts
+FROM orders AS o
+LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+ON o.product_id = p.product_id
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(total_order_ts=[COUNT($2) OVER (PARTITION BY $0)])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
2}])
+ :- LogicalWatermarkAssigner(rowtime=[order_ts], watermark=[-($2,
5000:INTERVAL SECOND)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+ +- LogicalFilter(condition=[=($cor0.product_id, $0)])
+ +- LogicalSnapshot(period=[$cor0.order_ts])
+ +- LogicalWatermarkAssigner(rowtime=[mod_record_ts], watermark=[-($2,
60000:INTERVAL SECOND)])
+ +- LogicalProject(product_id=[$0], record_ts=[$1],
mod_record_ts=[TO_TIMESTAMP($1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
products]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[w0$o0 AS $0])
++- OverAggregate(partitionBy=[product_id], orderBy=[], window=[ RANGE BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[product_id, amount,
order_ts, product_id0, record_ts, mod_record_ts, COUNT(order_ts) AS w0$o0])
+ +- Exchange(distribution=[hash[product_id]])
+ +- Calc(select=[product_id, amount, order_ts, product_id0, record_ts,
CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts])
+ +- TemporalJoin(joinType=[LeftOuterJoin], where=[((product_id =
product_id0) AND __TEMPORAL_JOIN_CONDITION(order_ts, mod_record_ts,
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(product_id0),
__TEMPORAL_JOIN_LEFT_KEY(product_id),
__TEMPORAL_JOIN_RIGHT_KEY(product_id0)))], select=[product_id, amount,
order_ts, product_id0, record_ts, mod_record_ts])
+ :- Exchange(distribution=[hash[product_id]])
+ : +- WatermarkAssigner(rowtime=[order_ts], watermark=[(order_ts -
5000:INTERVAL SECOND)])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, orders]], fields=[product_id, amount, order_ts])
+ +- Exchange(distribution=[hash[product_id]])
+ +- WatermarkAssigner(rowtime=[mod_record_ts],
watermark=[(mod_record_ts - 60000:INTERVAL SECOND)])
+ +- Calc(select=[product_id, record_ts,
TO_TIMESTAMP(record_ts) AS mod_record_ts])
+ +- TableSourceScan(table=[[default_catalog,
default_database, products]], fields=[product_id, record_ts])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
index 97a5dbaf1b8..99f342ae665 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
@@ -510,4 +510,141 @@ class OverAggregateTest extends TableTestBase {
util.verifyExecPlan(sql)
}
+
+  @Test
+  def testTemporalJoinWithWatermarks(): Unit = {
+ util.addTable(s"""
+ |CREATE TABLE orders (
+ | product_id STRING,
+ | amount BIGINT,
+ | order_ts TIMESTAMP(3),
+ | WATERMARK FOR order_ts AS order_ts - INTERVAL '5'
SECONDS
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+
+ util.addTable(s"""
+ |CREATE TABLE products (
+ | product_id STRING,
+ | record_ts STRING,
+ | mod_record_ts AS TO_TIMESTAMP(record_ts),
+ | PRIMARY KEY (product_id) NOT ENFORCED,
+ | WATERMARK FOR mod_record_ts AS mod_record_ts -
INTERVAL '60' SECONDS
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+
+ util.verifyExecPlan(s"""
+ |SELECT count(o.amount) OVER (PARTITION BY
o.product_id) AS amount_count
+ |FROM orders AS o
+ |LEFT JOIN products FOR SYSTEM_TIME AS OF
o.order_ts AS p
+ |ON o.product_id = p.product_id
+ |""".stripMargin)
+ }
+
+ @Test
+ def testTemporalJoinWithWatermarksSeveralFunctions(): Unit = {
+ util.addTable(s"""
+ |CREATE TABLE orders (
+ | product_id STRING,
+ | amount BIGINT,
+ | order_ts TIMESTAMP(3),
+ | WATERMARK FOR order_ts AS order_ts - INTERVAL '5'
SECONDS
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+
+ util.addTable(s"""
+ |CREATE TABLE products (
+ | product_id STRING,
+ | record_ts STRING,
+ | mod_record_ts AS TO_TIMESTAMP(record_ts),
+ | PRIMARY KEY (product_id) NOT ENFORCED,
+ | WATERMARK FOR mod_record_ts AS mod_record_ts -
INTERVAL '60' SECONDS
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+
+ util.verifyExecPlan(
+ s"""
+ |SELECT last_value(o.amount) OVER (PARTITION BY o.product_id ORDER BY
o.order_ts) AS last_amount,
+ | lag(o.amount) OVER (PARTITION BY o.product_id ORDER BY
o.order_ts) AS prev_amount
+ |FROM orders AS o
+ |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+ |ON o.product_id = p.product_id
+ |""".stripMargin)
+ }
+
+ @Test
+ def testTemporalJoinWithWatermarksWithMaterializedTimeArg(): Unit = {
+ util.addTable(s"""
+ |CREATE TABLE orders (
+ | product_id STRING,
+ | amount BIGINT,
+ | order_ts TIMESTAMP(3),
+ | WATERMARK FOR order_ts AS order_ts - INTERVAL '5'
SECONDS
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+
+ util.addTable(s"""
+ |CREATE TABLE products (
+ | product_id STRING,
+ | record_ts STRING,
+ | mod_record_ts AS TO_TIMESTAMP(record_ts),
+ | PRIMARY KEY (product_id) NOT ENFORCED,
+ | WATERMARK FOR mod_record_ts AS mod_record_ts -
INTERVAL '60' SECONDS
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+
+ util.verifyExecPlan(
+ s"""
+ |SELECT count(o.order_ts) OVER (PARTITION BY o.product_id) AS
total_order_ts
+ |FROM orders AS o
+ |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+ |ON o.product_id = p.product_id
+ |""".stripMargin)
+ }
+
+ @Test
+ def testTemporalJoinWithWatermarksMix(): Unit = {
+ util.addTable(s"""
+ |CREATE TABLE orders (
+ | product_id STRING,
+ | amount BIGINT,
+ | order_ts TIMESTAMP(3),
+ | WATERMARK FOR order_ts AS order_ts - INTERVAL '5'
SECONDS
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+
+ util.addTable(s"""
+ |CREATE TABLE products (
+ | product_id STRING,
+ | record_ts STRING,
+ | mod_record_ts AS TO_TIMESTAMP(record_ts),
+ | PRIMARY KEY (product_id) NOT ENFORCED,
+ | WATERMARK FOR mod_record_ts AS mod_record_ts -
INTERVAL '60' SECONDS
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+
+ util.verifyExecPlan(
+ s"""
+ |SELECT first_value(o.order_ts) OVER (PARTITION BY o.product_id) AS
first_order_ts,
+ | min(o.order_ts) OVER (PARTITION BY o.product_id) AS
min_order_ts,
+ | max(o.amount) OVER (PARTITION BY o.product_id) AS max_amount
+ |FROM orders AS o
+ |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+ |ON o.product_id = p.product_id
+ |""".stripMargin)
+ }
}