This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new 7f96af3ea82 [FLINK-35829][table-planner] The requireWatermark property
of StreamPhysicalWindowTableFunction needs to check the window's time type
7f96af3ea82 is described below
commit 7f96af3ea82a1a1efab57f65db9674fac5bc41bf
Author: Xuyang <[email protected]>
AuthorDate: Sat Jul 13 20:27:03 2024 +0800
[FLINK-35829][table-planner] The requireWatermark property of
StreamPhysicalWindowTableFunction needs to check the window's time type
This closes #25083
---
.../stream/StreamPhysicalWindowTableFunction.scala | 2 +-
.../plan/stream/sql/WindowTableFunctionTest.xml | 68 ++++++++++++++++++++--
.../plan/stream/sql/WindowTableFunctionTest.scala | 37 ++++++++++++
3 files changed, 100 insertions(+), 7 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowTableFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowTableFunction.scala
index 633ff355418..7578db0222c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowTableFunction.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowTableFunction.scala
@@ -40,7 +40,7 @@ class StreamPhysicalWindowTableFunction(
extends CommonPhysicalWindowTableFunction(cluster, traitSet, inputRel,
outputRowType, windowing)
with StreamPhysicalRel {
- override def requireWatermark: Boolean = true
+ override def requireWatermark: Boolean = windowing.isRowtime
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]):
RelNode = {
new StreamPhysicalWindowTableFunction(
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
index bf45b5f6de5..d793d021e85 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
@@ -316,6 +316,34 @@ Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, wi
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRowtimeWindowTVFWithMiniBatch">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
window_start=[$6], window_end=[$7], window_time=[$8])
++- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($4), 900000:INTERVAL
MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME*
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3)
*ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, window_time])
++- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])
+ +- MiniBatchAssigner(interval=[5000ms], mode=[RowTime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
@@ -407,17 +435,17 @@ Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, wi
]]>
</Resource>
</TestCase>
- <TestCase name="testTumbleTVF">
+ <TestCase name="testTumbleTVFProctime">
<Resource name="sql">
<![CDATA[
SELECT *
-FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
window_start=[$6], window_end=[$7], window_time=[$8])
-+- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($4), 900000:INTERVAL
MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME*
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3)
*ROWTIME* window_time)])
++- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($5), 900000:INTERVAL
MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME*
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3)
*PROCTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
@@ -426,15 +454,15 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], wind
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, window_time])
-+- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
++- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[15 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
- <TestCase name="testTumbleTVFProctime">
+ <TestCase name="testProctimeWindowTVFWithMiniBatch">
<Resource name="sql">
<![CDATA[
SELECT *
@@ -455,6 +483,34 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], wind
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
+- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[15 min])])
+ +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTumbleTVF">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
window_start=[$6], window_end=[$7], window_time=[$8])
++- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($4), 900000:INTERVAL
MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME*
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3)
*ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
window_start, window_end, window_time])
++- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
index a364882fb93..e06a7410326 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
@@ -18,11 +18,14 @@
package org.apache.flink.table.planner.plan.stream.sql
import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.utils.TableTestBase
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test
+import java.time.Duration
+
/** Tests for window table-valued function. */
class WindowTableFunctionTest extends TableTestBase {
@@ -322,4 +325,38 @@ class WindowTableFunctionTest extends TableTestBase {
}
+ @Test
+ def testProctimeWindowTVFWithMiniBatch(): Unit = {
+ enableMiniBatch()
+ val sql =
+ """
+ |SELECT *
+ |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15'
MINUTE))
+ |""".stripMargin
+ util.verifyRelPlan(sql)
+ }
+
+ @Test
+ def testRowtimeWindowTVFWithMiniBatch(): Unit = {
+ enableMiniBatch()
+ val sql =
+ """
+ |SELECT *
+ |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE))
+ |""".stripMargin
+ util.verifyRelPlan(sql)
+ }
+
+ private def enableMiniBatch(): Unit = {
+ util.tableConfig.set(
+ ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
+ java.lang.Boolean.TRUE)
+ util.tableConfig.set(
+ ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE,
+ java.lang.Long.valueOf(5L))
+ util.tableConfig.set(
+ ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+ Duration.ofSeconds(5L))
+ }
+
}