This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fe66715530 [FLINK-37720][table-planner] Enable sink reuse table optimizer by default and apply plan changes to existing test cases (#26503)
9fe66715530 is described below

commit 9fe66715530e1cab4658e1e974141e3e6204cde6
Author: xiangyu0xf <[email protected]>
AuthorDate: Mon May 12 19:24:28 2025 +0800

    [FLINK-37720][table-planner] Enable sink reuse table optimizer by default and apply plan changes to existing test cases (#26503)
---
 .../generated/optimizer_config_configuration.html  |  2 +-
 .../src/test/resources/sql_multi/statement_set.q   | 24 ++++-----
 .../src/test/resources/sql/statement_set.q         | 16 +++---
 .../table/api/config/OptimizerConfigOptions.java   |  2 +-
 .../explain/testStatementSetExecutionExplain.out   | 34 +++----------
 .../planner/plan/batch/sql/TableSourceTest.xml     | 25 ++++------
 .../physical/stream/ChangelogModeInferenceTest.xml | 51 ++++++++++---------
 .../plan/stream/sql/ProcessTableFunctionTest.xml   | 58 +++++++++-------------
 .../plan/stream/sql/agg/WindowAggregateTest.xml    | 58 ++++++++++------------
 .../flink/table/api/TableEnvironmentITCase.scala   |  5 +-
 10 files changed, 110 insertions(+), 165 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html 
b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
index e59a1289b10..ae2caa7aad8 100644
--- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
@@ -85,7 +85,7 @@ ONE_PHASE: Enforce to use one stage aggregate which only has 
CompleteGlobalAggre
         </tr>
         <tr>
             <td><h5>table.optimizer.reuse-sink-enabled</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
-            <td style="word-wrap: break-word;">false</td>
+            <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
             <td>When it is true, the optimizer will try to find out duplicated 
table sinks and reuse them. This works only when 
table.optimizer.reuse-sub-plan-enabled is true.</td>
         </tr>
diff --git 
a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q 
b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
index 39230d12921..afee14890fa 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
@@ -73,9 +73,9 @@ INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello 
World'), (2, 'Hi'),
 INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 
'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
 END;
 !output
-+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-|                                                                              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
++-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+|                                                                              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
++-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
 | == Abstract Syntax Tree ==
 LogicalSink(table=[default_catalog.default_database.StreamingTable], 
fields=[EXPR$0, EXPR$1])
 +- LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
@@ -87,21 +87,17 @@ 
LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXP
 
 == Optimized Physical Plan ==
 Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, 
EXPR$1])
-+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, 
_UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, 
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, 
_UTF-16LE'LINE' }]])
-
-Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, 
EXPR$1])
-+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, 
_UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, 
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, 
_UTF-16LE'LINE' }]])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+   :- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 
1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, 
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, 
_UTF-16LE'LINE' }]])
+   +- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 
1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, 
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, 
_UTF-16LE'LINE' }]])
 
 == Optimized Execution Plan ==
-Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, 
_UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, 
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
-
 Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, 
EXPR$1])
-+- Reused(reference_id=[1])
-
-Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, 
EXPR$1])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+   :- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 
2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, 
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
+   +- Reused(reference_id=[1])
  |
-+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
++-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
 1 row in set
 !ok
 
diff --git 
a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q 
b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
index cd9a33aca58..486fe3d43cc 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
@@ -105,19 +105,15 @@ 
LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXP
 
 == Optimized Physical Plan ==
 Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, 
EXPR$1])
-+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, 
_UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, 
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, 
_UTF-16LE'LINE' }]])
-
-Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, 
EXPR$1])
-+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, 
_UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, 
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, 
_UTF-16LE'LINE' }]])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+   :- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 
1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, 
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, 
_UTF-16LE'LINE' }]])
+   +- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 
1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, 
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, 
_UTF-16LE'LINE' }]])
 
 == Optimized Execution Plan ==
-Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, 
_UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, 
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
-
 Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, 
EXPR$1])
-+- Reused(reference_id=[1])
-
-Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, 
EXPR$1])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+   :- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 
2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, 
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
+   +- Reused(reference_id=[1])
 !ok
 
 EXECUTE STATEMENT SET BEGIN
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index 23d15d0f978..62d085d5cd6 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -109,7 +109,7 @@ public class OptimizerConfigOptions {
     public static final ConfigOption<Boolean> 
TABLE_OPTIMIZER_REUSE_SINK_ENABLED =
             key("table.optimizer.reuse-sink-enabled")
                     .booleanType()
-                    .defaultValue(false)
+                    .defaultValue(true)
                     .withDescription(
                             "When it is true, the optimizer will try to find 
out duplicated table sinks and "
                                     + "reuse them. This works only when "
diff --git 
a/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out
 
b/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out
index edee1984ddd..d4e6a87a3d1 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out
@@ -9,17 +9,15 @@ LogicalSink(table=[default_catalog.default_database.MySink], 
fields=[first])
 
 == Optimized Physical Plan ==
 Sink(table=[default_catalog.default_database.MySink], fields=[last])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [CsvTableSource(read fields: last)]]], fields=[last])
-
-Sink(table=[default_catalog.default_database.MySink], fields=[first])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [CsvTableSource(read fields: first)]]], fields=[first])
++- Union(all=[true], union=[last])
+   :- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
 
 == Optimized Execution Plan ==
 Sink(table=[default_catalog.default_database.MySink], fields=[last])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [CsvTableSource(read fields: last)]]], fields=[last])
-
-Sink(table=[default_catalog.default_database.MySink], fields=[first])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [CsvTableSource(read fields: first)]]], fields=[first])
++- Union(all=[true], union=[last])
+   :- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
 
 == Physical Execution Plan ==
 {
@@ -51,17 +49,6 @@ Sink(table=[default_catalog.default_database.MySink], 
fields=[first])
       "ship_strategy" : "FORWARD",
       "side" : "second"
     } ]
-  }, {
-    "id" : ,
-    "type" : "StreamingFileWriter",
-    "pact" : "Operator",
-    "contents" : "StreamingFileWriter",
-    "parallelism" : 1,
-    "predecessors" : [ {
-      "id" : ,
-      "ship_strategy" : "FORWARD",
-      "side" : "second"
-    } ]
   }, {
     "id" : ,
     "type" : "Source: Custom File source",
@@ -100,14 +87,7 @@ Sink(table=[default_catalog.default_database.MySink], 
fields=[first])
       "id" : ,
       "ship_strategy" : "FORWARD",
       "side" : "second"
-    } ]
-  }, {
-    "id" : ,
-    "type" : "end: Writer",
-    "pact" : "Operator",
-    "contents" : "end: Writer",
-    "parallelism" : 1,
-    "predecessors" : [ {
+    }, {
       "id" : ,
       "ship_strategy" : "FORWARD",
       "side" : "second"
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
index 9813db3c568..1ba58f298ae 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
@@ -157,11 +157,10 @@ 
LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
       <![CDATA[
 Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
 +- Union(all=[true], union=[a, b, c])
-   :- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=0}]]])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
-
-Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
+   :- Union(all=[true], union=[a, b, c])
+   :  :- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=0}]]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
 ]]>
     </Resource>
   </TestCase>
@@ -196,13 +195,10 @@ 
LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], hints=[[[OPTIONS 
options:{source.num-element-to-skip=1}]]])(reuse_id=[1])
-
-Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
-+- Reused(reference_id=[1])
-
 Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[a, b, c])
+   :- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], hints=[[[OPTIONS 
options:{source.num-element-to-skip=1}]]])(reuse_id=[1])
+   +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -221,10 +217,9 @@ 
LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
     <Resource name="optimized exec plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
-
-Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
++- Union(all=[true], union=[a, b, c])
+   :- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
index 513a568b5aa..bd5a6a20be2 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
@@ -367,32 +367,31 @@ 
LogicalSink(table=[default_catalog.default_database.sink2], fields=[number, word
     <Resource name="optimized rel plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink1], fields=[number, word], 
changelogMode=[NONE])
-+- Calc(select=[number, word], where=[>(word, 'a')], changelogMode=[I,UB,UA])
-   +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
-      :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
-      :  +- Calc(select=[number, word], changelogMode=[I,UB,UA])
-      :     +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS 
number], changelogMode=[I,UB,UA])
-      :        +- Exchange(distribution=[hash[word]], changelogMode=[I])
-      :           +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [CollectionTableSource(word, number)]]], 
fields=[word, number], changelogMode=[I])
-      +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
-         +- Calc(select=[number, word], changelogMode=[I,UB,UA])
-            +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS 
number], changelogMode=[I,UB,UA])
-               +- Exchange(distribution=[hash[word]], changelogMode=[I])
-                  +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [CollectionTableSource(word, number)]]], 
fields=[word, number], changelogMode=[I])
-
-Sink(table=[default_catalog.default_database.sink1], fields=[number, word], 
changelogMode=[NONE])
-+- Calc(select=[number, word], where=[<(word, 'a')], changelogMode=[I,UB,UA])
-   +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
-      :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
-      :  +- Calc(select=[number, word], changelogMode=[I,UB,UA])
-      :     +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS 
number], changelogMode=[I,UB,UA])
-      :        +- Exchange(distribution=[hash[word]], changelogMode=[I])
-      :           +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [CollectionTableSource(word, number)]]], 
fields=[word, number], changelogMode=[I])
-      +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
-         +- Calc(select=[number, word], changelogMode=[I,UB,UA])
-            +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS 
number], changelogMode=[I,UB,UA])
-               +- Exchange(distribution=[hash[word]], changelogMode=[I])
-                  +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [CollectionTableSource(word, number)]]], 
fields=[word, number], changelogMode=[I])
++- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+   :- Calc(select=[number, word], where=[>(word, 'a')], 
changelogMode=[I,UB,UA])
+   :  +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+   :     :- Calc(select=[+(number, 1) AS number, word], 
changelogMode=[I,UB,UA])
+   :     :  +- Calc(select=[number, word], changelogMode=[I,UB,UA])
+   :     :     +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS 
number], changelogMode=[I,UB,UA])
+   :     :        +- Exchange(distribution=[hash[word]], changelogMode=[I])
+   :     :           +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [CollectionTableSource(word, number)]]], 
fields=[word, number], changelogMode=[I])
+   :     +- Calc(select=[-(number, 1) AS number, word], 
changelogMode=[I,UB,UA])
+   :        +- Calc(select=[number, word], changelogMode=[I,UB,UA])
+   :           +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS 
number], changelogMode=[I,UB,UA])
+   :              +- Exchange(distribution=[hash[word]], changelogMode=[I])
+   :                 +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [CollectionTableSource(word, number)]]], 
fields=[word, number], changelogMode=[I])
+   +- Calc(select=[number, word], where=[<(word, 'a')], 
changelogMode=[I,UB,UA])
+      +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+         :- Calc(select=[+(number, 1) AS number, word], 
changelogMode=[I,UB,UA])
+         :  +- Calc(select=[number, word], changelogMode=[I,UB,UA])
+         :     +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS 
number], changelogMode=[I,UB,UA])
+         :        +- Exchange(distribution=[hash[word]], changelogMode=[I])
+         :           +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [CollectionTableSource(word, number)]]], 
fields=[word, number], changelogMode=[I])
+         +- Calc(select=[-(number, 1) AS number, word], 
changelogMode=[I,UB,UA])
+            +- Calc(select=[number, word], changelogMode=[I,UB,UA])
+               +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS 
number], changelogMode=[I,UB,UA])
+                  +- Exchange(distribution=[hash[word]], changelogMode=[I])
+                     +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [CollectionTableSource(word, number)]]], 
fields=[word, number], changelogMode=[I])
 
 Sink(table=[default_catalog.default_database.sink2], fields=[number, word], 
changelogMode=[NONE])
 +- Calc(select=[number, word], changelogMode=[I,UB,UA])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
index e71a198ba6e..b18b5a61bcf 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
@@ -129,16 +129,13 @@ 
LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name,
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Exchange(distribution=[hash[name]])(reuse_id=[1])
-+- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
-
-Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
-+- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, 
DEFAULT(), _UTF-16LE'a')], uid=[a], select=[name,out], 
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])
-   +- Reused(reference_id=[1])
-
 Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
-+- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, 
DEFAULT(), _UTF-16LE'b')], uid=[b], select=[name,out], 
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])
-   +- Reused(reference_id=[1])
++- Union(all=[true], union=[name, out])
+   :- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, 
DEFAULT(), _UTF-16LE'a')], uid=[a], select=[name,out], 
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])
+   :  +- Exchange(distribution=[hash[name]])(reuse_id=[1])
+   :     +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+   +- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, 
DEFAULT(), _UTF-16LE'b')], uid=[b], select=[name,out], 
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])
+      +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -179,15 +176,12 @@ 
LogicalSink(table=[default_catalog.default_database.t_sink], fields=[out])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 
}]])(reuse_id=[1])
-
-Sink(table=[default_catalog.default_database.t_sink], fields=[out])
-+- ProcessTableFunction(invocation=[f(TABLE(#0), 1, DEFAULT(), DEFAULT())], 
uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
-   +- Reused(reference_id=[1])
-
 Sink(table=[default_catalog.default_database.t_sink], fields=[out])
-+- ProcessTableFunction(invocation=[f(TABLE(#0), 42, DEFAULT(), DEFAULT())], 
uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
-   +- Reused(reference_id=[1])
++- Union(all=[true], union=[out])
+   :- ProcessTableFunction(invocation=[f(TABLE(#0), 1, DEFAULT(), DEFAULT())], 
uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
+   :  +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 
}]])(reuse_id=[1])
+   +- ProcessTableFunction(invocation=[f(TABLE(#0), 42, DEFAULT(), 
DEFAULT())], uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) 
out)])
+      +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -252,15 +246,12 @@ 
LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name,
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT(), 
_UTF-16LE'same')], uid=[same], select=[name,out], 
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])(reuse_id=[1])
-+- Exchange(distribution=[hash[name]])
-   +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
-
-Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
-+- Reused(reference_id=[1])
-
 Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[name, out])
+   :- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, 
DEFAULT(), _UTF-16LE'same')], uid=[same], select=[name,out], 
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])(reuse_id=[1])
+   :  +- Exchange(distribution=[hash[name]])
+   :     +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+   +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -286,17 +277,14 @@ 
LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name,
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT(), 
_UTF-16LE'same')], uid=[same], select=[name,out], 
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])(reuse_id=[1])
-+- Exchange(distribution=[hash[name]])
-   +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
-
-Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
-+- Calc(select=['Bob' AS name, out], where=[(name = 'Bob')])
-   +- Reused(reference_id=[1])
-
 Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
-+- Calc(select=['Alice' AS name, out], where=[(name = 'Alice')])
-   +- Reused(reference_id=[1])
++- Union(all=[true], union=[name, out])
+   :- Calc(select=['Bob' AS name, out], where=[(name = 'Bob')])
+   :  +- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, 
DEFAULT(), _UTF-16LE'same')], uid=[same], select=[name,out], 
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])(reuse_id=[1])
+   :     +- Exchange(distribution=[hash[name]])
+   :        +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 
}]])
+   +- Calc(select=['Alice' AS name, out], where=[(name = 'Alice')])
+      +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 9f1e3b17485..ca722ace653 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -2209,22 +2209,19 @@ 
LogicalSink(table=[default_catalog.default_database.s1], fields=[window_start, w
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, MyTable, 
project=[b, e, rowtime], metadata=[]]], fields=[b, e, rowtime])(reuse_id=[1])
-
 Sink(table=[default_catalog.default_database.s1], fields=[window_start, 
window_end, wAvg])
-+- Calc(select=[window_start, window_end, wAvg])
-   +- WindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], 
select=[weightedAvg(b, e) AS wAvg, start('w$) AS window_start, end('w$) AS 
window_end])
-      +- Exchange(distribution=[single])
-         +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
-            +- Reused(reference_id=[1])
-
-Sink(table=[default_catalog.default_database.s1], fields=[window_start, 
window_end, EXPR$2])
-+- Calc(select=[window_start, window_end, EXPR$2])
-   +- WindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], 
select=[COUNT(*) AS EXPR$2, start('w$) AS window_start, end('w$) AS window_end])
-      +- Exchange(distribution=[single])
-         +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
-            +- Calc(select=[rowtime])
-               +- Reused(reference_id=[1])
++- Union(all=[true], union=[window_start, window_end, wAvg])
+   :- Calc(select=[window_start, window_end, wAvg])
+   :  +- WindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], 
select=[weightedAvg(b, e) AS wAvg, start('w$) AS window_start, end('w$) AS 
window_end])
+   :     +- Exchange(distribution=[single])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
+   :           +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, project=[b, e, rowtime], metadata=[]]], fields=[b, e, 
rowtime])(reuse_id=[1])
+   +- Calc(select=[window_start, window_end, EXPR$2])
+      +- WindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], 
select=[COUNT(*) AS EXPR$2, start('w$) AS window_start, end('w$) AS window_end])
+         +- Exchange(distribution=[single])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
+               +- Calc(select=[rowtime])
+                  +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -2254,24 +2251,21 @@ 
LogicalSink(table=[default_catalog.default_database.s1], fields=[window_start, w
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, MyTable, 
project=[b, e, rowtime], metadata=[]]], fields=[b, e, rowtime])(reuse_id=[1])
-
 Sink(table=[default_catalog.default_database.s1], fields=[window_start, 
window_end, wAvg])
-+- Calc(select=[window_start, window_end, wAvg])
-   +- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15 
min])], select=[weightedAvg(weightedavg$0) AS wAvg, start('w$) AS window_start, 
end('w$) AS window_end])
-      +- Exchange(distribution=[single])
-         +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 
min])], select=[weightedAvg(b, e) AS weightedavg$0, slice_end('w$) AS 
$slice_end])
-            +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
-               +- Reused(reference_id=[1])
-
-Sink(table=[default_catalog.default_database.s1], fields=[window_start, 
window_end, EXPR$2])
-+- Calc(select=[window_start, window_end, EXPR$2])
-   +- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15 
min])], select=[COUNT(count1$0) AS EXPR$2, start('w$) AS window_start, end('w$) 
AS window_end])
-      +- Exchange(distribution=[single])
-         +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 
min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $slice_end])
-            +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
-               +- Calc(select=[rowtime])
-                  +- Reused(reference_id=[1])
++- Union(all=[true], union=[window_start, window_end, wAvg])
+   :- Calc(select=[window_start, window_end, wAvg])
+   :  +- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15 
min])], select=[weightedAvg(weightedavg$0) AS wAvg, start('w$) AS window_start, 
end('w$) AS window_end])
+   :     +- Exchange(distribution=[single])
+   :        +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime], 
size=[15 min])], select=[weightedAvg(b, e) AS weightedavg$0, slice_end('w$) AS 
$slice_end])
+   :           +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[b, e, rowtime], metadata=[]]], fields=[b, 
e, rowtime])(reuse_id=[1])
+   +- Calc(select=[window_start, window_end, EXPR$2])
+      +- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15 
min])], select=[COUNT(count1$0) AS EXPR$2, start('w$) AS window_start, end('w$) 
AS window_end])
+         +- Exchange(distribution=[single])
+            +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime], 
size=[15 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $slice_end])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 
1000:INTERVAL SECOND)])
+                  +- Calc(select=[rowtime])
+                     +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index 6055fcb363b..ce029217766 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -584,10 +584,7 @@ class TableEnvironmentITCase(tableEnvName: String, 
isStreaming: Boolean) {
 
     val tableResult = stmtSet.execute()
     // only check the schema
-    checkInsertTableResult(
-      tableResult,
-      "default_catalog.default_database.MySink_1",
-      "default_catalog.default_database.MySink_2")
+    checkInsertTableResult(tableResult, 
"default_catalog.default_database.MySink_1")
   }
 
   @TestTemplate


Reply via email to