This is an automated email from the ASF dual-hosted git repository.
jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6be30b16799 [FLINK-34323][table-planner] Fix named params in session
window tvf
6be30b16799 is described below
commit 6be30b167990c22765c244a703ab0424e7c3b4d9
Author: Xuyang <[email protected]>
AuthorDate: Mon Feb 5 10:06:03 2024 +0800
[FLINK-34323][table-planner] Fix named params in session window tvf
This close #24243
---
.../functions/sql/SqlSessionTableFunction.java | 2 +-
.../functions/sql/SqlWindowTableFunction.java | 3 +
.../plan/stream/sql/WindowTableFunctionTest.xml | 154 ++++++++++++++++++---
.../plan/stream/sql/WindowTableFunctionTest.scala | 65 +++++++++
4 files changed, 204 insertions(+), 20 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
index 454d1af6898..895dbeefe1a 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
@@ -68,7 +68,7 @@ public class SqlSessionTableFunction extends
SqlWindowTableFunction {
/** Operand type checker for SESSION. */
private static class OperandMetadataImpl extends AbstractOperandMetadata {
OperandMetadataImpl() {
- super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SIZE), 3);
+ super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, GAP), 3);
}
@Override
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
index 0e7879a6dcc..3f22bed1907 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
@@ -63,6 +63,9 @@ public class SqlWindowTableFunction extends
org.apache.calcite.sql.SqlWindowTabl
/** The slide interval, only used for HOP window. */
protected static final String PARAM_STEP = "STEP";
+ /** The gap interval, only used for SESSION window. */
+ protected static final String GAP = "GAP";
+
/**
* Type-inference strategy whereby the row type of a table function call
is a ROW, which is
* combined from the row type of operand #0 (which is a TABLE) and two
additional fields. The
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
index fabcb77e269..bf45b5f6de5 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
@@ -16,23 +16,18 @@ See the License for the specific language governing
permissions and
limitations under the License.
-->
<Root>
- <TestCase name="testCumulateTVFWithNegativeOffset">
+ <TestCase name="testCumulateTVF">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(
- CUMULATE(
- TABLE MyTable,
- DESCRIPTOR(rowtime),
- INTERVAL '1' MINUTE,
- INTERVAL '15' MINUTE,
- INTERVAL '-5' MINUTE))
+ CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL
'1' HOUR))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
window_start=[$6], window_end=[$7], window_time=[$8])
-+- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($4),
60000:INTERVAL MINUTE, 900000:INTERVAL MINUTE, -300000:INTERVAL MINUTE)],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3)
d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime,
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME*
window_time)])
++- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($4),
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
@@ -42,25 +37,25 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3],
rowtime=[$4], proctime=[$5], wind
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, window_time])
-+- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[15 min],
step=[1 min], offset=[-5 min])])
++- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[1 h],
step=[10 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
- <TestCase name="testCumulateTVF">
+ <TestCase name="testCumulateTVFProctime">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(
- CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL
'1' HOUR))
+ CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL
'1' HOUR))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
window_start=[$6], window_end=[$7], window_time=[$8])
-+- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($4),
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
++- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($5),
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
@@ -69,26 +64,59 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3],
rowtime=[$4], proctime=[$5], wind
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, window_time])
-+- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[1 h],
step=[10 min])])
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
++- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[1 h],
step=[10 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
- <TestCase name="testCumulateTVFProctime">
+ <TestCase name="testSessionTVFWithPartitionKeys">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM TABLE(SESSION(TABLE MyTable PARTITION BY (b, a), DESCRIPTOR(rowtime),
INTERVAL '15' MINUTE))
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
window_start=[$6], window_end=[$7], window_time=[$8])
++- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($1, $0),
DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime,
TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3)
window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, window_time])
++- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[15 min],
partition keys=[b, a])])
+ +- Exchange(distribution=[hash[b, a]])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCumulateTVFWithNegativeOffset">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(
- CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL
'1' HOUR))
+ CUMULATE(
+ TABLE MyTable,
+ DESCRIPTOR(rowtime),
+ INTERVAL '1' MINUTE,
+ INTERVAL '15' MINUTE,
+ INTERVAL '-5' MINUTE))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
window_start=[$6], window_end=[$7], window_time=[$8])
-+- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($5),
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
++- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($4),
60000:INTERVAL MINUTE, 900000:INTERVAL MINUTE, -300000:INTERVAL MINUTE)],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3)
d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime,
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME*
window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
@@ -97,8 +125,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5], wind
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
-+- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[1 h],
step=[10 min])])
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, window_time])
++- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[15 min],
step=[1 min], offset=[-5 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
@@ -288,6 +316,94 @@ Calc(select=[a, b, c, d, rowtime,
PROCTIME_MATERIALIZE(proctime) AS proctime, wi
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSessionTVF">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
window_start=[$6], window_end=[$7], window_time=[$8])
++- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($4),
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime,
TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3)
window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, window_time])
++- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[15 min])])
+ +- Exchange(distribution=[single])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSessionTVFProctime">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
window_start=[$6], window_end=[$7], window_time=[$8])
++- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5),
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime,
TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3)
window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
++- WindowTableFunction(window=[SESSION(time_col=[proctime], gap=[15 min])])
+ +- Exchange(distribution=[single])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSessionTVFWithNamedParams">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM TABLE(
+ SESSION(
+ DATA => TABLE MyTable PARTITION BY (b, a),
+ TIMECOL => DESCRIPTOR(rowtime),
+ GAP => INTERVAL '15' MINUTE))
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
window_start=[$6], window_end=[$7], window_time=[$8])
++- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($1, $0),
DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime,
TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3)
window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, window_time])
++- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[15 min],
partition keys=[b, a])])
+ +- Exchange(distribution=[hash[b, a]])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
index ff64550e35d..a364882fb93 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
@@ -257,4 +257,69 @@ class WindowTableFunctionTest extends TableTestBase {
util.verifyRelPlan(sql)
}
+ @Test
+ def testSessionTVF(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE))
+ |""".stripMargin
+ util.verifyRelPlan(sql)
+ }
+
+ @Test
+ def testSessionTVFProctime(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15'
MINUTE))
+ |""".stripMargin
+ util.verifyRelPlan(sql)
+ }
+
+ @Test
+ def testSessionTVFWithPartitionKeys(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM TABLE(SESSION(TABLE MyTable PARTITION BY (b, a),
DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+ |""".stripMargin
+ util.verifyRelPlan(sql)
+ }
+
+ @Test
+ def testSessionTVFWithNamedParams(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM TABLE(
+ | SESSION(
+ | DATA => TABLE MyTable PARTITION BY (b, a),
+ | TIMECOL => DESCRIPTOR(rowtime),
+ | GAP => INTERVAL '15' MINUTE))
+ |""".stripMargin
+ util.verifyRelPlan(sql)
+ }
+
+ @Test
+ def testWindowTVFWithNamedParamsOrderChange(): Unit = {
+ // the DATA param must be the first in FLIP-145
+ // change the order about GAP and TIMECOL
+ // TODO fix it in FLINK-34338
+ val sql =
+ """
+ |SELECT *
+ |FROM TABLE(
+ | SESSION(
+ | DATA => TABLE MyTable PARTITION BY (b, a),
+ | GAP => INTERVAL '15' MINUTE,
+ | TIMECOL => DESCRIPTOR(rowtime)))
+ |""".stripMargin
+
+ assertThatThrownBy(() => util.verifyRelPlan(sql))
+ .hasMessage("fieldList must not be null, type = INTERVAL MINUTE")
+ .isInstanceOf[AssertionError]
+
+ }
+
}