This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 038db11198d [FLINK-38493][table] Port Calcite's fix for `LATERAL` being
lost during validation
038db11198d is described below
commit 038db11198df37664891c5bc3e9ce7b9bf52ca48
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Oct 10 20:36:06 2025 +0200
[FLINK-38493][table] Port Calcite's fix for `LATERAL` being lost during
validation
---
.../flink-sql-client/src/test/resources/sql/view.q | 36 +++++++++++-----------
.../service/MaterializedTableStatementITCase.java | 16 +++++-----
.../src/test/resources/sql/view.q | 12 ++++----
.../calcite/sql/validate/SqlValidatorImpl.java | 25 ++++++++++++---
.../apache/calcite/sql2rel/SqlToRelConverter.java | 34 ++++++++++++--------
.../SqlAlterMaterializedTableAsQueryConverter.java | 8 ++---
.../SqlCreateMaterializedTableConverter.java | 12 ++------
.../operations/converters/SqlNodeConvertUtils.java | 6 +---
...erializedTableNodeToOperationConverterTest.java | 12 +++++---
.../table/planner/catalog/CatalogViewITCase.scala | 20 ++++++------
.../planner/plan/common/ViewsExpandingTest.scala | 10 +++---
11 files changed, 102 insertions(+), 89 deletions(-)
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/view.q
b/flink-table/flink-sql-client/src/test/resources/sql/view.q
index 261a43b7bc5..d7939d44539 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/view.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/view.q
@@ -59,9 +59,9 @@ create temporary view if not exists v2 as select * from v1;
# test show create a temporary view
show create view v1;
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-|
result |
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+|
result |
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1` (
`user`,
`product`,
@@ -69,18 +69,18 @@ show create view v1;
`ts`,
`ptime`
)
-AS SELECT *
-FROM `default_catalog`.`default_database`.`orders`
+AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`,
`orders`.`ts`, `orders`.`ptime`
+FROM `default_catalog`.`default_database`.`orders` AS `orders`
|
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set
!ok
# test show create a temporary view reference another view
show create view v2;
-+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-|
result |
-+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+|
result |
++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v2` (
`user`,
`product`,
@@ -88,10 +88,10 @@ show create view v2;
`ts`,
`ptime`
)
-AS SELECT *
-FROM `default_catalog`.`default_database`.`v1`
+AS SELECT `v1`.`user`, `v1`.`product`, `v1`.`amount`, `v1`.`ts`, `v1`.`ptime`
+FROM `default_catalog`.`default_database`.`v1` AS `v1`
|
-+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set
!ok
@@ -142,9 +142,9 @@ create view permanent_v1 as select * from orders;
# test show create a permanent view
show create view permanent_v1;
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-|
result |
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+|
result |
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| CREATE VIEW `default_catalog`.`default_database`.`permanent_v1` (
`user`,
`product`,
@@ -152,10 +152,10 @@ show create view permanent_v1;
`ts`,
`ptime`
)
-AS SELECT *
-FROM `default_catalog`.`default_database`.`orders`
+AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`,
`orders`.`ts`, `orders`.`ptime`
+FROM `default_catalog`.`default_database`.`orders` AS `orders`
|
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set
!ok
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 3358fee9cea..2e2499e75ae 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -1088,7 +1088,7 @@ class MaterializedTableStatementITCase extends
AbstractMaterializedTableStatemen
String.format(
"SELECT `tmp`.`user_id`, `tmp`.`shop_id`,
`tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`)
AS `order_amount_sum`\n"
+ "FROM (SELECT `my_source`.`user_id`,
`my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`,
`my_source`.`order_id`, 1 AS `order_amount`\n"
- + "FROM `%s`.`test_db`.`my_source`) AS
`tmp`\n"
+ + "FROM `%s`.`test_db`.`my_source` AS
`my_source`) AS `tmp`\n"
+ "GROUP BY ROW(`tmp`.`user_id`,
`tmp`.`shop_id`, `tmp`.`ds`)",
fileSystemCatalogName));
// the refresh handler in full mode should be the same as the old one
@@ -1228,7 +1228,7 @@ class MaterializedTableStatementITCase extends
AbstractMaterializedTableStatemen
String.format(
"SELECT `tmp`.`user_id`, `tmp`.`shop_id`,
`tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`)
AS `order_amount_sum`\n"
+ "FROM (SELECT `my_source`.`user_id`,
`my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`,
`my_source`.`order_id`, 1 AS `order_amount`\n"
- + "FROM `%s`.`test_db`.`my_source`) AS
`tmp`\n"
+ + "FROM `%s`.`test_db`.`my_source` AS
`my_source`) AS `tmp`\n"
+ "GROUP BY ROW(`tmp`.`user_id`,
`tmp`.`shop_id`, `tmp`.`ds`)",
fileSystemCatalogName));
@@ -1313,11 +1313,11 @@ class MaterializedTableStatementITCase extends
AbstractMaterializedTableStatemen
assertThat(newTable.getDefinitionQuery())
.isEqualTo(
String.format(
- "SELECT COALESCE(`tmp`.`user_id`, 0) AS
`user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`,
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
- + "FROM (SELECT
`datagenSource`.`user_id`, `datagenSource`.`shop_id`,
`DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`,
`datagenSource`.`payment_amount_cents`\n"
- + "FROM
`%s`.`test_db`.`datagenSource`) AS `tmp`\n"
+ "SELECT COALESCE(`tmp`.`user_id`, CAST(0 AS
BIGINT)) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`,
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
+ + "FROM (SELECT
`datagenSource`.`user_id`, `datagenSource`.`shop_id`,
DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`,
`datagenSource`.`payment_amount_cents`\n"
+ + "FROM `%s`.`%s`.`datagenSource` AS
`datagenSource`) AS `tmp`\n"
+ "GROUP BY ROW(`tmp`.`user_id`,
`tmp`.`shop_id`, `tmp`.`ds`)",
- fileSystemCatalogName));
+ fileSystemCatalogName, TEST_DEFAULT_DATABASE));
assertThat(oldTable.getSerializedRefreshHandler())
.isNotEqualTo(newTable.getSerializedRefreshHandler());
@@ -1413,8 +1413,8 @@ class MaterializedTableStatementITCase extends
AbstractMaterializedTableStatemen
.isEqualTo(
String.format(
"SELECT `tmp`.`user_id`, `tmp`.`shop_id`,
`tmp`.`ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS
`pv`\n"
- + "FROM (SELECT
`datagenSource`.`user_id`, `datagenSource`.`shop_id`,
`DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`,
`datagenSource`.`payment_amount_cents`\n"
- + "FROM
`%s`.`test_db`.`datagenSource`) AS `tmp`\n"
+ + "FROM (SELECT
`datagenSource`.`user_id`, `datagenSource`.`shop_id`,
DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`,
`datagenSource`.`payment_amount_cents`\n"
+ + "FROM `%s`.`test_db`.`datagenSource`
AS `datagenSource`) AS `tmp`\n"
+ "GROUP BY ROW(`tmp`.`user_id`,
`tmp`.`shop_id`, `tmp`.`ds`)",
fileSystemCatalogName));
assertThat(oldTable.getSerializedRefreshHandler())
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
b/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
index 3837fc8b64f..b05da00aadc 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
@@ -91,8 +91,8 @@ CREATE TEMPORARY VIEW
`default_catalog`.`default_database`.`v1` (
`ts`,
`ptime`
)
-AS SELECT *
-FROM `default_catalog`.`default_database`.`orders`
+AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`,
`orders`.`ts`, `orders`.`ptime`
+FROM `default_catalog`.`default_database`.`orders` AS `orders`
!ok
# test show create a temporary view reference another view
@@ -105,8 +105,8 @@ CREATE TEMPORARY VIEW
`default_catalog`.`default_database`.`v2` (
`ts`,
`ptime`
)
-AS SELECT *
-FROM `default_catalog`.`default_database`.`v1`
+AS SELECT `v1`.`user`, `v1`.`product`, `v1`.`amount`, `v1`.`ts`, `v1`.`ptime`
+FROM `default_catalog`.`default_database`.`v1` AS `v1`
!ok
show tables;
@@ -178,8 +178,8 @@ CREATE VIEW
`default_catalog`.`default_database`.`permanent_v1` (
`ts`,
`ptime`
)
-AS SELECT *
-FROM `default_catalog`.`default_database`.`orders`
+AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`,
`orders`.`ts`, `orders`.`ptime`
+FROM `default_catalog`.`default_database`.`orders` AS `orders`
!ok
# remove permanent_v1 view
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 6e94d897974..624fb7e6d7b 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -167,16 +167,20 @@ import static org.apache.calcite.util.Util.first;
* Default implementation of {@link SqlValidator}, the class was copied over
because of
* CALCITE-4554.
*
- * <p>Lines 197 ~ 200, Flink improves error message for functions without
appropriate arguments in
+ * <p>Lines 202 ~ 205, Flink improves error message for functions without
appropriate arguments in
* handleUnresolvedFunction.
*
- * <p>Lines 2012 ~ 2032, Flink improves error message for functions without
appropriate arguments in
+ * <p>Lines 1270 ~ 1272, CALCITE-7217, should be removed after upgrading
Calcite to 1.41.0.
+ *
+ * <p>Lines 2031 ~ 2045, Flink improves error message for functions without
appropriate arguments in
* handleUnresolvedFunction at {@link
SqlValidatorImpl#handleUnresolvedFunction}.
*
- * <p>Lines 3840 ~ 3844, 6511 ~ 6517 Flink improves Optimize the retrieval of
sub-operands in
+ * <p>Lines 2571 ~ 2588, CALCITE-7217, should be removed after upgrading
Calcite to 1.41.0.
+ *
+ * <p>Lines 3895 ~ 3899, 6574 ~ 6580 Flink improves Optimize the retrieval of
sub-operands in
* SqlCall when using NamedParameters at {@link SqlValidatorImpl#checkRollUp}.
*
- * <p>Lines 5246 ~ 5252, FLINK-24352 Add null check for temporal table check
on SqlSnapshot.
+ * <p>Lines 5315 ~ 5321, FLINK-24352 Add null check for temporal table check
on SqlSnapshot.
*/
public class SqlValidatorImpl implements SqlValidatorWithHints {
// ~ Static fields/initializers
---------------------------------------------
@@ -1263,6 +1267,9 @@ public class SqlValidatorImpl implements
SqlValidatorWithHints {
}
// fall through
case TABLE_REF:
+ // ----- FLINK MODIFICATION BEGIN -----
+ case LATERAL:
+ // ----- FLINK MODIFICATION END -----
case SNAPSHOT:
case OVER:
case COLLECTION_TABLE:
@@ -2561,7 +2568,9 @@ public class SqlValidatorImpl implements
SqlValidatorWithHints {
return newNode;
case LATERAL:
- return registerFrom(
+ // ----- FLINK MODIFICATION BEGIN -----
+ SqlBasicCall sbc = (SqlBasicCall) node;
+ registerFrom(
parentScope,
usingScope,
register,
@@ -2571,6 +2580,12 @@ public class SqlValidatorImpl implements
SqlValidatorWithHints {
extendList,
forceNullable,
true);
+ // Put the usingScope which is a JoinScope,
+ // in order to make visible the left items
+ // of the JOIN tree.
+ scopes.put(node, usingScope);
+ return sbc;
+ // ----- FLINK MODIFICATION END -----
case COLLECTION_TABLE:
call = (SqlCall) node;
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index ad521c88789..b226cc57014 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -245,18 +245,19 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
* <p>FLINK modifications are at lines
*
* <ol>
- * <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 685 ~ 702
- * <li>Added in Flink-24024: Lines 1452 ~ 1458
- * <li>Added in Flink-24024: Lines 1472 ~ 1511
- * <li>Added in Flink-37269: Lines 2249 ~ 2271
- * <li>Added in FLINK-28682: Lines 2382 ~ 2399
- * <li>Added in FLINK-28682: Lines 2436 ~ 2464
- * <li>Added in FLINK-32474: Lines 2521 ~ 2523
- * <li>Added in FLINK-32474: Lines 2527 ~ 2529
- * <li>Added in FLINK-32474: Lines 2545 ~ 2547
- * <li>Added in FLINK-32474: Lines 2960 ~ 2972
- * <li>Added in FLINK-32474: Lines 3073 ~ 3107
- * <li>Added in FLINK-34312: Lines 5937 ~ 5948
+ * <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 686 ~ 703
+ * <li>Added in Flink-24024: Lines 1453 ~ 1459
+ * <li>Added in Flink-24024: Lines 1473 ~ 1512
+ * <li>Added in Flink-37269: Lines 2250 ~ 2272
+ * <li>Added in FLINK-28682: Lines 2383 ~ 2400
+ * <li>Added in FLINK-28682: Lines 2437 ~ 2465
+ * <li>Added in FLINK-32474: Lines 2522 ~ 2524
+ * <li>Added in FLINK-32474: Lines 2528 ~ 2530
+ * <li>Added in FLINK-32474: Lines 2546 ~ 2548
+ * <li>Added in CALCITE-7217: Lines 2587 ~ 2595, should be dropped with
upgrade to Calcite 1.41.0
+ * <li>Added in FLINK-32474: Lines 2970 ~ 2982
+ * <li>Added in FLINK-32474: Lines 3083 ~ 3117
+ * <li>Added in FLINK-34312: Lines 5947 ~ 5958
* </ol>
*
* <p>In official extension point (i.e. {@link
#convertExtendedExpression(SqlNode, Blackboard)}):
@@ -2583,6 +2584,15 @@ public class SqlToRelConverter {
convertCollectionTable(bb, call2);
return;
+ // ----- FLINK MODIFICATION BEGIN -----
+ case LATERAL:
+ call = (SqlCall) from;
+ // Extract and analyze lateral part of join call.
+ assert call.getOperandList().size() == 1;
+ final SqlCall callLateral = call.operand(0);
+ convertFrom(bb, callLateral, fieldNames);
+ return;
+ // ----- FLINK MODIFICATION END -----
default:
throw new AssertionError("not a join operator " + from);
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
index a46c46248b9..7c145b5dcf2 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
@@ -52,14 +52,10 @@ public class SqlAlterMaterializedTableAsQueryConverter
context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery());
SqlNode validatedQuery =
context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery());
- // The LATERAL operator was eliminated during sql validation, thus the
unparsed SQL
- // does not contain LATERAL which is problematic,
- // the issue was resolved in CALCITE-4077
- // (always treat the table function as implicitly LATERAL).
- String definitionQuery = context.expandSqlIdentifiers(originalQuery);
+ String definitionQuery = context.toQuotedSqlString(validatedQuery);
PlannerQueryOperation queryOperation =
new PlannerQueryOperation(
- context.toRelRoot(validatedQuery).project(), () ->
originalQuery);
+ context.toRelRoot(validatedQuery).project(), () ->
definitionQuery);
ResolvedCatalogMaterializedTable oldTable =
getResolvedMaterializedTable(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
index d836bfdcfdc..46c6dd44fa7 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -108,19 +108,13 @@ public class SqlCreateMaterializedTableConverter
// get query schema and definition query
SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
- String originalQuery = context.toQuotedSqlString(selectQuery);
- SqlNode validateQuery =
context.getSqlValidator().validate(selectQuery);
+ SqlNode validatedQuery =
context.getSqlValidator().validate(selectQuery);
- // The LATERAL operator was eliminated during sql validation, thus the
unparsed SQL
- // does not contain LATERAL which is problematic,
- // the issue was resolved in CALCITE-4077
- // (always treat the table function as implicitly LATERAL).
- String definitionQuery = context.expandSqlIdentifiers(originalQuery);
+ String definitionQuery = context.toQuotedSqlString(validatedQuery);
PlannerQueryOperation queryOperation =
new PlannerQueryOperation(
- context.toRelRoot(validateQuery).project(),
- () -> context.toQuotedSqlString(validateQuery));
+ context.toRelRoot(validatedQuery).project(), () ->
definitionQuery);
// get schema
ResolvedSchema resolvedSchema = queryOperation.getResolvedSchema();
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
index 8d00ca5207e..db9140a084a 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
@@ -85,11 +85,7 @@ class SqlNodeConvertUtils {
context.getSqlValidator().getNamespace(validateQuery);
validateDuplicatedColumnNames(query, viewFields, validatedNamespace);
- // The LATERAL operator was eliminated during sql validation, thus the
unparsed SQL
- // does not contain LATERAL which is problematic,
- // the issue was resolved in CALCITE-4077
- // (always treat the table function as implicitly LATERAL).
- String expandedQuery = context.expandSqlIdentifiers(originalQuery);
+ String expandedQuery = context.toQuotedSqlString(query);
PlannerQueryOperation operation = toQueryOperation(validateQuery,
context);
ResolvedSchema schema = operation.getResolvedSchema();
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index acc651b1e5b..39275541d8c 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -148,7 +148,9 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
.logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL)
.refreshMode(CatalogMaterializedTable.RefreshMode.FULL)
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
- .definitionQuery("SELECT *\n" + "FROM
`builtin`.`default`.`t1`")
+ .definitionQuery(
+ "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`,
`t1`.`d`\n"
+ + "FROM `builtin`.`default`.`t1` AS
`t1`")
.build();
assertThat(materializedTable.getOrigin()).isEqualTo(expected);
@@ -185,7 +187,7 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
assertThat(createOperation.getCatalogMaterializedTable().getDefinitionQuery())
.isEqualTo(
"SELECT `t1`.`a`, `T`.`f1`, `T`.`f2`\n"
- + "FROM `builtin`.`default`.`t1`,\n"
+ + "FROM `builtin`.`default`.`t1` AS `t1`,\n"
+ "LATERAL
TABLE(`builtin`.`default`.`myFunc`(`b`)) AS `T` (`f1`, `f2`)");
}
@@ -470,11 +472,11 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
Column.physical("f",
DataTypes.VARCHAR(Integer.MAX_VALUE))),
TableChange.modifyDefinitionQuery(
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`,
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
- + "FROM
`builtin`.`default`.`t3`")));
+ + "FROM
`builtin`.`default`.`t3` AS `t3`")));
assertThat(operation.asSummaryString())
.isEqualTo(
"ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS
SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS
STRING) AS `f`\n"
- + "FROM `builtin`.`default`.`t3`");
+ + "FROM `builtin`.`default`.`t3` AS `t3`");
// new table only difference schema & definition query with old table.
CatalogMaterializedTable oldTable =
@@ -525,7 +527,7 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
TableChange.add(Column.physical("a0",
DataTypes.INT())),
TableChange.modifyDefinitionQuery(
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`,
`t3`.`d`, `t3`.`c` AS `a`\n"
- + "FROM
`builtin`.`default`.`t3`")));
+ + "FROM
`builtin`.`default`.`t3` AS `t3`")));
}
@Test
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
index f9dcc8330fc..061d2053833 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
@@ -355,7 +355,7 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends
TableITCaseBase {
| `c`
|)
|AS SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
- |FROM `default_catalog`.`default_database`.`T1`
+ |FROM `default_catalog`.`default_database`.`T1` AS `T1`
|""".stripMargin
)
)
@@ -375,7 +375,7 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends
TableITCaseBase {
| `f`
|)
|AS SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
- |FROM `default_catalog`.`default_database`.`T1`
+ |FROM `default_catalog`.`default_database`.`T1` AS `T1`
|""".stripMargin
)
)
@@ -402,7 +402,7 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends
TableITCaseBase {
| `c`
|)
|AS SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
- |FROM `default_catalog`.`default_database`.`T1`
+ |FROM `default_catalog`.`default_database`.`T1` AS `T1`
|""".stripMargin
)
)
@@ -422,7 +422,7 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends
TableITCaseBase {
| `z`
|)
|AS SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
- |FROM `default_catalog`.`default_database`.`T1`
+ |FROM `default_catalog`.`default_database`.`T1` AS `T1`
|""".stripMargin
)
)
@@ -449,8 +449,8 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends
TableITCaseBase {
| `max_value`
|)
|AS SELECT MAX(`t1`.`a`) AS `max_value`
- |FROM `default_catalog`.`default_database`.`t1`
- |LEFT JOIN `default_catalog`.`default_database`.`t2` ON `t1`.`c` =
`t2`.`c`
+ |FROM `default_catalog`.`default_database`.`t1` AS `t1`
+ |LEFT JOIN `default_catalog`.`default_database`.`t2` AS `t2` ON
`t1`.`c` = `t2`.`c`
|""".stripMargin
)
)
@@ -484,8 +484,8 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends
TableITCaseBase {
| `b2`
|)
|AS SELECT `udfEqualsOne`() AS `a`, `t1`.`a` AS `a1`, `t2`.`b` AS
`b2`
- |FROM `default_catalog`.`default_database`.`t1`
- |CROSS JOIN `default_catalog`.`default_database`.`t2`
+ |FROM `default_catalog`.`default_database`.`t1` AS `t1`
+ |CROSS JOIN `default_catalog`.`default_database`.`t2` AS `t2`
|""".stripMargin
)
)
@@ -514,8 +514,8 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends
TableITCaseBase {
| `b2`
|)
|AS SELECT `t1`.`a` AS `a1`, `t2`.`b` AS `b2`
- |FROM `default_catalog`.`default_database`.`t1`
- |INNER JOIN `default_catalog`.`default_database`.`t2` ON `t1`.`c` =
`t2`.`c`
+ |FROM `default_catalog`.`default_database`.`t1` AS `t1`
+ |INNER JOIN `default_catalog`.`default_database`.`t2` AS `t2` ON
`t1`.`c` = `t2`.`c`
|""".stripMargin
)
)
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
index 6a496f5bffd..9371333e3ac 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
@@ -150,7 +150,7 @@ class ViewsExpandingTest(tableTestUtil: TableTestBase =>
TableTestUtil) extends
.getTable(objectID.toObjectPath)
assertThat(
view.asInstanceOf[CatalogView].getExpandedQuery
- ).isEqualTo("SELECT `CONCAT`('a', 'bc', 'def')")
+ ).isEqualTo("SELECT CONCAT('a', 'bc', 'def')")
}
@TestTemplate
@@ -172,7 +172,7 @@ class ViewsExpandingTest(tableTestUtil: TableTestBase =>
TableTestUtil) extends
.getTable(objectID.toObjectPath)
assertThat(
view.asInstanceOf[CatalogView].getExpandedQuery
- ).isEqualTo("SELECT `default_catalog`.`default_database`.`func`(1, 2,
'abc')")
+ ).isEqualTo("SELECT `default_catalog`.`default_database`.`func`(1, CAST(2
AS BIGINT), 'abc')")
}
@TestTemplate
@@ -201,12 +201,12 @@ class ViewsExpandingTest(tableTestUtil: TableTestBase =>
TableTestUtil) extends
assertThat(
view.asInstanceOf[CatalogView].getExpandedQuery
).isEqualTo(
- "SELECT *\n"
+ "SELECT `EXPR$0`.`f0`, `EXPR$0`.`rowNum`\n"
+ "FROM (SELECT `source`.`f0`, "
+ "ROW_NUMBER() "
+ "OVER (PARTITION BY `source`.`f0` ORDER BY `source`.`f0` DESC) AS
`rowNum`\n"
- + "FROM `default_catalog`.`default_database`.`source`)\n"
- + "WHERE `rowNum` = 1")
+ + "FROM `default_catalog`.`default_database`.`source` AS `source`) AS
`EXPR$0`\n"
+ + "WHERE `EXPR$0`.`rowNum` = 1")
}
private def createSqlView(originTable: String): CatalogView = {