This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4101316ef37 [FLINK-36417][table] Add support for hints in
WatermarkAssigner
4101316ef37 is described below
commit 4101316ef37d3a7082bff57cb732f4e9e0349d09
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Oct 3 21:32:16 2024 +0200
[FLINK-36417][table] Add support for hints in WatermarkAssigner
---
.../table/planner/calcite/FlinkRelBuilder.java | 3 +-
.../flink/table/planner/hint/FlinkHints.java | 4 +-
.../logical/EventTimeTemporalJoinRewriteRule.java | 6 +-
...arkAssignerChangelogNormalizeTransposeRule.java | 1 +
.../nodes/calcite/LogicalWatermarkAssigner.scala | 16 ++++-
.../plan/nodes/calcite/WatermarkAssigner.scala | 22 +++++-
.../logical/FlinkLogicalWatermarkAssigner.scala | 27 +++++++-
.../stream/StreamPhysicalWatermarkAssigner.scala | 19 +++++-
.../StreamPhysicalWatermarkAssignerRule.scala | 3 +
.../plan/hints/stream/StateTtlHintTest.java | 45 +++++++++++++
.../planner/plan/hints/stream/StateTtlHintTest.xml | 78 ++++++++++++++++++++++
.../ProjectWatermarkAssignerTransposeRuleTest.xml | 4 +-
.../flink/table/planner/utils/TableTestBase.scala | 1 +
13 files changed, 213 insertions(+), 16 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
index 71129e26d65..60a0c897eee 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
@@ -253,7 +253,8 @@ public final class FlinkRelBuilder extends RelBuilder {
public RelBuilder watermark(int rowtimeFieldIndex, RexNode watermarkExpr) {
final RelNode input = build();
final RelNode relNode =
- LogicalWatermarkAssigner.create(cluster, input,
rowtimeFieldIndex, watermarkExpr);
+ LogicalWatermarkAssigner.create(
+ cluster, input, Collections.emptyList(),
rowtimeFieldIndex, watermarkExpr);
return push(relNode);
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
index 18f13d75472..bcbc1dca533 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.hint;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.plan.nodes.calcite.WatermarkAssigner;
import
org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule;
import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
@@ -119,7 +120,8 @@ public abstract class FlinkHints {
public static boolean canTransposeToTableScan(RelNode node) {
return node instanceof LogicalProject // computed column on table
|| node instanceof LogicalFilter
- || node instanceof LogicalSnapshot;
+ || node instanceof LogicalSnapshot
+ || node instanceof WatermarkAssigner;
}
/** Returns the qualified name of a table scan, otherwise returns empty. */
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
index 89feca8a2b6..64df95c979e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
@@ -140,7 +140,11 @@ public class EventTimeTemporalJoinRewriteRule
final RelNode newChild = transmitSnapshotRequirement(child);
if (newChild != child) {
return wma.copy(
- wma.getTraitSet(), newChild, wma.rowtimeFieldIndex(),
wma.watermarkExpr());
+ wma.getTraitSet(),
+ newChild,
+ wma.getHints(),
+ wma.rowtimeFieldIndex(),
+ wma.watermarkExpr());
}
return wma;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
index 15d8f485994..51963d79eb3 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
@@ -288,6 +288,7 @@ public class
WatermarkAssignerChangelogNormalizeTransposeRule
watermark.copy(
watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT()),
exchange.getInput(),
+ Collections.emptyList(),
newRowTimeFieldIndex,
newWatermarkExpr);
final RelNode newChangelogNormalize =
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala
index 89a9cb7e3c4..c9d3443f526 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala
@@ -19,8 +19,11 @@ package org.apache.flink.table.planner.plan.nodes.calcite
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rex.RexNode
+import java.util
+
/**
* Sub-class of [[WatermarkAssigner]] that is a relational operator which
generates
* [[org.apache.flink.streaming.api.watermark.Watermark]]. This class
corresponds to Calcite logical
@@ -30,16 +33,22 @@ final class LogicalWatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
input: RelNode,
+ hints: util.List[RelHint],
rowtimeFieldIndex: Int,
watermarkExpr: RexNode)
- extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex,
watermarkExpr) {
+ extends WatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex,
watermarkExpr) {
override def copy(
traitSet: RelTraitSet,
input: RelNode,
+ hints: util.List[RelHint],
rowtime: Int,
watermark: RexNode): RelNode = {
- new LogicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark)
+ new LogicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime,
watermark)
+ }
+
+ override def withHints(hintList: util.List[RelHint]): RelNode = {
+ new LogicalWatermarkAssigner(cluster, traits, input, hintList,
rowtimeFieldIndex, watermarkExpr)
}
}
@@ -48,9 +57,10 @@ object LogicalWatermarkAssigner {
def create(
cluster: RelOptCluster,
input: RelNode,
+ hints: util.List[RelHint],
rowtimeFieldIndex: Int,
watermarkExpr: RexNode): LogicalWatermarkAssigner = {
val traits = cluster.traitSetOf(Convention.NONE)
- new LogicalWatermarkAssigner(cluster, traits, input, rowtimeFieldIndex,
watermarkExpr)
+ new LogicalWatermarkAssigner(cluster, traits, input, hints,
rowtimeFieldIndex, watermarkExpr)
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
index e4baf13567a..58137c195ad 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
@@ -19,13 +19,16 @@ package org.apache.flink.table.planner.plan.nodes.calcite
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.hint.{Hintable, RelHint}
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.`type`.SqlTypeName
import java.util
+import java.util.ArrayList
import scala.collection.JavaConversions._
@@ -34,9 +37,11 @@ abstract class WatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
inputRel: RelNode,
+ val hints: util.List[RelHint],
val rowtimeFieldIndex: Int,
val watermarkExpr: RexNode)
- extends SingleRel(cluster, traits, inputRel) {
+ extends SingleRel(cluster, traits, inputRel)
+ with Hintable {
override def deriveRowType(): RelDataType = {
val inputRowType = inputRel.getRowType
@@ -68,10 +73,21 @@ abstract class WatermarkAssigner(
}
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]):
RelNode = {
- copy(traitSet, inputs.get(0), rowtimeFieldIndex, watermarkExpr)
+ copy(traitSet, inputs.get(0), hints, rowtimeFieldIndex, watermarkExpr)
}
/** Copies a new WatermarkAssigner. */
- def copy(traitSet: RelTraitSet, input: RelNode, rowtime: Int, watermark:
RexNode): RelNode
+ def copy(
+ traitSet: RelTraitSet,
+ input: RelNode,
+ hints: util.List[RelHint],
+ rowtime: Int,
+ watermark: RexNode): RelNode
+ override def getHints: ImmutableList[RelHint] = {
+ val arrayHints = hints.toArray(new Array[RelHint](0))
+ ImmutableList.copyOf(arrayHints)
+ }
+
+ def withHints(hintList: util.List[RelHint]): RelNode
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
index 795304a9f38..7468c83142f 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
@@ -24,8 +24,12 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.convert.ConverterRule.Config
+import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rex.RexNode
+import java.util
+import java.util.Collections
+
/**
* Sub-class of [[WatermarkAssigner]] that is a relational operator which
generates
* [[org.apache.flink.streaming.api.watermark.Watermark]].
@@ -34,20 +38,31 @@ class FlinkLogicalWatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
input: RelNode,
+ hints: util.List[RelHint],
rowtimeFieldIndex: Int,
watermarkExpr: RexNode)
- extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex,
watermarkExpr)
+ extends WatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex,
watermarkExpr)
with FlinkLogicalRel {
/** Copies a new WatermarkAssigner. */
override def copy(
traitSet: RelTraitSet,
input: RelNode,
+ hints: util.List[RelHint],
rowtime: Int,
watermark: RexNode): RelNode = {
- new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtime,
watermark)
+ new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, hints,
rowtime, watermark)
}
+ override def withHints(hintList: util.List[RelHint]): RelNode = {
+ new FlinkLogicalWatermarkAssigner(
+ cluster,
+ traitSet,
+ input,
+ hintList, // use the replacement hints; passing the constructor's `hints` would discard them
+ rowtimeFieldIndex,
+ watermarkExpr)
+ }
}
class FlinkLogicalWatermarkAssignerConverter(config: Config) extends
ConverterRule(config) {
@@ -76,6 +91,12 @@ object FlinkLogicalWatermarkAssigner {
watermarkExpr: RexNode): FlinkLogicalWatermarkAssigner = {
val cluster = input.getCluster
val traitSet =
cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
- new FlinkLogicalWatermarkAssigner(cluster, traitSet, input,
rowtimeFieldIndex, watermarkExpr)
+ new FlinkLogicalWatermarkAssigner(
+ cluster,
+ traitSet,
+ input,
+ Collections.emptyList(),
+ rowtimeFieldIndex,
+ watermarkExpr)
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala
index 69973a8e6a3..f9c7ebb6861 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala
@@ -26,8 +26,11 @@ import
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rex.RexNode
+import java.util
+
import scala.collection.JavaConversions._
/** Stream physical RelNode for [[WatermarkAssigner]]. */
@@ -35,9 +38,10 @@ class StreamPhysicalWatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
inputRel: RelNode,
+ hints: util.List[RelHint],
rowtimeFieldIndex: Int,
watermarkExpr: RexNode)
- extends WatermarkAssigner(cluster, traits, inputRel, rowtimeFieldIndex,
watermarkExpr)
+ extends WatermarkAssigner(cluster, traits, inputRel, hints,
rowtimeFieldIndex, watermarkExpr)
with StreamPhysicalRel {
override def requireWatermark: Boolean = false
@@ -45,9 +49,10 @@ class StreamPhysicalWatermarkAssigner(
override def copy(
traitSet: RelTraitSet,
input: RelNode,
+ hints: util.List[RelHint],
rowtime: Int,
watermark: RexNode): RelNode = {
- new StreamPhysicalWatermarkAssigner(cluster, traitSet, input, rowtime,
watermark)
+ new StreamPhysicalWatermarkAssigner(cluster, traitSet, input, hints,
rowtime, watermark)
}
/** Fully override this method to have a better display name of this
RelNode. */
@@ -75,4 +80,14 @@ class StreamPhysicalWatermarkAssigner(
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
}
+
+ override def withHints(hintList: util.List[RelHint]): RelNode = {
+ new StreamPhysicalWatermarkAssigner(
+ cluster,
+ traitSet,
+ input,
+ hintList, // use the replacement hints; passing the constructor's `hints` would discard them
+ rowtimeFieldIndex,
+ watermarkExpr)
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
index c6b62b0d09e..ba2e10f99d4 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
@@ -26,6 +26,8 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.convert.ConverterRule.Config
+import java.util.Collections
+
/** Rule that converts [[FlinkLogicalWatermarkAssigner]] to
[[StreamPhysicalWatermarkAssigner]]. */
class StreamPhysicalWatermarkAssignerRule(config: Config) extends
ConverterRule(config) {
@@ -39,6 +41,7 @@ class StreamPhysicalWatermarkAssignerRule(config: Config)
extends ConverterRule(
watermarkAssigner.getCluster,
traitSet,
convertInput,
+ Collections.emptyList(),
watermarkAssigner.rowtimeFieldIndex,
watermarkAssigner.watermarkExpr
)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
index 674fdc602ba..d23f440b51b 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
@@ -66,6 +66,31 @@ class StateTtlHintTest extends TableTestBase {
+ " 'connector' = 'values'\n"
+ ")");
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE tableWithWatermark1 (\n"
+ + " a INT,\n"
+ + " b BIGINT,\n"
+ + " c TIMESTAMP(3),"
+ + " WATERMARK FOR c AS c"
+ + ") WITH ("
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'false'\n"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE tableWithWatermark2 ("
+ + " a int,\n"
+ + " b BIGINT,\n"
+ + " c ROW<c1 TIMESTAMP(3)>,\n"
+ + " d AS c.c1 + INTERVAL '5' SECOND,\n"
+ + " WATERMARK FOR d as d - INTERVAL '5'
second"
+ + ") WITH ("
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'false'\n"
+ + ")");
+
util.tableEnv().executeSql("CREATE View V4 as select a3 as a4, b3 as
b4 from T3");
util.tableEnv()
@@ -297,6 +322,26 @@ class StateTtlHintTest extends TableTestBase {
"Invalid STATE_TTL hint, expecting at least one
key-value options specified.");
}
+ @Test
+ void testWatermarkAssigner() {
+ String sql =
+ "\n"
+ + "SELECT /*+ STATE_TTL('tableWithWatermark1'='1d',
'tww2' = '3d') */ tableWithWatermark1.* FROM tableWithWatermark1\n"
+ + "LEFT JOIN(SELECT DISTINCT b FROM
tableWithWatermark2) tww2\n"
+ + "ON tableWithWatermark1.b = tww2.b WHERE tww2.b IS
NOT NULL";
+ verify(sql);
+ }
+
+ @Test
+ void testWatermarkAssignerWithAliases() {
+ String sql =
+ "\n"
+ + "SELECT /*+ STATE_TTL('tww1'='1d', 'tww2' = '3d') */
tww1.* FROM tableWithWatermark1 tww1\n"
+ + "LEFT JOIN(SELECT DISTINCT b FROM
tableWithWatermark2) tww2\n"
+ + "ON tww1.b = tww2.b WHERE tww2.b IS NOT NULL";
+ verify(sql);
+ }
+
private void verify(String sql) {
util.doVerifyPlan(
sql,
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
index b395e6002b2..8b1a70096f7 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
@@ -494,4 +494,82 @@ GroupAggregate(groupBy=[b1], select=[b1, SUM(a1) AS
EXPR$1])
]]>
</Resource>
</TestCase>
+ <TestCase name="testWatermarkAssigner">
+ <Resource name="sql">
+ <![CDATA[
+SELECT /*+ STATE_TTL('tableWithWatermark1'='1d', 'tww2' = '3d') */
tableWithWatermark1.* FROM tableWithWatermark1
+LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2
+ON tableWithWatermark1.b = tww2.b WHERE tww2.b IS NOT NULL]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IS NOT NULL($3)])
+ +- LogicalJoin(condition=[=($1, $3)], joinType=[left],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{tww2=3d,
tableWithWatermark1=1d}]]])
+ :- LogicalWatermarkAssigner(rowtime=[c], watermark=[$2], hints=[[[ALIAS
options:[tableWithWatermark1]]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
tableWithWatermark1]])
+ +- LogicalAggregate(group=[{0}], hints=[[[ALIAS options:[tww2]]]])
+ +- LogicalProject(b=[$1])
+ +- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3,
5000:INTERVAL SECOND)], hints=[[[ALIAS options:[tableWithWatermark2]]]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2.c1,
5000:INTERVAL SECOND)])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, tableWithWatermark2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c])
++- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0],
leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey],
stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=3d}]]])
+ :- Exchange(distribution=[hash[b]])
+ : +- Calc(select=[a, b, CAST(c AS TIMESTAMP(3)) AS c], where=[IS NOT
NULL(b)])
+ : +- WatermarkAssigner(rowtime=[c], watermark=[c])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
tableWithWatermark1]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[b]])
+ +- GroupAggregate(groupBy=[b], select=[b])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b], where=[IS NOT NULL(b)])
+ +- WatermarkAssigner(rowtime=[d], watermark=[-(d, 5000:INTERVAL
SECOND)])
+ +- Calc(select=[b, +(c.c1, 5000:INTERVAL SECOND) AS d])
+ +- TableSourceScan(table=[[default_catalog,
default_database, tableWithWatermark2, project=[b, c], metadata=[]]],
fields=[b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWatermarkAssignerWithAliases">
+ <Resource name="sql">
+ <![CDATA[
+SELECT /*+ STATE_TTL('tww1'='1d', 'tww2' = '3d') */ tww1.* FROM
tableWithWatermark1 tww1
+LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2
+ON tww1.b = tww2.b WHERE tww2.b IS NOT NULL]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IS NOT NULL($3)])
+ +- LogicalJoin(condition=[=($1, $3)], joinType=[left],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{tww2=3d, tww1=1d}]]])
+ :- LogicalWatermarkAssigner(rowtime=[c], watermark=[$2], hints=[[[ALIAS
options:[tww1]]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
tableWithWatermark1]])
+ +- LogicalAggregate(group=[{0}], hints=[[[ALIAS options:[tww2]]]])
+ +- LogicalProject(b=[$1])
+ +- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3,
5000:INTERVAL SECOND)], hints=[[[ALIAS options:[tableWithWatermark2]]]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2.c1,
5000:INTERVAL SECOND)])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, tableWithWatermark2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c])
++- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0],
leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey],
stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=3d}]]])
+ :- Exchange(distribution=[hash[b]])
+ : +- Calc(select=[a, b, CAST(c AS TIMESTAMP(3)) AS c], where=[IS NOT
NULL(b)])
+ : +- WatermarkAssigner(rowtime=[c], watermark=[c])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
tableWithWatermark1]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[b]])
+ +- GroupAggregate(groupBy=[b], select=[b])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b], where=[IS NOT NULL(b)])
+ +- WatermarkAssigner(rowtime=[d], watermark=[-(d, 5000:INTERVAL
SECOND)])
+ +- Calc(select=[b, +(c.c1, 5000:INTERVAL SECOND) AS d])
+ +- TableSourceScan(table=[[default_catalog,
default_database, tableWithWatermark2, project=[b, c], metadata=[]]],
fields=[b, c])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml
index 261f438fb83..a584fbbe866 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml
@@ -181,8 +181,8 @@ LogicalProject(a=[$0])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database,
UdfTable]])
]]>
- </Resource>
- </TestCase>
+ </Resource>
+ </TestCase>
<TestCase name="transposeWithIncludeComputedRowTime">
<Resource name="sql">
<![CDATA[SELECT a, b, d FROM VirtualTable]]>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 832ab36255a..ba57c2bca3a 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1321,6 +1321,7 @@ case class StreamTableTestUtil(
sourceRel.getCluster,
sourceRel.getTraitSet,
sourceRel,
+ Collections.emptyList(),
rowtimeFieldIdx,
expr
)