This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e0e5a233eb6f605998d5e64d182203d172f0797
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Wed Jan 8 09:52:05 2025 +0800

    [FLINK-36994][table] Fix expand sql error when materialized table creation with UDTF queries
---
 .../SqlCreateMaterializedTableConverter.java       | 14 +++++---
 ...erializedTableNodeToOperationConverterTest.java | 42 ++++++++++++++++++++--
 2 files changed, 49 insertions(+), 7 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
index de14add9692..df30527cb5c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -116,14 +116,20 @@ public class SqlCreateMaterializedTableConverter
         }
 
         // get query schema and definition query
-        SqlNode validateQuery =
-                
context.getSqlValidator().validate(sqlCreateMaterializedTable.getAsQuery());
+        SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
+        String originalQuery = context.toQuotedSqlString(selectQuery);
+        SqlNode validateQuery = 
context.getSqlValidator().validate(selectQuery);
+
+        // The LATERAL operator was eliminated during sql validation, thus the unparsed SQL
+        // does not contain LATERAL which is problematic,
+        // the issue was resolved in CALCITE-4077
+        // (always treat the table function as implicitly LATERAL).
+        String definitionQuery = context.expandSqlIdentifiers(originalQuery);
+
         PlannerQueryOperation queryOperation =
                 new PlannerQueryOperation(
                         context.toRelRoot(validateQuery).project(),
                         () -> context.toQuotedSqlString(validateQuery));
-        String definitionQuery =
-                
context.expandSqlIdentifiers(queryOperation.asSerializableString());
 
         // get schema
         ResolvedSchema resolvedSchema = queryOperation.getResolvedSchema();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index a3b659f335b..63a91f7e533 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -25,10 +25,12 @@ import 
org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -39,6 +41,7 @@ import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTabl
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
 import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
 import 
org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
+import org.apache.flink.table.planner.utils.TableFunc0;
 
 import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap;
 
@@ -140,15 +143,48 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
                         
.logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL)
                         .refreshMode(CatalogMaterializedTable.RefreshMode.FULL)
                         
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
-                        .definitionQuery(
-                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
-                                        + "FROM `builtin`.`default`.`t1` AS 
`t1`")
+                        .definitionQuery("SELECT *\n" + "FROM 
`builtin`.`default`.`t1`")
                         .build();
 
         assertThat(((ResolvedCatalogMaterializedTable) 
materializedTable).getOrigin())
                 .isEqualTo(expected);
     }
 
+    @Test
+    void testCreateMaterializedTableWithUDTFQuery() {
+        functionCatalog.registerCatalogFunction(
+                UnresolvedIdentifier.of(
+                        ObjectIdentifier.of(
+                                catalogManager.getCurrentCatalog(), "default", 
"myFunc")),
+                TableFunc0.class,
+                true);
+
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "PARTITIONED BY (a)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT a, f1, f2 FROM t1,LATERAL 
TABLE(myFunc(b)) as T(f1, f2)";
+        Operation operation = parse(sql);
+        
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation createOperation =
+                (CreateMaterializedTableOperation) operation;
+
+        
assertThat(createOperation.getCatalogMaterializedTable().getDefinitionQuery())
+                .isEqualTo(
+                        "SELECT `t1`.`a`, `T`.`f1`, `T`.`f2`\n"
+                                + "FROM `builtin`.`default`.`t1`,\n"
+                                + "LATERAL 
TABLE(`builtin`.`default`.`myFunc`(`b`)) AS `T` (`f1`, `f2`)");
+    }
+
     @Test
     void testContinuousRefreshMode() {
         // test continuous mode derived by specify freshness automatically

Reply via email to