This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6be30b16799 [FLINK-34323][table-planner] Fix named params in session window tvf
6be30b16799 is described below

commit 6be30b167990c22765c244a703ab0424e7c3b4d9
Author: Xuyang <[email protected]>
AuthorDate: Mon Feb 5 10:06:03 2024 +0800

    [FLINK-34323][table-planner] Fix named params in session window tvf
    
    This closes #24243
---
 .../functions/sql/SqlSessionTableFunction.java     |   2 +-
 .../functions/sql/SqlWindowTableFunction.java      |   3 +
 .../plan/stream/sql/WindowTableFunctionTest.xml    | 154 ++++++++++++++++++---
 .../plan/stream/sql/WindowTableFunctionTest.scala  |  65 +++++++++
 4 files changed, 204 insertions(+), 20 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
index 454d1af6898..895dbeefe1a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
@@ -68,7 +68,7 @@ public class SqlSessionTableFunction extends 
SqlWindowTableFunction {
     /** Operand type checker for SESSION. */
     private static class OperandMetadataImpl extends AbstractOperandMetadata {
         OperandMetadataImpl() {
-            super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SIZE), 3);
+            super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, GAP), 3);
         }
 
         @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
index 0e7879a6dcc..3f22bed1907 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
@@ -63,6 +63,9 @@ public class SqlWindowTableFunction extends 
org.apache.calcite.sql.SqlWindowTabl
     /** The slide interval, only used for HOP window. */
     protected static final String PARAM_STEP = "STEP";
 
+    /** The gap interval, only used for SESSION window. */
+    protected static final String GAP = "GAP";
+
     /**
      * Type-inference strategy whereby the row type of a table function call 
is a ROW, which is
      * combined from the row type of operand #0 (which is a TABLE) and two 
additional fields. The
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
index fabcb77e269..bf45b5f6de5 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
@@ -16,23 +16,18 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
-  <TestCase name="testCumulateTVFWithNegativeOffset">
+  <TestCase name="testCumulateTVF">
     <Resource name="sql">
       <![CDATA[
 SELECT *
 FROM TABLE(
-  CUMULATE(
-    TABLE MyTable,
-    DESCRIPTOR(rowtime),
-    INTERVAL '1' MINUTE,
-    INTERVAL '15' MINUTE,
-    INTERVAL '-5' MINUTE))
+ CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL 
'1' HOUR))
 ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
window_start=[$6], window_end=[$7], window_time=[$8])
-+- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($4), 
60000:INTERVAL MINUTE, 900000:INTERVAL MINUTE, -300000:INTERVAL MINUTE)], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) 
d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* 
window_time)])
++- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
       +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
          +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
@@ -42,25 +37,25 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], wind
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, window_time])
-+- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[15 min], 
step=[1 min], offset=[-5 min])])
++- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[1 h], 
step=[10 min])])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCumulateTVF">
+  <TestCase name="testCumulateTVFProctime">
     <Resource name="sql">
       <![CDATA[
 SELECT *
 FROM TABLE(
- CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL 
'1' HOUR))
+ CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL 
'1' HOUR))
 ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
window_start=[$6], window_end=[$7], window_time=[$8])
-+- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
++- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($5), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
       +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
          +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
@@ -69,26 +64,59 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], wind
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, window_time])
-+- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[1 h], 
step=[10 min])])
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
++- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[1 h], 
step=[10 min])])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCumulateTVFProctime">
+  <TestCase name="testSessionTVFWithPartitionKeys">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM TABLE(SESSION(TABLE MyTable PARTITION BY (b, a), DESCRIPTOR(rowtime), 
INTERVAL '15' MINUTE))
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
window_start=[$6], window_end=[$7], window_time=[$8])
++- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($1, $0), 
DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT 
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, window_time])
++- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[15 min], 
partition keys=[b, a])])
+   +- Exchange(distribution=[hash[b, a]])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+         +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCumulateTVFWithNegativeOffset">
     <Resource name="sql">
       <![CDATA[
 SELECT *
 FROM TABLE(
- CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL 
'1' HOUR))
+  CUMULATE(
+    TABLE MyTable,
+    DESCRIPTOR(rowtime),
+    INTERVAL '1' MINUTE,
+    INTERVAL '15' MINUTE,
+    INTERVAL '-5' MINUTE))
 ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
window_start=[$6], window_end=[$7], window_time=[$8])
-+- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($5), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
++- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($4), 
60000:INTERVAL MINUTE, 900000:INTERVAL MINUTE, -300000:INTERVAL MINUTE)], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) 
d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* 
window_time)])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
       +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
          +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
@@ -97,8 +125,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5], wind
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
-+- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[1 h], 
step=[10 min])])
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, window_time])
++- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[15 min], 
step=[1 min], offset=[-5 min])])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
@@ -288,6 +316,94 @@ Calc(select=[a, b, c, d, rowtime, 
PROCTIME_MATERIALIZE(proctime) AS proctime, wi
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSessionTVF">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
window_start=[$6], window_end=[$7], window_time=[$8])
++- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($4), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, window_time])
++- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[15 min])])
+   +- Exchange(distribution=[single])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+         +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSessionTVFProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
window_start=[$6], window_end=[$7], window_time=[$8])
++- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
++- WindowTableFunction(window=[SESSION(time_col=[proctime], gap=[15 min])])
+   +- Exchange(distribution=[single])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+         +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSessionTVFWithNamedParams">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM TABLE(
+     SESSION(
+         DATA => TABLE MyTable PARTITION BY (b, a),
+         TIMECOL => DESCRIPTOR(rowtime),
+         GAP => INTERVAL '15' MINUTE))
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
window_start=[$6], window_end=[$7], window_time=[$8])
++- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($1, $0), 
DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT 
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, window_time])
++- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[15 min], 
partition keys=[b, a])])
+   +- Exchange(distribution=[hash[b, a]])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+         +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
index ff64550e35d..a364882fb93 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
@@ -257,4 +257,69 @@ class WindowTableFunctionTest extends TableTestBase {
     util.verifyRelPlan(sql)
   }
 
+  @Test
+  def testSessionTVF(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
+        |""".stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testSessionTVFProctime(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' 
MINUTE))
+        |""".stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testSessionTVFWithPartitionKeys(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(SESSION(TABLE MyTable PARTITION BY (b, a), 
DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+        |""".stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testSessionTVFWithNamedParams(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(
+        |     SESSION(
+        |         DATA => TABLE MyTable PARTITION BY (b, a),
+        |         TIMECOL => DESCRIPTOR(rowtime),
+        |         GAP => INTERVAL '15' MINUTE))
+        |""".stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testWindowTVFWithNamedParamsOrderChange(): Unit = {
+    // The DATA param must be the first one according to FLIP-145.
+    // This test changes the order of GAP and TIMECOL.
+    // TODO fix it in FLINK-34338
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(
+        |     SESSION(
+        |         DATA => TABLE MyTable PARTITION BY (b, a),
+        |         GAP => INTERVAL '15' MINUTE,
+        |         TIMECOL => DESCRIPTOR(rowtime)))
+        |""".stripMargin
+
+    assertThatThrownBy(() => util.verifyRelPlan(sql))
+      .hasMessage("fieldList must not be null, type = INTERVAL MINUTE")
+      .isInstanceOf[AssertionError]
+
+  }
+
 }

Reply via email to