This is an automated email from the ASF dual-hosted git repository.
xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9fe66715530 [FLINK-37720][table-planner] Enable sink reuse table
optimizer by default and apply plan changes to existing test cases (#26503)
9fe66715530 is described below
commit 9fe66715530e1cab4658e1e974141e3e6204cde6
Author: xiangyu0xf <[email protected]>
AuthorDate: Mon May 12 19:24:28 2025 +0800
[FLINK-37720][table-planner] Enable sink reuse table optimizer by default
and apply plan changes to existing test cases (#26503)
---
.../generated/optimizer_config_configuration.html | 2 +-
.../src/test/resources/sql_multi/statement_set.q | 24 ++++-----
.../src/test/resources/sql/statement_set.q | 16 +++---
.../table/api/config/OptimizerConfigOptions.java | 2 +-
.../explain/testStatementSetExecutionExplain.out | 34 +++----------
.../planner/plan/batch/sql/TableSourceTest.xml | 25 ++++------
.../physical/stream/ChangelogModeInferenceTest.xml | 51 ++++++++++---------
.../plan/stream/sql/ProcessTableFunctionTest.xml | 58 +++++++++-------------
.../plan/stream/sql/agg/WindowAggregateTest.xml | 58 ++++++++++------------
.../flink/table/api/TableEnvironmentITCase.scala | 5 +-
10 files changed, 110 insertions(+), 165 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
index e59a1289b10..ae2caa7aad8 100644
--- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
@@ -85,7 +85,7 @@ ONE_PHASE: Enforce to use one stage aggregate which only has
CompleteGlobalAggre
</tr>
<tr>
<td><h5>table.optimizer.reuse-sink-enabled</h5><br> <span
class="label label-primary">Batch</span> <span class="label
label-primary">Streaming</span></td>
- <td style="word-wrap: break-word;">false</td>
+ <td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When it is true, the optimizer will try to find out duplicated
table sinks and reuse them. This works only when
table.optimizer.reuse-sub-plan-enabled is true.</td>
</tr>
diff --git
a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
index 39230d12921..afee14890fa 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
@@ -73,9 +73,9 @@ INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello
World'), (2, 'Hi'),
INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2,
'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
END;
!output
-+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-|
[...]
-+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
++-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+|
[...]
++-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| == Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.StreamingTable],
fields=[EXPR$0, EXPR$1])
+- LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
@@ -87,21 +87,17 @@
LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXP
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0,
EXPR$1])
-+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1,
_UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3,
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5,
_UTF-16LE'LINE' }]])
-
-Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0,
EXPR$1])
-+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1,
_UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3,
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5,
_UTF-16LE'LINE' }]])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+ :- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{
1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3,
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5,
_UTF-16LE'LINE' }]])
+ +- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{
1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3,
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5,
_UTF-16LE'LINE' }]])
== Optimized Execution Plan ==
-Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2,
_UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4,
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
-
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0,
EXPR$1])
-+- Reused(reference_id=[1])
-
-Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0,
EXPR$1])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+ :- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, {
2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4,
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
+ +- Reused(reference_id=[1])
|
-+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
++-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
1 row in set
!ok
diff --git
a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
index cd9a33aca58..486fe3d43cc 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
@@ -105,19 +105,15 @@
LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXP
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0,
EXPR$1])
-+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1,
_UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3,
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5,
_UTF-16LE'LINE' }]])
-
-Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0,
EXPR$1])
-+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1,
_UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3,
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5,
_UTF-16LE'LINE' }]])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+ :- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{
1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3,
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5,
_UTF-16LE'LINE' }]])
+ +- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{
1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3,
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5,
_UTF-16LE'LINE' }]])
== Optimized Execution Plan ==
-Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2,
_UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4,
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
-
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0,
EXPR$1])
-+- Reused(reference_id=[1])
-
-Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0,
EXPR$1])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+ :- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, {
2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4,
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
+ +- Reused(reference_id=[1])
!ok
EXECUTE STATEMENT SET BEGIN
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index 23d15d0f978..62d085d5cd6 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -109,7 +109,7 @@ public class OptimizerConfigOptions {
public static final ConfigOption<Boolean>
TABLE_OPTIMIZER_REUSE_SINK_ENABLED =
key("table.optimizer.reuse-sink-enabled")
.booleanType()
- .defaultValue(false)
+ .defaultValue(true)
.withDescription(
"When it is true, the optimizer will try to find
out duplicated table sinks and "
+ "reuse them. This works only when "
diff --git
a/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out
b/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out
index edee1984ddd..d4e6a87a3d1 100644
---
a/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out
+++
b/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out
@@ -9,17 +9,15 @@ LogicalSink(table=[default_catalog.default_database.MySink],
fields=[first])
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.MySink], fields=[last])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [CsvTableSource(read fields: last)]]], fields=[last])
-
-Sink(table=[default_catalog.default_database.MySink], fields=[first])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [CsvTableSource(read fields: first)]]], fields=[first])
++- Union(all=[true], union=[last])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.MySink], fields=[last])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [CsvTableSource(read fields: last)]]], fields=[last])
-
-Sink(table=[default_catalog.default_database.MySink], fields=[first])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [CsvTableSource(read fields: first)]]], fields=[first])
++- Union(all=[true], union=[last])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
== Physical Execution Plan ==
{
@@ -51,17 +49,6 @@ Sink(table=[default_catalog.default_database.MySink],
fields=[first])
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
- }, {
- "id" : ,
- "type" : "StreamingFileWriter",
- "pact" : "Operator",
- "contents" : "StreamingFileWriter",
- "parallelism" : 1,
- "predecessors" : [ {
- "id" : ,
- "ship_strategy" : "FORWARD",
- "side" : "second"
- } ]
}, {
"id" : ,
"type" : "Source: Custom File source",
@@ -100,14 +87,7 @@ Sink(table=[default_catalog.default_database.MySink],
fields=[first])
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
- } ]
- }, {
- "id" : ,
- "type" : "end: Writer",
- "pact" : "Operator",
- "contents" : "end: Writer",
- "parallelism" : 1,
- "predecessors" : [ {
+ }, {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
index 9813db3c568..1ba58f298ae 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
@@ -157,11 +157,10 @@
LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
<![CDATA[
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
+- Union(all=[true], union=[a, b, c])
- :- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=0}]]])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
-
-Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
+ :- Union(all=[true], union=[a, b, c])
+ : :- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=0}]]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
]]>
</Resource>
</TestCase>
@@ -196,13 +195,10 @@
LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], hints=[[[OPTIONS
options:{source.num-element-to-skip=1}]]])(reuse_id=[1])
-
-Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
-+- Reused(reference_id=[1])
-
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[a, b, c])
+ :- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], hints=[[[OPTIONS
options:{source.num-element-to-skip=1}]]])(reuse_id=[1])
+ +- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
@@ -221,10 +217,9 @@
LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
<Resource name="optimized exec plan">
<![CDATA[
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
-
-Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
++- Union(all=[true], union=[a, b, c])
+ :- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
index 513a568b5aa..bd5a6a20be2 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
@@ -367,32 +367,31 @@
LogicalSink(table=[default_catalog.default_database.sink2], fields=[number, word
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.sink1], fields=[number, word],
changelogMode=[NONE])
-+- Calc(select=[number, word], where=[>(word, 'a')], changelogMode=[I,UB,UA])
- +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
- :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
- : +- Calc(select=[number, word], changelogMode=[I,UB,UA])
- : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS
number], changelogMode=[I,UB,UA])
- : +- Exchange(distribution=[hash[word]], changelogMode=[I])
- : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [CollectionTableSource(word, number)]]],
fields=[word, number], changelogMode=[I])
- +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
- +- Calc(select=[number, word], changelogMode=[I,UB,UA])
- +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS
number], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[word]], changelogMode=[I])
- +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [CollectionTableSource(word, number)]]],
fields=[word, number], changelogMode=[I])
-
-Sink(table=[default_catalog.default_database.sink1], fields=[number, word],
changelogMode=[NONE])
-+- Calc(select=[number, word], where=[<(word, 'a')], changelogMode=[I,UB,UA])
- +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
- :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
- : +- Calc(select=[number, word], changelogMode=[I,UB,UA])
- : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS
number], changelogMode=[I,UB,UA])
- : +- Exchange(distribution=[hash[word]], changelogMode=[I])
- : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [CollectionTableSource(word, number)]]],
fields=[word, number], changelogMode=[I])
- +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
- +- Calc(select=[number, word], changelogMode=[I,UB,UA])
- +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS
number], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[word]], changelogMode=[I])
- +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [CollectionTableSource(word, number)]]],
fields=[word, number], changelogMode=[I])
++- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+ :- Calc(select=[number, word], where=[>(word, 'a')],
changelogMode=[I,UB,UA])
+ : +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+ : :- Calc(select=[+(number, 1) AS number, word],
changelogMode=[I,UB,UA])
+ : : +- Calc(select=[number, word], changelogMode=[I,UB,UA])
+ : : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS
number], changelogMode=[I,UB,UA])
+ : : +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ : : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [CollectionTableSource(word, number)]]],
fields=[word, number], changelogMode=[I])
+ : +- Calc(select=[-(number, 1) AS number, word],
changelogMode=[I,UB,UA])
+ : +- Calc(select=[number, word], changelogMode=[I,UB,UA])
+ : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS
number], changelogMode=[I,UB,UA])
+ : +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [CollectionTableSource(word, number)]]],
fields=[word, number], changelogMode=[I])
+ +- Calc(select=[number, word], where=[<(word, 'a')],
changelogMode=[I,UB,UA])
+ +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+ :- Calc(select=[+(number, 1) AS number, word],
changelogMode=[I,UB,UA])
+ : +- Calc(select=[number, word], changelogMode=[I,UB,UA])
+ : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS
number], changelogMode=[I,UB,UA])
+ : +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [CollectionTableSource(word, number)]]],
fields=[word, number], changelogMode=[I])
+ +- Calc(select=[-(number, 1) AS number, word],
changelogMode=[I,UB,UA])
+ +- Calc(select=[number, word], changelogMode=[I,UB,UA])
+ +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS
number], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [CollectionTableSource(word, number)]]],
fields=[word, number], changelogMode=[I])
Sink(table=[default_catalog.default_database.sink2], fields=[number, word],
changelogMode=[NONE])
+- Calc(select=[number, word], changelogMode=[I,UB,UA])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
index e71a198ba6e..b18b5a61bcf 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
@@ -129,16 +129,13 @@
LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name,
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Exchange(distribution=[hash[name]])(reuse_id=[1])
-+- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
-
-Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
-+- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1,
DEFAULT(), _UTF-16LE'a')], uid=[a], select=[name,out],
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])
- +- Reused(reference_id=[1])
-
Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
-+- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1,
DEFAULT(), _UTF-16LE'b')], uid=[b], select=[name,out],
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])
- +- Reused(reference_id=[1])
++- Union(all=[true], union=[name, out])
+ :- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1,
DEFAULT(), _UTF-16LE'a')], uid=[a], select=[name,out],
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])
+ : +- Exchange(distribution=[hash[name]])(reuse_id=[1])
+ : +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+ +- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1,
DEFAULT(), _UTF-16LE'b')], uid=[b], select=[name,out],
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])
+ +- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
@@ -179,15 +176,12 @@
LogicalSink(table=[default_catalog.default_database.t_sink], fields=[out])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42
}]])(reuse_id=[1])
-
-Sink(table=[default_catalog.default_database.t_sink], fields=[out])
-+- ProcessTableFunction(invocation=[f(TABLE(#0), 1, DEFAULT(), DEFAULT())],
uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
- +- Reused(reference_id=[1])
-
Sink(table=[default_catalog.default_database.t_sink], fields=[out])
-+- ProcessTableFunction(invocation=[f(TABLE(#0), 42, DEFAULT(), DEFAULT())],
uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
- +- Reused(reference_id=[1])
++- Union(all=[true], union=[out])
+ :- ProcessTableFunction(invocation=[f(TABLE(#0), 1, DEFAULT(), DEFAULT())],
uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
+ : +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42
}]])(reuse_id=[1])
+ +- ProcessTableFunction(invocation=[f(TABLE(#0), 42, DEFAULT(),
DEFAULT())], uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647)
out)])
+ +- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
@@ -252,15 +246,12 @@
LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name,
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT(),
_UTF-16LE'same')], uid=[same], select=[name,out],
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])(reuse_id=[1])
-+- Exchange(distribution=[hash[name]])
- +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
-
-Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
-+- Reused(reference_id=[1])
-
Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[name, out])
+ :- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1,
DEFAULT(), _UTF-16LE'same')], uid=[same], select=[name,out],
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])(reuse_id=[1])
+ : +- Exchange(distribution=[hash[name]])
+ : +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
+ +- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
@@ -286,17 +277,14 @@
LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name,
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT(),
_UTF-16LE'same')], uid=[same], select=[name,out],
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])(reuse_id=[1])
-+- Exchange(distribution=[hash[name]])
- +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
-
-Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
-+- Calc(select=['Bob' AS name, out], where=[(name = 'Bob')])
- +- Reused(reference_id=[1])
-
Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
-+- Calc(select=['Alice' AS name, out], where=[(name = 'Alice')])
- +- Reused(reference_id=[1])
++- Union(all=[true], union=[name, out])
+ :- Calc(select=['Bob' AS name, out], where=[(name = 'Bob')])
+ : +- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1,
DEFAULT(), _UTF-16LE'same')], uid=[same], select=[name,out],
rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])(reuse_id=[1])
+ : +- Exchange(distribution=[hash[name]])
+ : +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42
}]])
+ +- Calc(select=['Alice' AS name, out], where=[(name = 'Alice')])
+ +- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 9f1e3b17485..ca722ace653 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -2209,22 +2209,19 @@
LogicalSink(table=[default_catalog.default_database.s1], fields=[window_start, w
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, MyTable,
project=[b, e, rowtime], metadata=[]]], fields=[b, e, rowtime])(reuse_id=[1])
-
Sink(table=[default_catalog.default_database.s1], fields=[window_start,
window_end, wAvg])
-+- Calc(select=[window_start, window_end, wAvg])
- +- WindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])],
select=[weightedAvg(b, e) AS wAvg, start('w$) AS window_start, end('w$) AS
window_end])
- +- Exchange(distribution=[single])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime -
1000:INTERVAL SECOND)])
- +- Reused(reference_id=[1])
-
-Sink(table=[default_catalog.default_database.s1], fields=[window_start,
window_end, EXPR$2])
-+- Calc(select=[window_start, window_end, EXPR$2])
- +- WindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])],
select=[COUNT(*) AS EXPR$2, start('w$) AS window_start, end('w$) AS window_end])
- +- Exchange(distribution=[single])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime -
1000:INTERVAL SECOND)])
- +- Calc(select=[rowtime])
- +- Reused(reference_id=[1])
++- Union(all=[true], union=[window_start, window_end, wAvg])
+ :- Calc(select=[window_start, window_end, wAvg])
+ : +- WindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])],
select=[weightedAvg(b, e) AS wAvg, start('w$) AS window_start, end('w$) AS
window_end])
+ : +- Exchange(distribution=[single])
+ : +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime -
1000:INTERVAL SECOND)])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, project=[b, e, rowtime], metadata=[]]], fields=[b, e,
rowtime])(reuse_id=[1])
+ +- Calc(select=[window_start, window_end, EXPR$2])
+ +- WindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])],
select=[COUNT(*) AS EXPR$2, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[single])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime -
1000:INTERVAL SECOND)])
+ +- Calc(select=[rowtime])
+ +- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
@@ -2254,24 +2251,21 @@
LogicalSink(table=[default_catalog.default_database.s1], fields=[window_start, w
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, MyTable,
project=[b, e, rowtime], metadata=[]]], fields=[b, e, rowtime])(reuse_id=[1])
-
Sink(table=[default_catalog.default_database.s1], fields=[window_start,
window_end, wAvg])
-+- Calc(select=[window_start, window_end, wAvg])
- +- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15
min])], select=[weightedAvg(weightedavg$0) AS wAvg, start('w$) AS window_start,
end('w$) AS window_end])
- +- Exchange(distribution=[single])
- +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15
min])], select=[weightedAvg(b, e) AS weightedavg$0, slice_end('w$) AS
$slice_end])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime -
1000:INTERVAL SECOND)])
- +- Reused(reference_id=[1])
-
-Sink(table=[default_catalog.default_database.s1], fields=[window_start,
window_end, EXPR$2])
-+- Calc(select=[window_start, window_end, EXPR$2])
- +- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15
min])], select=[COUNT(count1$0) AS EXPR$2, start('w$) AS window_start, end('w$)
AS window_end])
- +- Exchange(distribution=[single])
- +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15
min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $slice_end])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime -
1000:INTERVAL SECOND)])
- +- Calc(select=[rowtime])
- +- Reused(reference_id=[1])
++- Union(all=[true], union=[window_start, window_end, wAvg])
+ :- Calc(select=[window_start, window_end, wAvg])
+ : +- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15
min])], select=[weightedAvg(weightedavg$0) AS wAvg, start('w$) AS window_start,
end('w$) AS window_end])
+ : +- Exchange(distribution=[single])
+ : +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime],
size=[15 min])], select=[weightedAvg(b, e) AS weightedavg$0, slice_end('w$) AS
$slice_end])
+ : +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime -
1000:INTERVAL SECOND)])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[b, e, rowtime], metadata=[]]], fields=[b,
e, rowtime])(reuse_id=[1])
+ +- Calc(select=[window_start, window_end, EXPR$2])
+ +- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15
min])], select=[COUNT(count1$0) AS EXPR$2, start('w$) AS window_start, end('w$)
AS window_end])
+ +- Exchange(distribution=[single])
+ +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime],
size=[15 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $slice_end])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime -
1000:INTERVAL SECOND)])
+ +- Calc(select=[rowtime])
+ +- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index 6055fcb363b..ce029217766 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -584,10 +584,7 @@ class TableEnvironmentITCase(tableEnvName: String,
isStreaming: Boolean) {
val tableResult = stmtSet.execute()
// only check the schema
- checkInsertTableResult(
- tableResult,
- "default_catalog.default_database.MySink_1",
- "default_catalog.default_database.MySink_2")
+ checkInsertTableResult(tableResult,
"default_catalog.default_database.MySink_1")
}
@TestTemplate