This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 37320fd90a4 [FLINK-35019][table]Support to convert show create model sql to operation (#26528)
37320fd90a4 is described below

commit 37320fd90a46d6a1e4cb3381bbaef7b1e099ef3a
Author: Hao Li <[email protected]>
AuthorDate: Tue May 13 01:29:28 2025 -0700

    [FLINK-35019][table]Support to convert show create model sql to operation (#26528)
---
 .../flink/table/api/internal/ShowCreateUtil.java   |  54 ++++++-
 .../table/operations/ShowCreateModelOperation.java |  59 +++++++
 .../operations/converters/SqlNodeConverters.java   |   1 +
 .../converters/SqlShowCreateModelConverter.java    |  52 +++++++
 .../flink/table/api/TableEnvironmentTest.scala     | 171 +++++++++++++++++++++
 5 files changed, 334 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
index 61df98b3326..40f77169f7f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
@@ -27,7 +27,9 @@ import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.expressions.SqlFactory;
@@ -35,6 +37,7 @@ import org.apache.flink.table.utils.EncodingUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -48,6 +51,32 @@ public class ShowCreateUtil {
 
     private ShowCreateUtil() {}
 
+    public static String buildShowCreateModelRow(
+            ResolvedCatalogModel model, ObjectIdentifier modelIdentifier, boolean isTemporary) {
+        StringBuilder sb =
+                new StringBuilder()
+                        .append(buildCreateFormattedPrefix("MODEL", isTemporary, modelIdentifier));
+        extractFormattedColumns(model.getResolvedInputSchema())
+                .ifPresent(
+                        c -> sb.append(String.format("INPUT (%s)%s", c, System.lineSeparator())));
+        extractFormattedColumns(model.getResolvedOutputSchema())
+                .ifPresent(
+                        c -> sb.append(String.format("OUTPUT (%s)%s", c, System.lineSeparator())));
+        extractComment(model)
+                .ifPresent(c -> sb.append(formatComment(c)).append(System.lineSeparator()));
+        extractFormattedOptions(model.getOptions(), PRINT_INDENT)
+                .ifPresent(
+                        v ->
+                                sb.append(String.format("WITH (%s", System.lineSeparator()))
+                                        .append(v)
+                                        .append(
+                                                String.format(
+                                                        "%s)%s",
+                                                        System.lineSeparator(),
+                                                        System.lineSeparator())));
+        return sb.toString();
+    }
+
     public static String buildShowCreateTableRow(
             ResolvedCatalogBaseTable<?> table,
             ObjectIdentifier tableIdentifier,
@@ -121,12 +150,14 @@ public class ShowCreateUtil {
     }
 
     static String buildCreateFormattedPrefix(
-            String tableType, boolean isTemporary, ObjectIdentifier identifier) {
+            String type, boolean isTemporary, ObjectIdentifier identifier) {
+        String postName = "model".equalsIgnoreCase(type) ? "" : " (";
         return String.format(
-                "CREATE %s%s %s (%s",
+                "CREATE %s%s %s%s%s",
                 isTemporary ? "TEMPORARY " : "",
-                tableType,
+                type,
                 identifier.asSerializableString(),
+                postName,
                 System.lineSeparator());
     }
 
@@ -177,6 +208,17 @@ public class ShowCreateUtil {
                 .collect(Collectors.joining(",\n"));
     }
 
+    static Optional<String> extractFormattedColumns(ResolvedSchema schema) {
+        List<Column> columns = schema.getColumns();
+        if (columns.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(
+                columns.stream()
+                        .map(ShowCreateUtil::getColumnString)
+                        .collect(Collectors.joining(", ")));
+    }
+
     static Optional<String> extractFormattedWatermarkSpecs(
             ResolvedCatalogBaseTable<?> table, String printIndent, SqlFactory sqlFactory) {
         if (table.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
@@ -207,6 +249,12 @@ public class ShowCreateUtil {
                 : Optional.of(table.getComment());
     }
 
+    static Optional<String> extractComment(ResolvedCatalogModel model) {
+        return StringUtils.isEmpty(model.getComment())
+                ? Optional.empty()
+                : Optional.of(model.getComment());
+    }
+
     static Optional<String> extractFormattedDistributedInfo(ResolvedCatalogTable catalogTable) {
         return catalogTable.getDistribution().map(TableDistribution::toString);
     }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java
new file mode 100644
index 00000000000..d94471ba0c3
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.ShowCreateUtil;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
+
+import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+
+/** Operation to describe a SHOW CREATE MODEL statement. */
+@Internal
+public class ShowCreateModelOperation implements ShowOperation {
+
+    private final ObjectIdentifier modelIdentifier;
+    private final ResolvedCatalogModel model;
+    private final boolean isTemporary;
+
+    public ShowCreateModelOperation(
+            ObjectIdentifier sqlIdentifier, ResolvedCatalogModel model, boolean isTemporary) {
+        this.modelIdentifier = sqlIdentifier;
+        this.model = model;
+        this.isTemporary = isTemporary;
+    }
+
+    public ResolvedCatalogModel getModel() {
+        return model;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return String.format("SHOW CREATE MODEL %s", modelIdentifier.asSummaryString());
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        String resultRow =
+                ShowCreateUtil.buildShowCreateModelRow(model, modelIdentifier, isTemporary);
+        return buildStringArrayResult("result", new String[] {resultRow});
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index 323be2239ae..26fe98680fe 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -53,6 +53,7 @@ public class SqlNodeConverters {
         register(new SqlTruncateTableConverter());
         register(new SqlShowFunctionsConverter());
         register(new SqlShowModelsConverter());
+        register(new SqlShowCreateModelConverter());
         register(new SqlShowProcedureConverter());
         register(new SqlReplaceTableAsConverter());
         register(new SqlProcedureCallConverter());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowCreateModelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowCreateModelConverter.java
new file mode 100644
index 00000000000..da9f144ca73
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowCreateModelConverter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.dql.SqlShowCreateModel;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowCreateModelOperation;
+
+import java.util.Optional;
+
+/** A converter for {@link org.apache.flink.sql.parser.dql.SqlShowCreateModel}. */
+public class SqlShowCreateModelConverter implements SqlNodeConverter<SqlShowCreateModel> {
+
+    @Override
+    public Operation convertSqlNode(SqlShowCreateModel showCreateModel, ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(showCreateModel.getFullModelName());
+        ObjectIdentifier identifier =
+                context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedModel> model = context.getCatalogManager().getModel(identifier);
+
+        if (model.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not execute SHOW CREATE MODEL. Model with identifier %s does not exist.",
+                            identifier.asSerializableString()));
+        }
+
+        return new ShowCreateModelOperation(
+                identifier, model.get().getResolvedModel(), model.get().isTemporary());
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 40feb25327f..c70abbe720f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -2957,6 +2957,177 @@ class TableEnvironmentTest {
     checkData(expectedResult1.iterator(), modelResult2.collect())
   }
 
+  @Test
+  def testShowCreateModel(): Unit = {
+    val sourceDDL =
+      """
+        |CREATE MODEL M1
+        |  INPUT(f0 char(10), f1 varchar(10))
+        |  OUTPUT(f2 string)
+        |COMMENT 'this is a model'
+        |with (
+        |  'task' = 'clustering',
+        |  'provider' = 'openai',
+        |  'openai.endpoint' = 'some-endpoint'
+        |)
+      """.stripMargin
+
+    tableEnv.executeSql(sourceDDL)
+
+    val expectedDDL =
+      """|CREATE MODEL `default_catalog`.`default_database`.`M1`
+         |INPUT (`f0` CHAR(10), `f1` VARCHAR(10))
+         |OUTPUT (`f2` VARCHAR(2147483647))
+         |COMMENT 'this is a model'
+         |WITH (
+         |  'openai.endpoint' = 'some-endpoint',
+         |  'provider' = 'openai',
+         |  'task' = 'clustering'
+         |)
+         |""".stripMargin
+    val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
+    assertEquals(expectedDDL, row.getField(0))
+  }
+
+  @Test
+  def testShowCreateModelComplexTypes(): Unit = {
+    val sourceDDL =
+      """
+        |CREATE MODEL M1
+        |  INPUT(
+        |    f0 ARRAY<INT>,
+        |    f1 MAP<STRING, INT>,
+        |    f2 ROW<name STRING, age INT>,
+        |    f3 ROW<name STRING, address ROW<street STRING, city STRING>>,
+        |    f4 ARRAY<ROW<id INT, details ROW<color STRING, size INT>>>
+        |  )
+        |  OUTPUT(
+        |    f5 ARRAY<MAP<STRING, INT>>,
+        |    f6 ARRAY<ARRAY<STRING>>
+        |  )
+        |COMMENT 'this is a model'
+        |with (
+        |  'task' = 'clustering',
+        |  'provider' = 'openai',
+        |  'openai.endpoint' = 'some-endpoint'
+        |)
+      """.stripMargin
+
+    tableEnv.executeSql(sourceDDL)
+
+    val expectedDDL =
+      """|CREATE MODEL `default_catalog`.`default_database`.`M1`
+         |INPUT (`f0` ARRAY<INT>, `f1` MAP<VARCHAR(2147483647), INT>, `f2` ROW<`name` VARCHAR(2147483647), `age` INT>, `f3` ROW<`name` VARCHAR(2147483647), `address` ROW<`street` VARCHAR(2147483647), `city` VARCHAR(2147483647)>>, `f4` ARRAY<ROW<`id` INT, `details` ROW<`color` VARCHAR(2147483647), `size` INT>>>)
+         |OUTPUT (`f5` ARRAY<MAP<VARCHAR(2147483647), INT>>, `f6` ARRAY<ARRAY<VARCHAR(2147483647)>>)
+         |COMMENT 'this is a model'
+         |WITH (
+         |  'openai.endpoint' = 'some-endpoint',
+         |  'provider' = 'openai',
+         |  'task' = 'clustering'
+         |)
+         |""".stripMargin
+    val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
+    assertEquals(expectedDDL, row.getField(0))
+  }
+
+  @Test
+  def testShowCreateTemporaryModel(): Unit = {
+    val sourceDDL =
+      """
+        |CREATE TEMPORARY MODEL M1
+        |  INPUT(f0 char(10), f1 varchar(10))
+        |  OUTPUT(f2 string)
+        |COMMENT 'this is a model'
+        |with (
+        |  'task' = 'clustering',
+        |  'provider' = 'openai',
+        |  'openai.endpoint' = 'some-endpoint'
+        |)
+      """.stripMargin
+
+    tableEnv.executeSql(sourceDDL)
+
+    val expectedDDL =
+      """|CREATE TEMPORARY MODEL `default_catalog`.`default_database`.`M1`
+         |INPUT (`f0` CHAR(10), `f1` VARCHAR(10))
+         |OUTPUT (`f2` VARCHAR(2147483647))
+         |COMMENT 'this is a model'
+         |WITH (
+         |  'openai.endpoint' = 'some-endpoint',
+         |  'provider' = 'openai',
+         |  'task' = 'clustering'
+         |)
+         |""".stripMargin
+    val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
+    assertEquals(expectedDDL, row.getField(0))
+  }
+
+  @Test
+  def testShowCreateNonExistModel(): Unit = {
+    assertThatThrownBy(() => tableEnv.executeSql("SHOW CREATE MODEL M1"))
+      .isInstanceOf(classOf[ValidationException])
+      .hasMessage(
+        "Could not execute SHOW CREATE MODEL. Model with identifier `default_catalog`.`default_database`.`M1` does not exist.")
+  }
+
+  @Test
+  def testShowCreateModelNoInputOutput(): Unit = {
+    val sourceDDL =
+      """
+        |CREATE MODEL M1
+        |  COMMENT 'this is a model'
+        |with (
+        |  'task' = 'clustering',
+        |  'provider' = 'openai',
+        |  'openai.endpoint' = 'some-endpoint'
+        |)
+      """.stripMargin
+
+    tableEnv.executeSql(sourceDDL)
+
+    val expectedDDL =
+      """|CREATE MODEL `default_catalog`.`default_database`.`M1`
+         |COMMENT 'this is a model'
+         |WITH (
+         |  'openai.endpoint' = 'some-endpoint',
+         |  'provider' = 'openai',
+         |  'task' = 'clustering'
+         |)
+         |""".stripMargin
+    val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
+    assertEquals(expectedDDL, row.getField(0))
+  }
+
+  @Test
+  def testShowCreateModelNoComment(): Unit = {
+    val sourceDDL =
+      """
+        |CREATE MODEL M1
+        |  INPUT(f0 char(10), f1 varchar(10))
+        |  OUTPUT(f2 string)
+        |with (
+        |  'task' = 'clustering',
+        |  'provider' = 'openai',
+        |  'openai.endpoint' = 'some-endpoint'
+        |)
+      """.stripMargin
+
+    tableEnv.executeSql(sourceDDL)
+
+    val expectedDDL =
+      """|CREATE MODEL `default_catalog`.`default_database`.`M1`
+         |INPUT (`f0` CHAR(10), `f1` VARCHAR(10))
+         |OUTPUT (`f2` VARCHAR(2147483647))
+         |WITH (
+         |  'openai.endpoint' = 'some-endpoint',
+         |  'provider' = 'openai',
+         |  'task' = 'clustering'
+         |)
+         |""".stripMargin
+    val row = tableEnv.executeSql("SHOW CREATE MODEL M1").collect().next()
+    assertEquals(expectedDDL, row.getField(0))
+  }
+
   @Test
   def testDropModel(): Unit = {
     val sourceDDL =

Reply via email to