This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5e0e5a233eb6f605998d5e64d182203d172f0797
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Wed Jan 8 09:52:05 2025 +0800

    [FLINK-36994][table] Fix expand sql error when materialized table creation with UDTF queries
---
 .../SqlCreateMaterializedTableConverter.java       | 14 +++++---
 ...erializedTableNodeToOperationConverterTest.java | 42 ++++++++++++++++++++--
 2 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
index de14add9692..df30527cb5c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -116,14 +116,20 @@ public class SqlCreateMaterializedTableConverter
         }
 
         // get query schema and definition query
-        SqlNode validateQuery =
-                context.getSqlValidator().validate(sqlCreateMaterializedTable.getAsQuery());
+        SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
+        String originalQuery = context.toQuotedSqlString(selectQuery);
+        SqlNode validateQuery = context.getSqlValidator().validate(selectQuery);
+
+        // The LATERAL operator was eliminated during sql validation, thus the unparsed SQL
+        // does not contain LATERAL which is problematic,
+        // the issue was resolved in CALCITE-4077
+        // (always treat the table function as implicitly LATERAL).
+        String definitionQuery = context.expandSqlIdentifiers(originalQuery);
+
         PlannerQueryOperation queryOperation =
                 new PlannerQueryOperation(
                         context.toRelRoot(validateQuery).project(),
                         () -> context.toQuotedSqlString(validateQuery));
-        String definitionQuery =
-                context.expandSqlIdentifiers(queryOperation.asSerializableString());
 
         // get schema
         ResolvedSchema resolvedSchema = queryOperation.getResolvedSchema();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index a3b659f335b..63a91f7e533 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -25,10 +25,12 @@ import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -39,6 +41,7 @@ import org.apache.flink.table.operations.materializedtable.AlterMaterializedTabl
 import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
 import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
 import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
+import org.apache.flink.table.planner.utils.TableFunc0;
 
 import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap;
 
@@ -140,15 +143,48 @@ public class SqlMaterializedTableNodeToOperationConverterTest
                         .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL)
                         .refreshMode(CatalogMaterializedTable.RefreshMode.FULL)
                         .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
-                        .definitionQuery(
-                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
-                                        + "FROM `builtin`.`default`.`t1` AS `t1`")
+                        .definitionQuery("SELECT *\n" + "FROM `builtin`.`default`.`t1`")
                         .build();
 
         assertThat(((ResolvedCatalogMaterializedTable) materializedTable).getOrigin())
                 .isEqualTo(expected);
     }
 
+    @Test
+    void testCreateMaterializedTableWithUDTFQuery() {
+        functionCatalog.registerCatalogFunction(
+                UnresolvedIdentifier.of(
+                        ObjectIdentifier.of(
+                                catalogManager.getCurrentCatalog(), "default", "myFunc")),
+                TableFunc0.class,
+                true);
+
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                        + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "PARTITIONED BY (a)\n"
+                        + "WITH (\n"
+                        + " 'connector' = 'filesystem', \n"
+                        + " 'format' = 'json'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT a, f1, f2 FROM t1,LATERAL TABLE(myFunc(b)) as T(f1, f2)";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation createOperation =
+                (CreateMaterializedTableOperation) operation;
+
+        assertThat(createOperation.getCatalogMaterializedTable().getDefinitionQuery())
+                .isEqualTo(
+                        "SELECT `t1`.`a`, `T`.`f1`, `T`.`f2`\n"
+                                + "FROM `builtin`.`default`.`t1`,\n"
+                                + "LATERAL TABLE(`builtin`.`default`.`myFunc`(`b`)) AS `T` (`f1`, `f2`)");
+    }
+
     @Test
     void testContinuousRefreshMode() {
         // test continuous mode derived by specify freshness automatically
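
For context, a sketch of the failure mode this commit addresses, using the UDTF query
from the new test (the exact pre-fix output is an assumption inferred from the code
comment above, not taken from the patch): before this change, the definition query was
produced by unparsing the already-validated SqlNode, and validation had eliminated the
LATERAL operator, so the stored query would have read roughly:

    -- hypothetical pre-fix serialization: LATERAL has been dropped
    SELECT `t1`.`a`, `T`.`f1`, `T`.`f2`
    FROM `builtin`.`default`.`t1`,
    TABLE(`builtin`.`default`.`myFunc`(`b`)) AS `T` (`f1`, `f2`)

Without LATERAL, column `b` from `t1` is not in scope for the table function call, so
expanding or re-parsing this SQL fails. The fix instead unparses and expands the
original AS query, which still contains LATERAL, matching the expected definition
query asserted in testCreateMaterializedTableWithUDTFQuery.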