This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 37320fd90a4 [FLINK-35019][table] Support converting SHOW CREATE MODEL SQL to operation (#26528)
37320fd90a4 is described below
commit 37320fd90a46d6a1e4cb3381bbaef7b1e099ef3a
Author: Hao Li <[email protected]>
AuthorDate: Tue May 13 01:29:28 2025 -0700
[FLINK-35019][table] Support converting SHOW CREATE MODEL SQL to operation (#26528)
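
    For context, a minimal usage sketch of the statement this enables; the
    model name and options below are illustrative, mirroring the DDL used in
    the new tests:

        CREATE MODEL m1
          INPUT (f0 STRING)
          OUTPUT (f1 STRING)
        COMMENT 'this is a model'
        WITH (
          'provider' = 'openai',
          'task' = 'clustering'
        );

        SHOW CREATE MODEL m1;
        -- returns one row whose single column holds the reconstructed
        -- CREATE [TEMPORARY] MODEL statement, including the INPUT/OUTPUT
        -- schemas, COMMENT, and WITH options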
---
.../flink/table/api/internal/ShowCreateUtil.java | 54 ++++++-
.../table/operations/ShowCreateModelOperation.java | 59 +++++++
.../operations/converters/SqlNodeConverters.java | 1 +
.../converters/SqlShowCreateModelConverter.java | 52 +++++++
.../flink/table/api/TableEnvironmentTest.scala | 171 +++++++++++++++++++++
5 files changed, 334 insertions(+), 3 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
index 61df98b3326..40f77169f7f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
@@ -27,7 +27,9 @@ import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.expressions.SqlFactory;
@@ -35,6 +37,7 @@ import org.apache.flink.table.utils.EncodingUtils;
import org.apache.commons.lang3.StringUtils;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -48,6 +51,32 @@ public class ShowCreateUtil {
     private ShowCreateUtil() {}
 
+    public static String buildShowCreateModelRow(
+            ResolvedCatalogModel model, ObjectIdentifier modelIdentifier, boolean isTemporary) {
+        StringBuilder sb =
+                new StringBuilder()
+                        .append(buildCreateFormattedPrefix("MODEL", isTemporary, modelIdentifier));
+        extractFormattedColumns(model.getResolvedInputSchema())
+                .ifPresent(
+                        c -> sb.append(String.format("INPUT (%s)%s", c, System.lineSeparator())));
+        extractFormattedColumns(model.getResolvedOutputSchema())
+                .ifPresent(
+                        c -> sb.append(String.format("OUTPUT (%s)%s", c, System.lineSeparator())));
+        extractComment(model)
+                .ifPresent(c -> sb.append(formatComment(c)).append(System.lineSeparator()));
+        extractFormattedOptions(model.getOptions(), PRINT_INDENT)
+                .ifPresent(
+                        v ->
+                                sb.append(String.format("WITH (%s", System.lineSeparator()))
+                                        .append(v)
+                                        .append(
+                                                String.format(
+                                                        "%s)%s",
+                                                        System.lineSeparator(),
+                                                        System.lineSeparator())));
+        return sb.toString();
+    }
+
     public static String buildShowCreateTableRow(
             ResolvedCatalogBaseTable<?> table,
             ObjectIdentifier tableIdentifier,
@@ -121,12 +150,14 @@ public class ShowCreateUtil {
     }
 
     static String buildCreateFormattedPrefix(
-            String tableType, boolean isTemporary, ObjectIdentifier identifier) {
+            String type, boolean isTemporary, ObjectIdentifier identifier) {
+        String postName = "model".equalsIgnoreCase(type) ? "" : " (";
         return String.format(
-                "CREATE %s%s %s (%s",
+                "CREATE %s%s %s%s%s",
                 isTemporary ? "TEMPORARY " : "",
-                tableType,
+                type,
                 identifier.asSerializableString(),
+                postName,
                 System.lineSeparator());
     }
@@ -177,6 +208,17 @@ public class ShowCreateUtil {
.collect(Collectors.joining(",\n"));
}
+ static Optional<String> extractFormattedColumns(ResolvedSchema schema) {
+ List<Column> columns = schema.getColumns();
+ if (columns.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(
+ columns.stream()
+ .map(ShowCreateUtil::getColumnString)
+ .collect(Collectors.joining(", ")));
+ }
+
     static Optional<String> extractFormattedWatermarkSpecs(
             ResolvedCatalogBaseTable<?> table, String printIndent, SqlFactory sqlFactory) {
if (table.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
@@ -207,6 +249,12 @@ public class ShowCreateUtil {
: Optional.of(table.getComment());
}
+ static Optional<String> extractComment(ResolvedCatalogModel model) {
+ return StringUtils.isEmpty(model.getComment())
+ ? Optional.empty()
+ : Optional.of(model.getComment());
+ }
+
     static Optional<String> extractFormattedDistributedInfo(ResolvedCatalogTable catalogTable) {
return catalogTable.getDistribution().map(TableDistribution::toString);
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java
new file mode 100644
index 00000000000..d94471ba0c3
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.ShowCreateUtil;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
+
+import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+
+/** Operation to describe a SHOW CREATE MODEL statement. */
+@Internal
+public class ShowCreateModelOperation implements ShowOperation {
+
+ private final ObjectIdentifier modelIdentifier;
+ private final ResolvedCatalogModel model;
+ private final boolean isTemporary;
+
+    public ShowCreateModelOperation(
+            ObjectIdentifier sqlIdentifier, ResolvedCatalogModel model, boolean isTemporary) {
+ this.modelIdentifier = sqlIdentifier;
+ this.model = model;
+ this.isTemporary = isTemporary;
+ }
+
+ public ResolvedCatalogModel getModel() {
+ return model;
+ }
+
+ @Override
+ public String asSummaryString() {
+        return String.format("SHOW CREATE MODEL %s", modelIdentifier.asSummaryString());
+ }
+
+ @Override
+ public TableResultInternal execute(Context ctx) {
+        String resultRow =
+                ShowCreateUtil.buildShowCreateModelRow(model, modelIdentifier, isTemporary);
+ return buildStringArrayResult("result", new String[] {resultRow});
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index 323be2239ae..26fe98680fe 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -53,6 +53,7 @@ public class SqlNodeConverters {
register(new SqlTruncateTableConverter());
register(new SqlShowFunctionsConverter());
register(new SqlShowModelsConverter());
+ register(new SqlShowCreateModelConverter());
register(new SqlShowProcedureConverter());
register(new SqlReplaceTableAsConverter());
register(new SqlProcedureCallConverter());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowCreateModelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowCreateModelConverter.java
new file mode 100644
index 00000000000..da9f144ca73
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowCreateModelConverter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.dql.SqlShowCreateModel;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowCreateModelOperation;
+
+import java.util.Optional;
+
+/** A converter for {@link org.apache.flink.sql.parser.dql.SqlShowCreateModel}. */
+public class SqlShowCreateModelConverter implements SqlNodeConverter<SqlShowCreateModel> {
+
+ @Override
+    public Operation convertSqlNode(SqlShowCreateModel showCreateModel, ConvertContext context) {
+ UnresolvedIdentifier unresolvedIdentifier =
+ UnresolvedIdentifier.of(showCreateModel.getFullModelName());
+        ObjectIdentifier identifier =
+                context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedModel> model = context.getCatalogManager().getModel(identifier);
+
+ if (model.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+                            "Could not execute SHOW CREATE MODEL. Model with identifier %s does not exist.",
+ identifier.asSerializableString()));
+ }
+
+        return new ShowCreateModelOperation(
+                identifier, model.get().getResolvedModel(), model.get().isTemporary());
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 40feb25327f..c70abbe720f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -2957,6 +2957,177 @@ class TableEnvironmentTest {
checkData(expectedResult1.iterator(), modelResult2.collect())
}
+ @Test
+ def testShowCreateModel(): Unit = {
+ val sourceDDL =
+ """
+ |CREATE MODEL M1
+ | INPUT(f0 char(10), f1 varchar(10))
+ | OUTPUT(f2 string)
+ |COMMENT 'this is a model'
+ |with (
+ | 'task' = 'clustering',
+ | 'provider' = 'openai',
+ | 'openai.endpoint' = 'some-endpoint'
+ |)
+ """.stripMargin
+
+ tableEnv.executeSql(sourceDDL)
+
+ val expectedDDL =
+ """|CREATE MODEL `default_catalog`.`default_database`.`M1`
+ |INPUT (`f0` CHAR(10), `f1` VARCHAR(10))
+ |OUTPUT (`f2` VARCHAR(2147483647))
+ |COMMENT 'this is a model'
+ |WITH (
+ | 'openai.endpoint' = 'some-endpoint',
+ | 'provider' = 'openai',
+ | 'task' = 'clustering'
+ |)
+ |""".stripMargin
+ val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
+ assertEquals(expectedDDL, row.getField(0))
+ }
+
+ @Test
+ def testShowCreateModelComplexTypes(): Unit = {
+ val sourceDDL =
+ """
+ |CREATE MODEL M1
+ | INPUT(
+ | f0 ARRAY<INT>,
+ | f1 MAP<STRING, INT>,
+ | f2 ROW<name STRING, age INT>,
+ | f3 ROW<name STRING, address ROW<street STRING, city STRING>>,
+ | f4 ARRAY<ROW<id INT, details ROW<color STRING, size INT>>>
+ | )
+ | OUTPUT(
+ | f5 ARRAY<MAP<STRING, INT>>,
+ | f6 ARRAY<ARRAY<STRING>>
+ | )
+ |COMMENT 'this is a model'
+ |with (
+ | 'task' = 'clustering',
+ | 'provider' = 'openai',
+ | 'openai.endpoint' = 'some-endpoint'
+ |)
+ """.stripMargin
+
+ tableEnv.executeSql(sourceDDL)
+
+ val expectedDDL =
+ """|CREATE MODEL `default_catalog`.`default_database`.`M1`
+        |INPUT (`f0` ARRAY<INT>, `f1` MAP<VARCHAR(2147483647), INT>, `f2` ROW<`name` VARCHAR(2147483647), `age` INT>, `f3` ROW<`name` VARCHAR(2147483647), `address` ROW<`street` VARCHAR(2147483647), `city` VARCHAR(2147483647)>>, `f4` ARRAY<ROW<`id` INT, `details` ROW<`color` VARCHAR(2147483647), `size` INT>>>)
+        |OUTPUT (`f5` ARRAY<MAP<VARCHAR(2147483647), INT>>, `f6` ARRAY<ARRAY<VARCHAR(2147483647)>>)
+ |COMMENT 'this is a model'
+ |WITH (
+ | 'openai.endpoint' = 'some-endpoint',
+ | 'provider' = 'openai',
+ | 'task' = 'clustering'
+ |)
+ |""".stripMargin
+ val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
+ assertEquals(expectedDDL, row.getField(0))
+ }
+
+ @Test
+ def testShowCreateTemporaryModel(): Unit = {
+ val sourceDDL =
+ """
+ |CREATE TEMPORARY MODEL M1
+ | INPUT(f0 char(10), f1 varchar(10))
+ | OUTPUT(f2 string)
+ |COMMENT 'this is a model'
+ |with (
+ | 'task' = 'clustering',
+ | 'provider' = 'openai',
+ | 'openai.endpoint' = 'some-endpoint'
+ |)
+ """.stripMargin
+
+ tableEnv.executeSql(sourceDDL)
+
+ val expectedDDL =
+ """|CREATE TEMPORARY MODEL `default_catalog`.`default_database`.`M1`
+ |INPUT (`f0` CHAR(10), `f1` VARCHAR(10))
+ |OUTPUT (`f2` VARCHAR(2147483647))
+ |COMMENT 'this is a model'
+ |WITH (
+ | 'openai.endpoint' = 'some-endpoint',
+ | 'provider' = 'openai',
+ | 'task' = 'clustering'
+ |)
+ |""".stripMargin
+ val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
+ assertEquals(expectedDDL, row.getField(0))
+ }
+
+ @Test
+ def testShowCreateNonExistModel(): Unit = {
+ assertThatThrownBy(() => tableEnv.executeSql("SHOW CREATE MODEL M1"))
+ .isInstanceOf(classOf[ValidationException])
+ .hasMessage(
+ "Could not execute SHOW CREATE MODEL. Model with identifier
`default_catalog`.`default_database`.`M1` does not exist.")
+ }
+
+ @Test
+ def testShowCreateModelNoInputOutput(): Unit = {
+ val sourceDDL =
+ """
+ |CREATE MODEL M1
+ | COMMENT 'this is a model'
+ |with (
+ | 'task' = 'clustering',
+ | 'provider' = 'openai',
+ | 'openai.endpoint' = 'some-endpoint'
+ |)
+ """.stripMargin
+
+ tableEnv.executeSql(sourceDDL)
+
+ val expectedDDL =
+ """|CREATE MODEL `default_catalog`.`default_database`.`M1`
+ |COMMENT 'this is a model'
+ |WITH (
+ | 'openai.endpoint' = 'some-endpoint',
+ | 'provider' = 'openai',
+ | 'task' = 'clustering'
+ |)
+ |""".stripMargin
+ val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
+ assertEquals(expectedDDL, row.getField(0))
+ }
+
+ @Test
+ def testShowCreateModelNoComment(): Unit = {
+ val sourceDDL =
+ """
+ |CREATE MODEL M1
+ | INPUT(f0 char(10), f1 varchar(10))
+ | OUTPUT(f2 string)
+ |with (
+ | 'task' = 'clustering',
+ | 'provider' = 'openai',
+ | 'openai.endpoint' = 'some-endpoint'
+ |)
+ """.stripMargin
+
+ tableEnv.executeSql(sourceDDL)
+
+ val expectedDDL =
+ """|CREATE MODEL `default_catalog`.`default_database`.`M1`
+ |INPUT (`f0` CHAR(10), `f1` VARCHAR(10))
+ |OUTPUT (`f2` VARCHAR(2147483647))
+ |WITH (
+ | 'openai.endpoint' = 'some-endpoint',
+ | 'provider' = 'openai',
+ | 'task' = 'clustering'
+ |)
+ |""".stripMargin
+ val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
+ assertEquals(expectedDDL, row.getField(0))
+ }
+
@Test
def testDropModel(): Unit = {
val sourceDDL =