This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 038db11198d [FLINK-38493][table] Port Calcite's fix for `LATERAL` is lost while validation
038db11198d is described below

commit 038db11198df37664891c5bc3e9ce7b9bf52ca48
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Oct 10 20:36:06 2025 +0200

    [FLINK-38493][table] Port Calcite's fix for `LATERAL` is lost while validation
---
 .../flink-sql-client/src/test/resources/sql/view.q | 36 +++++++++++-----------
 .../service/MaterializedTableStatementITCase.java  | 16 +++++-----
 .../src/test/resources/sql/view.q                  | 12 ++++----
 .../calcite/sql/validate/SqlValidatorImpl.java     | 25 ++++++++++++---
 .../apache/calcite/sql2rel/SqlToRelConverter.java  | 34 ++++++++++++--------
 .../SqlAlterMaterializedTableAsQueryConverter.java |  8 ++---
 .../SqlCreateMaterializedTableConverter.java       | 12 ++------
 .../operations/converters/SqlNodeConvertUtils.java |  6 +---
 ...erializedTableNodeToOperationConverterTest.java | 12 +++++---
 .../table/planner/catalog/CatalogViewITCase.scala  | 20 ++++++------
 .../planner/plan/common/ViewsExpandingTest.scala   | 10 +++---
 11 files changed, 102 insertions(+), 89 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/resources/sql/view.q 
b/flink-table/flink-sql-client/src/test/resources/sql/view.q
index 261a43b7bc5..d7939d44539 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/view.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/view.q
@@ -59,9 +59,9 @@ create temporary view if not exists v2 as select * from v1;
 
 # test show create a temporary view
 show create view v1;
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-|                                                                              
                                                                                
                     result |
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+|                                                                              
                                                                                
                                                                                
                                       result |
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 | CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1` (
   `user`,
   `product`,
@@ -69,18 +69,18 @@ show create view v1;
   `ts`,
   `ptime`
 )
-AS SELECT *
-FROM `default_catalog`.`default_database`.`orders`
+AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`, 
`orders`.`ts`, `orders`.`ptime`
+FROM `default_catalog`.`default_database`.`orders` AS `orders`
  |
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 1 row in set
 !ok
 
 # test show create a temporary view reference another view
 show create view v2;
-+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-|                                                                              
                                                                                
                 result |
-+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+|                                                                              
                                                                                
                                                                                
           result |
++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 | CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v2` (
   `user`,
   `product`,
@@ -88,10 +88,10 @@ show create view v2;
   `ts`,
   `ptime`
 )
-AS SELECT *
-FROM `default_catalog`.`default_database`.`v1`
+AS SELECT `v1`.`user`, `v1`.`product`, `v1`.`amount`, `v1`.`ts`, `v1`.`ptime`
+FROM `default_catalog`.`default_database`.`v1` AS `v1`
  |
-+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 1 row in set
 !ok
 
@@ -142,9 +142,9 @@ create view permanent_v1 as select * from orders;
 
 # test show create a permanent view
 show create view permanent_v1;
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-|                                                                              
                                                                                
                     result |
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+|                                                                              
                                                                                
                                                                                
                                       result |
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 | CREATE VIEW `default_catalog`.`default_database`.`permanent_v1` (
   `user`,
   `product`,
@@ -152,10 +152,10 @@ show create view permanent_v1;
   `ts`,
   `ptime`
 )
-AS SELECT *
-FROM `default_catalog`.`default_database`.`orders`
+AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`, 
`orders`.`ts`, `orders`.`ptime`
+FROM `default_catalog`.`default_database`.`orders` AS `orders`
  |
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 1 row in set
 !ok
 
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 3358fee9cea..2e2499e75ae 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -1088,7 +1088,7 @@ class MaterializedTableStatementITCase extends 
AbstractMaterializedTableStatemen
                         String.format(
                                 "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, 
`tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) 
AS `order_amount_sum`\n"
                                         + "FROM (SELECT `my_source`.`user_id`, 
`my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`, 
`my_source`.`order_id`, 1 AS `order_amount`\n"
-                                        + "FROM `%s`.`test_db`.`my_source`) AS 
`tmp`\n"
+                                        + "FROM `%s`.`test_db`.`my_source` AS 
`my_source`) AS `tmp`\n"
                                         + "GROUP BY ROW(`tmp`.`user_id`, 
`tmp`.`shop_id`, `tmp`.`ds`)",
                                 fileSystemCatalogName));
         // the refresh handler in full mode should be the same as the old one
@@ -1228,7 +1228,7 @@ class MaterializedTableStatementITCase extends 
AbstractMaterializedTableStatemen
                         String.format(
                                 "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, 
`tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) 
AS `order_amount_sum`\n"
                                         + "FROM (SELECT `my_source`.`user_id`, 
`my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`, 
`my_source`.`order_id`, 1 AS `order_amount`\n"
-                                        + "FROM `%s`.`test_db`.`my_source`) AS 
`tmp`\n"
+                                        + "FROM `%s`.`test_db`.`my_source` AS 
`my_source`) AS `tmp`\n"
                                         + "GROUP BY ROW(`tmp`.`user_id`, 
`tmp`.`shop_id`, `tmp`.`ds`)",
                                 fileSystemCatalogName));
 
@@ -1313,11 +1313,11 @@ class MaterializedTableStatementITCase extends 
AbstractMaterializedTableStatemen
         assertThat(newTable.getDefinitionQuery())
                 .isEqualTo(
                         String.format(
-                                "SELECT COALESCE(`tmp`.`user_id`, 0) AS 
`user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, 
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
-                                        + "FROM (SELECT 
`datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
`DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
`datagenSource`.`payment_amount_cents`\n"
-                                        + "FROM 
`%s`.`test_db`.`datagenSource`) AS `tmp`\n"
+                                "SELECT COALESCE(`tmp`.`user_id`, CAST(0 AS 
BIGINT)) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, 
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
+                                        + "FROM (SELECT 
`datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
`datagenSource`.`payment_amount_cents`\n"
+                                        + "FROM `%s`.`%s`.`datagenSource` AS 
`datagenSource`) AS `tmp`\n"
                                         + "GROUP BY ROW(`tmp`.`user_id`, 
`tmp`.`shop_id`, `tmp`.`ds`)",
-                                fileSystemCatalogName));
+                                fileSystemCatalogName, TEST_DEFAULT_DATABASE));
         assertThat(oldTable.getSerializedRefreshHandler())
                 .isNotEqualTo(newTable.getSerializedRefreshHandler());
 
@@ -1413,8 +1413,8 @@ class MaterializedTableStatementITCase extends 
AbstractMaterializedTableStatemen
                 .isEqualTo(
                         String.format(
                                 "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, 
`tmp`.`ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS 
`pv`\n"
-                                        + "FROM (SELECT 
`datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
`DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
`datagenSource`.`payment_amount_cents`\n"
-                                        + "FROM 
`%s`.`test_db`.`datagenSource`) AS `tmp`\n"
+                                        + "FROM (SELECT 
`datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
`datagenSource`.`payment_amount_cents`\n"
+                                        + "FROM `%s`.`test_db`.`datagenSource` 
AS `datagenSource`) AS `tmp`\n"
                                         + "GROUP BY ROW(`tmp`.`user_id`, 
`tmp`.`shop_id`, `tmp`.`ds`)",
                                 fileSystemCatalogName));
         assertThat(oldTable.getSerializedRefreshHandler())
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/view.q 
b/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
index 3837fc8b64f..b05da00aadc 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
@@ -91,8 +91,8 @@ CREATE TEMPORARY VIEW 
`default_catalog`.`default_database`.`v1` (
   `ts`,
   `ptime`
 )
-AS SELECT *
-FROM `default_catalog`.`default_database`.`orders`
+AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`, 
`orders`.`ts`, `orders`.`ptime`
+FROM `default_catalog`.`default_database`.`orders` AS `orders`
 !ok
 
 # test show create a temporary view reference another view
@@ -105,8 +105,8 @@ CREATE TEMPORARY VIEW 
`default_catalog`.`default_database`.`v2` (
   `ts`,
   `ptime`
 )
-AS SELECT *
-FROM `default_catalog`.`default_database`.`v1`
+AS SELECT `v1`.`user`, `v1`.`product`, `v1`.`amount`, `v1`.`ts`, `v1`.`ptime`
+FROM `default_catalog`.`default_database`.`v1` AS `v1`
 !ok
 
 show tables;
@@ -178,8 +178,8 @@ CREATE VIEW 
`default_catalog`.`default_database`.`permanent_v1` (
   `ts`,
   `ptime`
 )
-AS SELECT *
-FROM `default_catalog`.`default_database`.`orders`
+AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`, 
`orders`.`ts`, `orders`.`ptime`
+FROM `default_catalog`.`default_database`.`orders` AS `orders`
 !ok
 
 # remove permanent_v1 view
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 6e94d897974..624fb7e6d7b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -167,16 +167,20 @@ import static org.apache.calcite.util.Util.first;
  * Default implementation of {@link SqlValidator}, the class was copied over 
because of
  * CALCITE-4554.
  *
- * <p>Lines 197 ~ 200, Flink improves error message for functions without 
appropriate arguments in
+ * <p>Lines 202 ~ 205, Flink improves error message for functions without 
appropriate arguments in
  * handleUnresolvedFunction.
  *
- * <p>Lines 2012 ~ 2032, Flink improves error message for functions without 
appropriate arguments in
+ * <p>Lines 1270 ~ 1272, CALCITE-7217, should be removed after upgrading 
Calcite to 1.41.0.
+ *
+ * <p>Lines 2031 ~ 2045, Flink improves error message for functions without 
appropriate arguments in
  * handleUnresolvedFunction at {@link 
SqlValidatorImpl#handleUnresolvedFunction}.
  *
- * <p>Lines 3840 ~ 3844, 6511 ~ 6517 Flink improves Optimize the retrieval of 
sub-operands in
+ * <p>Lines 2571 ~ 2588, CALCITE-7217, should be removed after upgrading 
Calcite to 1.41.0.
+ *
+ * <p>Lines 3895 ~ 3899, 6574 ~ 6580 Flink improves Optimize the retrieval of 
sub-operands in
  * SqlCall when using NamedParameters at {@link SqlValidatorImpl#checkRollUp}.
  *
- * <p>Lines 5246 ~ 5252, FLINK-24352 Add null check for temporal table check 
on SqlSnapshot.
+ * <p>Lines 5315 ~ 5321, FLINK-24352 Add null check for temporal table check 
on SqlSnapshot.
  */
 public class SqlValidatorImpl implements SqlValidatorWithHints {
     // ~ Static fields/initializers 
---------------------------------------------
@@ -1263,6 +1267,9 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                 }
             // fall through
             case TABLE_REF:
+            // ----- FLINK MODIFICATION BEGIN -----
+            case LATERAL:
+            // ----- FLINK MODIFICATION END -----
             case SNAPSHOT:
             case OVER:
             case COLLECTION_TABLE:
@@ -2561,7 +2568,9 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                 return newNode;
 
             case LATERAL:
-                return registerFrom(
+                // ----- FLINK MODIFICATION BEGIN -----
+                SqlBasicCall sbc = (SqlBasicCall) node;
+                registerFrom(
                         parentScope,
                         usingScope,
                         register,
@@ -2571,6 +2580,12 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                         extendList,
                         forceNullable,
                         true);
+                // Put the usingScope which is a JoinScope,
+                // in order to make visible the left items
+                // of the JOIN tree.
+                scopes.put(node, usingScope);
+                return sbc;
+            // ----- FLINK MODIFICATION END -----
 
             case COLLECTION_TABLE:
                 call = (SqlCall) node;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index ad521c88789..b226cc57014 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -245,18 +245,19 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * <p>FLINK modifications are at lines
  *
  * <ol>
- *   <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 685 ~ 702
- *   <li>Added in Flink-24024: Lines 1452 ~ 1458
- *   <li>Added in Flink-24024: Lines 1472 ~ 1511
- *   <li>Added in Flink-37269: Lines 2249 ~ 2271
- *   <li>Added in FLINK-28682: Lines 2382 ~ 2399
- *   <li>Added in FLINK-28682: Lines 2436 ~ 2464
- *   <li>Added in FLINK-32474: Lines 2521 ~ 2523
- *   <li>Added in FLINK-32474: Lines 2527 ~ 2529
- *   <li>Added in FLINK-32474: Lines 2545 ~ 2547
- *   <li>Added in FLINK-32474: Lines 2960 ~ 2972
- *   <li>Added in FLINK-32474: Lines 3073 ~ 3107
- *   <li>Added in FLINK-34312: Lines 5937 ~ 5948
+ *   <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 686 ~ 703
+ *   <li>Added in Flink-24024: Lines 1453 ~ 1459
+ *   <li>Added in Flink-24024: Lines 1473 ~ 1512
+ *   <li>Added in Flink-37269: Lines 2250 ~ 2272
+ *   <li>Added in FLINK-28682: Lines 2383 ~ 2400
+ *   <li>Added in FLINK-28682: Lines 2437 ~ 2465
+ *   <li>Added in FLINK-32474: Lines 2522 ~ 2524
+ *   <li>Added in FLINK-32474: Lines 2528 ~ 2530
+ *   <li>Added in FLINK-32474: Lines 2546 ~ 2548
+ *   <li>Added in CALCITE-7217: Lines 2587 ~ 2595, should be dropped with 
upgrade to Calcite 1.41.0
+ *   <li>Added in FLINK-32474: Lines 2970 ~ 2982
+ *   <li>Added in FLINK-32474: Lines 3083 ~ 3117
+ *   <li>Added in FLINK-34312: Lines 5947 ~ 5958
  * </ol>
  *
  * <p>In official extension point (i.e. {@link 
#convertExtendedExpression(SqlNode, Blackboard)}):
@@ -2583,6 +2584,15 @@ public class SqlToRelConverter {
                 convertCollectionTable(bb, call2);
                 return;
 
+            // ----- FLINK MODIFICATION BEGIN -----
+            case LATERAL:
+                call = (SqlCall) from;
+                // Extract and analyze lateral part of join call.
+                assert call.getOperandList().size() == 1;
+                final SqlCall callLateral = call.operand(0);
+                convertFrom(bb, callLateral, fieldNames);
+                return;
+            // ----- FLINK MODIFICATION END -----
             default:
                 throw new AssertionError("not a join operator " + from);
         }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
index a46c46248b9..7c145b5dcf2 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
@@ -52,14 +52,10 @@ public class SqlAlterMaterializedTableAsQueryConverter
                 
context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery());
         SqlNode validatedQuery =
                 
context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery());
-        // The LATERAL operator was eliminated during sql validation, thus the 
unparsed SQL
-        // does not contain LATERAL which is problematic,
-        // the issue was resolved in CALCITE-4077
-        // (always treat the table function as implicitly LATERAL).
-        String definitionQuery = context.expandSqlIdentifiers(originalQuery);
+        String definitionQuery = context.toQuotedSqlString(validatedQuery);
         PlannerQueryOperation queryOperation =
                 new PlannerQueryOperation(
-                        context.toRelRoot(validatedQuery).project(), () -> 
originalQuery);
+                        context.toRelRoot(validatedQuery).project(), () -> 
definitionQuery);
 
         ResolvedCatalogMaterializedTable oldTable =
                 getResolvedMaterializedTable(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
index d836bfdcfdc..46c6dd44fa7 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -108,19 +108,13 @@ public class SqlCreateMaterializedTableConverter
 
         // get query schema and definition query
         SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
-        String originalQuery = context.toQuotedSqlString(selectQuery);
-        SqlNode validateQuery = 
context.getSqlValidator().validate(selectQuery);
+        SqlNode validatedQuery = 
context.getSqlValidator().validate(selectQuery);
 
-        // The LATERAL operator was eliminated during sql validation, thus the 
unparsed SQL
-        // does not contain LATERAL which is problematic,
-        // the issue was resolved in CALCITE-4077
-        // (always treat the table function as implicitly LATERAL).
-        String definitionQuery = context.expandSqlIdentifiers(originalQuery);
+        String definitionQuery = context.toQuotedSqlString(validatedQuery);
 
         PlannerQueryOperation queryOperation =
                 new PlannerQueryOperation(
-                        context.toRelRoot(validateQuery).project(),
-                        () -> context.toQuotedSqlString(validateQuery));
+                        context.toRelRoot(validatedQuery).project(), () -> 
definitionQuery);
 
         // get schema
         ResolvedSchema resolvedSchema = queryOperation.getResolvedSchema();
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
index 8d00ca5207e..db9140a084a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
@@ -85,11 +85,7 @@ class SqlNodeConvertUtils {
                 context.getSqlValidator().getNamespace(validateQuery);
         validateDuplicatedColumnNames(query, viewFields, validatedNamespace);
 
-        // The LATERAL operator was eliminated during sql validation, thus the 
unparsed SQL
-        // does not contain LATERAL which is problematic,
-        // the issue was resolved in CALCITE-4077
-        // (always treat the table function as implicitly LATERAL).
-        String expandedQuery = context.expandSqlIdentifiers(originalQuery);
+        String expandedQuery = context.toQuotedSqlString(query);
 
         PlannerQueryOperation operation = toQueryOperation(validateQuery, 
context);
         ResolvedSchema schema = operation.getResolvedSchema();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index acc651b1e5b..39275541d8c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -148,7 +148,9 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
                         
.logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL)
                         .refreshMode(CatalogMaterializedTable.RefreshMode.FULL)
                         
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
-                        .definitionQuery("SELECT *\n" + "FROM 
`builtin`.`default`.`t1`")
+                        .definitionQuery(
+                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
+                                        + "FROM `builtin`.`default`.`t1` AS 
`t1`")
                         .build();
 
         assertThat(materializedTable.getOrigin()).isEqualTo(expected);
@@ -185,7 +187,7 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
         
assertThat(createOperation.getCatalogMaterializedTable().getDefinitionQuery())
                 .isEqualTo(
                         "SELECT `t1`.`a`, `T`.`f1`, `T`.`f2`\n"
-                                + "FROM `builtin`.`default`.`t1`,\n"
+                                + "FROM `builtin`.`default`.`t1` AS `t1`,\n"
                                 + "LATERAL 
TABLE(`builtin`.`default`.`myFunc`(`b`)) AS `T` (`f1`, `f2`)");
     }
 
@@ -470,11 +472,11 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
                                         Column.physical("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
                                 TableChange.modifyDefinitionQuery(
                                         "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
-                                                + "FROM 
`builtin`.`default`.`t3`")));
+                                                + "FROM 
`builtin`.`default`.`t3` AS `t3`")));
         assertThat(operation.asSummaryString())
                 .isEqualTo(
                         "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS 
SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS 
STRING) AS `f`\n"
-                                + "FROM `builtin`.`default`.`t3`");
+                                + "FROM `builtin`.`default`.`t3` AS `t3`");
 
         // new table only difference schema & definition query with old table.
         CatalogMaterializedTable oldTable =
@@ -525,7 +527,7 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
                                 TableChange.add(Column.physical("a0", 
DataTypes.INT())),
                                 TableChange.modifyDefinitionQuery(
                                         "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`c` AS `a`\n"
-                                                + "FROM 
`builtin`.`default`.`t3`")));
+                                                + "FROM 
`builtin`.`default`.`t3` AS `t3`")));
     }
 
     @Test
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
index f9dcc8330fc..061d2053833 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
@@ -355,7 +355,7 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends 
TableITCaseBase {
            |  `c`
            |)
            |AS SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
-           |FROM `default_catalog`.`default_database`.`T1`
+           |FROM `default_catalog`.`default_database`.`T1` AS `T1`
            |""".stripMargin
       )
     )
@@ -375,7 +375,7 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends 
TableITCaseBase {
            |  `f`
            |)
            |AS SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
-           |FROM `default_catalog`.`default_database`.`T1`
+           |FROM `default_catalog`.`default_database`.`T1` AS `T1`
            |""".stripMargin
       )
     )
@@ -402,7 +402,7 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends 
TableITCaseBase {
              |  `c`
              |)
              |AS SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
-             |FROM `default_catalog`.`default_database`.`T1`
+             |FROM `default_catalog`.`default_database`.`T1` AS `T1`
              |""".stripMargin
         )
       )
@@ -422,7 +422,7 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends TableITCaseBase {
            |  `z`
            |)
            |AS SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
-           |FROM `default_catalog`.`default_database`.`T1`
+           |FROM `default_catalog`.`default_database`.`T1` AS `T1`
            |""".stripMargin
       )
     )
@@ -449,8 +449,8 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends TableITCaseBase {
            |  `max_value`
            |)
            |AS SELECT MAX(`t1`.`a`) AS `max_value`
-           |FROM `default_catalog`.`default_database`.`t1`
-           |LEFT JOIN `default_catalog`.`default_database`.`t2` ON `t1`.`c` = `t2`.`c`
+           |FROM `default_catalog`.`default_database`.`t1` AS `t1`
+           |LEFT JOIN `default_catalog`.`default_database`.`t2` AS `t2` ON `t1`.`c` = `t2`.`c`
            |""".stripMargin
       )
     )
@@ -484,8 +484,8 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends TableITCaseBase {
            |  `b2`
            |)
            |AS SELECT `udfEqualsOne`() AS `a`, `t1`.`a` AS `a1`, `t2`.`b` AS `b2`
-           |FROM `default_catalog`.`default_database`.`t1`
-           |CROSS JOIN `default_catalog`.`default_database`.`t2`
+           |FROM `default_catalog`.`default_database`.`t1` AS `t1`
+           |CROSS JOIN `default_catalog`.`default_database`.`t2` AS `t2`
            |""".stripMargin
       )
     )
@@ -514,8 +514,8 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends TableITCaseBase {
            |  `b2`
            |)
            |AS SELECT `t1`.`a` AS `a1`, `t2`.`b` AS `b2`
-           |FROM `default_catalog`.`default_database`.`t1`
-           |INNER JOIN `default_catalog`.`default_database`.`t2` ON `t1`.`c` = `t2`.`c`
+           |FROM `default_catalog`.`default_database`.`t1` AS `t1`
+           |INNER JOIN `default_catalog`.`default_database`.`t2` AS `t2` ON `t1`.`c` = `t2`.`c`
            |""".stripMargin
       )
     )
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
index 6a496f5bffd..9371333e3ac 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
@@ -150,7 +150,7 @@ class ViewsExpandingTest(tableTestUtil: TableTestBase => TableTestUtil) extends
       .getTable(objectID.toObjectPath)
     assertThat(
       view.asInstanceOf[CatalogView].getExpandedQuery
-    ).isEqualTo("SELECT `CONCAT`('a', 'bc', 'def')")
+    ).isEqualTo("SELECT CONCAT('a', 'bc', 'def')")
   }
 
   @TestTemplate
@@ -172,7 +172,7 @@ class ViewsExpandingTest(tableTestUtil: TableTestBase => TableTestUtil) extends
       .getTable(objectID.toObjectPath)
     assertThat(
       view.asInstanceOf[CatalogView].getExpandedQuery
-    ).isEqualTo("SELECT `default_catalog`.`default_database`.`func`(1, 2, 'abc')")
+    ).isEqualTo("SELECT `default_catalog`.`default_database`.`func`(1, CAST(2 AS BIGINT), 'abc')")
   }
 
   @TestTemplate
@@ -201,12 +201,12 @@ class ViewsExpandingTest(tableTestUtil: TableTestBase => TableTestUtil) extends
     assertThat(
       view.asInstanceOf[CatalogView].getExpandedQuery
     ).isEqualTo(
-      "SELECT *\n"
+      "SELECT `EXPR$0`.`f0`, `EXPR$0`.`rowNum`\n"
         + "FROM (SELECT `source`.`f0`, "
         + "ROW_NUMBER() "
         + "OVER (PARTITION BY `source`.`f0` ORDER BY `source`.`f0` DESC) AS `rowNum`\n"
-        + "FROM `default_catalog`.`default_database`.`source`)\n"
-        + "WHERE `rowNum` = 1")
+        + "FROM `default_catalog`.`default_database`.`source` AS `source`) AS `EXPR$0`\n"
+        + "WHERE `EXPR$0`.`rowNum` = 1")
   }
 
   private def createSqlView(originTable: String): CatalogView = {


Reply via email to