This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 7f96af3ea82 [FLINK-35829][table-planner] The requireWatermark property of StreamPhysicalWindowTableFunction needs to check the window's time type
7f96af3ea82 is described below

commit 7f96af3ea82a1a1efab57f65db9674fac5bc41bf
Author: Xuyang <[email protected]>
AuthorDate: Sat Jul 13 20:27:03 2024 +0800

    [FLINK-35829][table-planner] The requireWatermark property of StreamPhysicalWindowTableFunction needs to check the window's time type
    
    This closes #25083
---
 .../stream/StreamPhysicalWindowTableFunction.scala |  2 +-
 .../plan/stream/sql/WindowTableFunctionTest.xml    | 68 ++++++++++++++++++++--
 .../plan/stream/sql/WindowTableFunctionTest.scala  | 37 ++++++++++++
 3 files changed, 100 insertions(+), 7 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowTableFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowTableFunction.scala
index 633ff355418..7578db0222c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowTableFunction.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowTableFunction.scala
@@ -40,7 +40,7 @@ class StreamPhysicalWindowTableFunction(
   extends CommonPhysicalWindowTableFunction(cluster, traitSet, inputRel, 
outputRowType, windowing)
   with StreamPhysicalRel {
 
-  override def requireWatermark: Boolean = true
+  override def requireWatermark: Boolean = windowing.isRowtime
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
     new StreamPhysicalWindowTableFunction(
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
index bf45b5f6de5..d793d021e85 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
@@ -316,6 +316,34 @@ Calc(select=[a, b, c, d, rowtime, 
PROCTIME_MATERIALIZE(proctime) AS proctime, wi
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRowtimeWindowTVFWithMiniBatch">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
window_start=[$6], window_end=[$7], window_time=[$8])
++- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($4), 900000:INTERVAL 
MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* 
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) 
*ROWTIME* window_time)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, window_time])
++- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])
+   +- MiniBatchAssigner(interval=[5000ms], mode=[RowTime])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+         +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -407,17 +435,17 @@ Calc(select=[a, b, c, d, rowtime, 
PROCTIME_MATERIALIZE(proctime) AS proctime, wi
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTumbleTVF">
+  <TestCase name="testTumbleTVFProctime">
     <Resource name="sql">
       <![CDATA[
 SELECT *
-FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
 ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
window_start=[$6], window_end=[$7], window_time=[$8])
-+- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($4), 900000:INTERVAL 
MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* 
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) 
*ROWTIME* window_time)])
++- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($5), 900000:INTERVAL 
MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* 
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) 
*PROCTIME* window_time)])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
       +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
          +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
@@ -426,15 +454,15 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], wind
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, window_time])
-+- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
++- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[15 min])])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTumbleTVFProctime">
+  <TestCase name="testProctimeWindowTVFWithMiniBatch">
     <Resource name="sql">
       <![CDATA[
 SELECT *
@@ -455,6 +483,34 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], wind
       <![CDATA[
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
 +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[15 min])])
+   +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+         +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTumbleTVF">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
window_start=[$6], window_end=[$7], window_time=[$8])
++- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($4), 900000:INTERVAL 
MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* 
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) 
*ROWTIME* window_time)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
window_start, window_end, window_time])
++- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
index a364882fb93..e06a7410326 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
@@ -18,11 +18,14 @@
 package org.apache.flink.table.planner.plan.stream.sql
 
 import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.planner.utils.TableTestBase
 
 import org.assertj.core.api.Assertions.assertThatThrownBy
 import org.junit.jupiter.api.Test
 
+import java.time.Duration
+
 /** Tests for window table-valued function. */
 class WindowTableFunctionTest extends TableTestBase {
 
@@ -322,4 +325,38 @@ class WindowTableFunctionTest extends TableTestBase {
 
   }
 
+  @Test
+  def testProctimeWindowTVFWithMiniBatch(): Unit = {
+    enableMiniBatch()
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' 
MINUTE))
+        |""".stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testRowtimeWindowTVFWithMiniBatch(): Unit = {
+    enableMiniBatch()
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
+        |""".stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  private def enableMiniBatch(): Unit = {
+    util.tableConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
+      java.lang.Boolean.TRUE)
+    util.tableConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE,
+      java.lang.Long.valueOf(5L))
+    util.tableConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+      Duration.ofSeconds(5L))
+  }
+
 }

Reply via email to