This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a5223c49e5d0a3b221b5f59238dc765be90088bc
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Sun Feb 22 11:19:11 2026 +0100

    [FLINK-38624][table] Type Mismatch Exception in StreamPhysicalOverAggregateRule
    
    This closes #27616.
---
 .../planner/calcite/RelTimeIndicatorConverter.java |  53 ++++++-
 .../nodes/logical/FlinkLogicalOverAggregate.scala  |  13 +-
 .../plan/stream/sql/agg/OverAggregateTest.xml      | 159 +++++++++++++++++++++
 .../plan/stream/sql/agg/OverAggregateTest.scala    | 138 ++++++++++++++++++
 4 files changed, 359 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
index 6403708c216..168d4df4fb0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
@@ -65,6 +65,7 @@ import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Window;
 import org.apache.calcite.rel.logical.LogicalCalc;
 import org.apache.calcite.rel.logical.LogicalTableModify;
 import org.apache.calcite.rel.type.RelDataType;
@@ -78,6 +79,7 @@ import org.apache.calcite.rex.RexPatternFieldRef;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
 import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -143,10 +145,11 @@ public final class RelTimeIndicatorConverter extends RelHomogeneousShuttle {
                 || node instanceof FlinkLogicalDistribution
                 || node instanceof FlinkLogicalWatermarkAssigner
                 || node instanceof FlinkLogicalSort
-                || node instanceof FlinkLogicalOverAggregate
                 || node instanceof FlinkLogicalExpand
                 || node instanceof FlinkLogicalScriptTransform) {
             return visitSimpleRel(node);
+        } else if (node instanceof FlinkLogicalOverAggregate) {
+            return visitLogicalOverAggregate((FlinkLogicalOverAggregate) node);
         } else if (node instanceof FlinkLogicalWindowAggregate) {
             return visitWindowAggregate((FlinkLogicalWindowAggregate) node);
         } else if (node instanceof FlinkLogicalWindowTableAggregate) {
@@ -236,6 +239,54 @@ public final class RelTimeIndicatorConverter extends RelHomogeneousShuttle {
                 newInterval);
     }
 
+    private RelNode visitLogicalOverAggregate(FlinkLogicalOverAggregate logical) {
+        final RelNode newInput = logical.getInput().accept(this);
+        final List<RelDataType> newDataTypeList =
+                new ArrayList<>(
+                        newInput.getRowType().getFieldList().stream()
+                                .map(RelDataTypeField::getType)
+                                .collect(Collectors.toList()));
+
+        final List<Window.Group> windowGroups = new ArrayList<>();
+        for (Window.Group group : logical.groups) {
+            final List<Window.RexWinAggCall> winCalls = new ArrayList<>();
+            for (Window.RexWinAggCall call : group.aggCalls) {
+                RelDataType callType;
+                if (isTimeIndicatorType(call.getType())) {
+                    callType =
+                            timestamp(
+                                    call.getType().isNullable(),
+                                    isTimestampLtzType(call.getType()));
+                } else {
+                    callType = call.getType();
+                }
+                winCalls.add(
+                        new Window.RexWinAggCall(
+                                (SqlAggFunction) call.op,
+                                callType,
+                                call.getOperands(),
+                                call.ordinal,
+                                call.distinct,
+                                call.ignoreNulls));
+                newDataTypeList.add(callType);
+            }
+            windowGroups.add(
+                    new Window.Group(
+                            group.keys,
+                            group.isRows,
+                            group.lowerBound,
+                            group.upperBound,
+                            group.orderKeys,
+                            winCalls));
+        }
+
+        final RelDataType newType =
+                logical.getCluster()
+                        .getTypeFactory()
+                        .createStructType(newDataTypeList, logical.getRowType().getFieldNames());
+        return logical.copy(logical.getTraitSet(), List.of(newInput), newType, windowGroups);
+    }
+
     private RelNode visitCalc(FlinkLogicalCalc calc) {
         // visit children and update inputs
         RelNode newInput = calc.getInput().accept(this);
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
index 899a1719094..61a35b2fb00 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
@@ -52,15 +52,22 @@ class FlinkLogicalOverAggregate(
   with FlinkLogicalRel {
 
   override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    copy(traitSet, inputs, rowType, windowGroups)
+  }
+
+  def copy(
+      traitSet: RelTraitSet,
+      inputs: JList[RelNode],
+      rowType: RelDataType,
+      groups: JList[Window.Group]): RelNode = {
     new FlinkLogicalOverAggregate(
       cluster,
       traitSet,
       inputs.get(0),
       windowConstants,
-      getRowType,
-      windowGroups)
+      rowType,
+      groups)
   }
-
 }
 
 class FlinkLogicalOverAggregateConverter(config: Config) extends ConverterRule(config) {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
index 40f7b8bf809..dc0185da0ec 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
@@ -523,6 +523,165 @@ Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS cnt2])
    +- Exchange(distribution=[hash[c]])
       +- Calc(select=[a, c, rowtime])
          +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTemporalJoinWithWatermarks">
+    <Resource name="sql">
+      <![CDATA[
+SELECT count(o.amount) OVER (PARTITION BY o.product_id) AS amount_count
+FROM orders AS o
+LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+ON o.product_id = p.product_id
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(amount_count=[COUNT($1) OVER (PARTITION BY $0)])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 2}])
+   :- LogicalWatermarkAssigner(rowtime=[order_ts], watermark=[-($2, 5000:INTERVAL SECOND)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+   +- LogicalFilter(condition=[=($cor0.product_id, $0)])
+      +- LogicalSnapshot(period=[$cor0.order_ts])
+         +- LogicalWatermarkAssigner(rowtime=[mod_record_ts], watermark=[-($2, 60000:INTERVAL SECOND)])
+            +- LogicalProject(product_id=[$0], record_ts=[$1], mod_record_ts=[TO_TIMESTAMP($1)])
+               +- LogicalTableScan(table=[[default_catalog, default_database, products]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[w0$o0 AS $0])
++- OverAggregate(partitionBy=[product_id], orderBy=[], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts, COUNT(amount) AS w0$o0])
+   +- Exchange(distribution=[hash[product_id]])
+      +- Calc(select=[product_id, amount, order_ts, product_id0, record_ts, CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts])
+         +- TemporalJoin(joinType=[LeftOuterJoin], where=[((product_id = product_id0) AND __TEMPORAL_JOIN_CONDITION(order_ts, mod_record_ts, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(product_id0), __TEMPORAL_JOIN_LEFT_KEY(product_id), __TEMPORAL_JOIN_RIGHT_KEY(product_id0)))], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts])
+            :- Exchange(distribution=[hash[product_id]])
+            :  +- WatermarkAssigner(rowtime=[order_ts], watermark=[(order_ts - 5000:INTERVAL SECOND)])
+            :     +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[product_id, amount, order_ts])
+            +- Exchange(distribution=[hash[product_id]])
+               +- WatermarkAssigner(rowtime=[mod_record_ts], watermark=[(mod_record_ts - 60000:INTERVAL SECOND)])
+                  +- Calc(select=[product_id, record_ts, TO_TIMESTAMP(record_ts) AS mod_record_ts])
+                     +- TableSourceScan(table=[[default_catalog, default_database, products]], fields=[product_id, record_ts])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTemporalJoinWithWatermarksMix">
+    <Resource name="sql">
+      <![CDATA[
+SELECT first_value(o.order_ts) OVER (PARTITION BY o.product_id) AS first_order_ts,
+       min(o.order_ts) OVER (PARTITION BY o.product_id) AS min_order_ts,
+       max(o.amount) OVER (PARTITION BY o.product_id) AS max_amount
+FROM orders AS o
+LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+ON o.product_id = p.product_id
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(first_order_ts=[FIRST_VALUE($2) OVER (PARTITION BY $0)], min_order_ts=[MIN($2) OVER (PARTITION BY $0)], max_amount=[MAX($1) OVER (PARTITION BY $0)])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 2}])
+   :- LogicalWatermarkAssigner(rowtime=[order_ts], watermark=[-($2, 5000:INTERVAL SECOND)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+   +- LogicalFilter(condition=[=($cor0.product_id, $0)])
+      +- LogicalSnapshot(period=[$cor0.order_ts])
+         +- LogicalWatermarkAssigner(rowtime=[mod_record_ts], watermark=[-($2, 60000:INTERVAL SECOND)])
+            +- LogicalProject(product_id=[$0], record_ts=[$1], mod_record_ts=[TO_TIMESTAMP($1)])
+               +- LogicalTableScan(table=[[default_catalog, default_database, products]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[w0$o0 AS $0, w0$o1 AS $1, w0$o2 AS $2])
++- OverAggregate(partitionBy=[product_id], orderBy=[], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts, FIRST_VALUE(order_ts) AS w0$o0, MIN(order_ts) AS w0$o1, MAX(amount) AS w0$o2])
+   +- Exchange(distribution=[hash[product_id]])
+      +- Calc(select=[product_id, amount, order_ts, product_id0, record_ts, CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts])
+         +- TemporalJoin(joinType=[LeftOuterJoin], where=[((product_id = product_id0) AND __TEMPORAL_JOIN_CONDITION(order_ts, mod_record_ts, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(product_id0), __TEMPORAL_JOIN_LEFT_KEY(product_id), __TEMPORAL_JOIN_RIGHT_KEY(product_id0)))], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts])
+            :- Exchange(distribution=[hash[product_id]])
+            :  +- WatermarkAssigner(rowtime=[order_ts], watermark=[(order_ts - 5000:INTERVAL SECOND)])
+            :     +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[product_id, amount, order_ts])
+            +- Exchange(distribution=[hash[product_id]])
+               +- WatermarkAssigner(rowtime=[mod_record_ts], watermark=[(mod_record_ts - 60000:INTERVAL SECOND)])
+                  +- Calc(select=[product_id, record_ts, TO_TIMESTAMP(record_ts) AS mod_record_ts])
+                     +- TableSourceScan(table=[[default_catalog, default_database, products]], fields=[product_id, record_ts])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTemporalJoinWithWatermarksSeveralFunctions">
+    <Resource name="sql">
+      <![CDATA[
+SELECT last_value(o.amount) OVER (PARTITION BY o.product_id ORDER BY o.order_ts) AS last_amount,
+       lag(o.amount) OVER (PARTITION BY o.product_id ORDER BY o.order_ts) AS prev_amount
+FROM orders AS o
+LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+ON o.product_id = p.product_id
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(last_amount=[LAST_VALUE($1) OVER (PARTITION BY $0 ORDER BY $2 NULLS FIRST)], prev_amount=[LAG($1) OVER (PARTITION BY $0 ORDER BY $2 NULLS FIRST)])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 2}])
+   :- LogicalWatermarkAssigner(rowtime=[order_ts], watermark=[-($2, 5000:INTERVAL SECOND)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+   +- LogicalFilter(condition=[=($cor0.product_id, $0)])
+      +- LogicalSnapshot(period=[$cor0.order_ts])
+         +- LogicalWatermarkAssigner(rowtime=[mod_record_ts], watermark=[-($2, 60000:INTERVAL SECOND)])
+            +- LogicalProject(product_id=[$0], record_ts=[$1], mod_record_ts=[TO_TIMESTAMP($1)])
+               +- LogicalTableScan(table=[[default_catalog, default_database, products]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[w0$o0 AS $0, w0$o1 AS $1])
++- OverAggregate(partitionBy=[product_id], orderBy=[order_ts ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts, LAST_VALUE(amount) AS w0$o0, LAG(amount) AS w0$o1])
+   +- Exchange(distribution=[hash[product_id]])
+      +- Calc(select=[product_id, amount, order_ts, product_id0, record_ts, CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts])
+         +- TemporalJoin(joinType=[LeftOuterJoin], where=[((product_id = product_id0) AND __TEMPORAL_JOIN_CONDITION(order_ts, mod_record_ts, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(product_id0), __TEMPORAL_JOIN_LEFT_KEY(product_id), __TEMPORAL_JOIN_RIGHT_KEY(product_id0)))], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts])
+            :- Exchange(distribution=[hash[product_id]])
+            :  +- WatermarkAssigner(rowtime=[order_ts], watermark=[(order_ts - 5000:INTERVAL SECOND)])
+            :     +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[product_id, amount, order_ts])
+            +- Exchange(distribution=[hash[product_id]])
+               +- WatermarkAssigner(rowtime=[mod_record_ts], watermark=[(mod_record_ts - 60000:INTERVAL SECOND)])
+                  +- Calc(select=[product_id, record_ts, TO_TIMESTAMP(record_ts) AS mod_record_ts])
+                     +- TableSourceScan(table=[[default_catalog, default_database, products]], fields=[product_id, record_ts])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTemporalJoinWithWatermarksWithMaterializedTimeArg">
+    <Resource name="sql">
+      <![CDATA[
+SELECT count(o.order_ts) OVER (PARTITION BY o.product_id) AS total_order_ts
+FROM orders AS o
+LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+ON o.product_id = p.product_id
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(total_order_ts=[COUNT($2) OVER (PARTITION BY $0)])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 2}])
+   :- LogicalWatermarkAssigner(rowtime=[order_ts], watermark=[-($2, 5000:INTERVAL SECOND)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+   +- LogicalFilter(condition=[=($cor0.product_id, $0)])
+      +- LogicalSnapshot(period=[$cor0.order_ts])
+         +- LogicalWatermarkAssigner(rowtime=[mod_record_ts], watermark=[-($2, 60000:INTERVAL SECOND)])
+            +- LogicalProject(product_id=[$0], record_ts=[$1], mod_record_ts=[TO_TIMESTAMP($1)])
+               +- LogicalTableScan(table=[[default_catalog, default_database, products]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[w0$o0 AS $0])
++- OverAggregate(partitionBy=[product_id], orderBy=[], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts, COUNT(order_ts) AS w0$o0])
+   +- Exchange(distribution=[hash[product_id]])
+      +- Calc(select=[product_id, amount, order_ts, product_id0, record_ts, CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts])
+         +- TemporalJoin(joinType=[LeftOuterJoin], where=[((product_id = product_id0) AND __TEMPORAL_JOIN_CONDITION(order_ts, mod_record_ts, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(product_id0), __TEMPORAL_JOIN_LEFT_KEY(product_id), __TEMPORAL_JOIN_RIGHT_KEY(product_id0)))], select=[product_id, amount, order_ts, product_id0, record_ts, mod_record_ts])
+            :- Exchange(distribution=[hash[product_id]])
+            :  +- WatermarkAssigner(rowtime=[order_ts], watermark=[(order_ts - 5000:INTERVAL SECOND)])
+            :     +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[product_id, amount, order_ts])
+            +- Exchange(distribution=[hash[product_id]])
+               +- WatermarkAssigner(rowtime=[mod_record_ts], watermark=[(mod_record_ts - 60000:INTERVAL SECOND)])
+                  +- Calc(select=[product_id, record_ts, TO_TIMESTAMP(record_ts) AS mod_record_ts])
+                     +- TableSourceScan(table=[[default_catalog, default_database, products]], fields=[product_id, record_ts])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
index 97a5dbaf1b8..b42f6617ddc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
@@ -510,4 +510,142 @@ class OverAggregateTest extends TableTestBase {
 
     util.verifyExecPlan(sql)
   }
+
+  @Test
+  def testTemporalJoinWithWatermarks(): Unit = {
+    util.addTable(s"""
+                     |CREATE TABLE orders (
+                     |  product_id STRING,
+                     |  amount BIGINT,
+                     |  order_ts TIMESTAMP(3),
+                     |  WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECONDS
+                     |) WITH (
+                     |  'connector' = 'values'
+                     |)
+                     |""".stripMargin)
+
+    util.addTable(s"""
+                     |CREATE TABLE products (
+                     |  product_id STRING,
+                     |  record_ts STRING,
+                     |  mod_record_ts AS TO_TIMESTAMP(record_ts),
+                     |  PRIMARY KEY (product_id) NOT ENFORCED,
+                     |  WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS
+                     |) WITH (
+                     |  'connector' = 'values'
+                     |)
+                     |""".stripMargin)
+
+    util.verifyExecPlan(s"""
+                           |SELECT count(o.amount) OVER (PARTITION BY o.product_id) AS amount_count
+                           |FROM orders AS o
+                           |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+                           |ON o.product_id = p.product_id
+                           |""".stripMargin)
+  }
+
+  @Test
+  def testTemporalJoinWithWatermarksSeveralFunctions(): Unit = {
+    util.addTable(s"""
+                     |CREATE TABLE orders (
+                     |  product_id STRING,
+                     |  amount BIGINT,
+                     |  order_ts TIMESTAMP(3),
+                     |  WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECONDS
+                     |) WITH (
+                     |  'connector' = 'values'
+                     |)
+                     |""".stripMargin)
+
+    util.addTable(s"""
+                     |CREATE TABLE products (
+                     |  product_id STRING,
+                     |  record_ts STRING,
+                     |  mod_record_ts AS TO_TIMESTAMP(record_ts),
+                     |  PRIMARY KEY (product_id) NOT ENFORCED,
+                     |  WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS
+                     |) WITH (
+                     |  'connector' = 'values'
+                     |)
+                     |""".stripMargin)
+
+    util.verifyExecPlan(
+      s"""
+         |SELECT last_value(o.amount) OVER (PARTITION BY o.product_id ORDER BY o.order_ts) AS last_amount,
+         |       lag(o.amount) OVER (PARTITION BY o.product_id ORDER BY o.order_ts) AS prev_amount
+         |FROM orders AS o
+         |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+         |ON o.product_id = p.product_id
+         |""".stripMargin)
+  }
+
+  @Test
+  def testTemporalJoinWithWatermarksWithMaterializedTimeArg(): Unit = {
+    util.addTable(s"""
+                     |CREATE TABLE orders (
+                     |  product_id STRING,
+                     |  amount BIGINT,
+                     |  order_ts TIMESTAMP(3),
+                     |  WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECONDS
+                     |) WITH (
+                     |  'connector' = 'values'
+                     |)
+                     |""".stripMargin)
+
+    util.addTable(s"""
+                     |CREATE TABLE products (
+                     |  product_id STRING,
+                     |  record_ts STRING,
+                     |  mod_record_ts AS TO_TIMESTAMP(record_ts),
+                     |  PRIMARY KEY (product_id) NOT ENFORCED,
+                     |  WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS
+                     |) WITH (
+                     |  'connector' = 'values'
+                     |)
+                     |""".stripMargin)
+
+    util.verifyExecPlan(
+      s"""
+         |SELECT count(o.order_ts) OVER (PARTITION BY o.product_id) AS total_order_ts
+         |FROM orders AS o
+         |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+         |ON o.product_id = p.product_id
+         |""".stripMargin)
+  }
+
+  @Test
+  def testTemporalJoinWithWatermarksMix(): Unit = {
+    util.addTable(s"""
+                     |CREATE TABLE orders (
+                     |  product_id STRING,
+                     |  amount BIGINT,
+                     |  order_ts TIMESTAMP(3),
+                     |  WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECONDS
+                     |) WITH (
+                     |  'connector' = 'values'
+                     |)
+                     |""".stripMargin)
+
+    util.addTable(s"""
+                     |CREATE TABLE products (
+                     |  product_id STRING,
+                     |  record_ts STRING,
+                     |  mod_record_ts AS TO_TIMESTAMP(record_ts),
+                     |  PRIMARY KEY (product_id) NOT ENFORCED,
+                     |  WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS
+                     |) WITH (
+                     |  'connector' = 'values'
+                     |)
+                     |""".stripMargin)
+
+    util.verifyExecPlan(
+      s"""
+         |SELECT first_value(o.order_ts) OVER (PARTITION BY o.product_id) AS first_order_ts,
+         |       min(o.order_ts) OVER (PARTITION BY o.product_id) AS min_order_ts,
+         |       max(o.amount) OVER (PARTITION BY o.product_id) AS max_amount
+         |FROM orders AS o
+         |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+         |ON o.product_id = p.product_id
+         |""".stripMargin)
+  }
 }

Reply via email to