This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2747a5814bc [FLINK-24939][table] Support `SHOW CREATE CATALOG` syntax
2747a5814bc is described below

commit 2747a5814bcc5cd45f15c023beba9b0644fe1ead
Author: Yubin Li <[email protected]>
AuthorDate: Sun Apr 7 10:46:19 2024 +0800

    [FLINK-24939][table] Support `SHOW CREATE CATALOG` syntax
    
    This closes #24555
---
 docs/content.zh/docs/dev/table/sql/show.md         | 89 ++++++++++++++++++++++
 docs/content/docs/dev/table/sql/show.md            | 89 ++++++++++++++++++++++
 .../src/test/resources/sql/catalog_database.q      | 21 +++++
 .../src/test/resources/sql/catalog_database.q      | 22 ++++++
 .../src/main/codegen/data/Parser.tdd               |  1 +
 .../src/main/codegen/includes/parserImpls.ftl      | 12 ++-
 .../flink/sql/parser/dql/SqlShowCreateCatalog.java | 65 ++++++++++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  5 ++
 .../flink/table/api/internal/ShowCreateUtil.java   | 27 +++++--
 .../apache/flink/table/catalog/CatalogManager.java |  7 +-
 .../operations/ShowCreateCatalogOperation.java     | 64 ++++++++++++++++
 .../operations/converters/SqlNodeConverters.java   |  1 +
 .../converters/SqlShowCreateCatalogConverter.java  | 33 ++++++++
 .../operations/SqlOtherOperationConverterTest.java |  8 ++
 14 files changed, 434 insertions(+), 10 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/show.md 
b/docs/content.zh/docs/dev/table/sql/show.md
index fc6cb8799ea..2c24cbb6759 100644
--- a/docs/content.zh/docs/dev/table/sql/show.md
+++ b/docs/content.zh/docs/dev/table/sql/show.md
@@ -34,6 +34,7 @@ SHOW CREATE 语句用于打印给定对象的创建 DDL 语句。当前的 SHOW
 目前 Flink SQL 支持下列 SHOW 语句:
 - SHOW CATALOGS
 - SHOW CURRENT CATALOG
+- SHOW CREATE CATALOG
 - SHOW DATABASES
 - SHOW CURRENT DATABASE
 - SHOW TABLES
@@ -102,6 +103,22 @@ tEnv.executeSql("SHOW CURRENT CATALOG").print();
 // |      default_catalog |
 // +----------------------+
 
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat2 WITH (...)");
+
+// show create catalog
+tEnv.executeSql("SHOW CREATE CATALOG cat2").print();
+// +---------------------------------------------------------------------------------------------+
+// |                                                                                      result |
+// +---------------------------------------------------------------------------------------------+
+// | CREATE CATALOG `cat2` WITH (
+//   'default-database' = 'db',
+//   'type' = 'generic_in_memory'
+// )
+// |
+// +---------------------------------------------------------------------------------------------+
+// 1 row in set
+
 // show databases
 tEnv.executeSql("SHOW DATABASES").print();
 // +------------------+
@@ -214,6 +231,22 @@ tEnv.executeSql("SHOW CATALOGS").print()
 // | default_catalog |
 // +-----------------+
 
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat2 WITH (...)")
+
+// show create catalog
+tEnv.executeSql("SHOW CREATE CATALOG cat2").print()
+// +---------------------------------------------------------------------------------------------+
+// |                                                                                      result |
+// +---------------------------------------------------------------------------------------------+
+// | CREATE CATALOG `cat2` WITH (
+//   'default-database' = 'db',
+//   'type' = 'generic_in_memory'
+// )
+// |
+// +---------------------------------------------------------------------------------------------+
+// 1 row in set
+
 // show databases
 tEnv.executeSql("SHOW DATABASES").print()
 // +------------------+
@@ -316,6 +349,22 @@ table_env.execute_sql("SHOW CATALOGS").print()
 # | default_catalog |
 # +-----------------+
 
+# create a catalog
+table_env.execute_sql("CREATE CATALOG cat2 WITH (...)")
+
+# show create catalog
+table_env.execute_sql("SHOW CREATE CATALOG cat2").print()
+# +---------------------------------------------------------------------------------------------+
+# |                                                                                      result |
+# +---------------------------------------------------------------------------------------------+
+# | CREATE CATALOG `cat2` WITH (
+#   'default-database' = 'db',
+#   'type' = 'generic_in_memory'
+# )
+#  |
+# +---------------------------------------------------------------------------------------------+
+# 1 row in set
+
 # show databases
 table_env.execute_sql("SHOW DATABASES").print()
 # +------------------+
@@ -411,6 +460,14 @@ table_env.execute_sql("SHOW FULL MODULES").print()
 Flink SQL> SHOW CATALOGS;
 default_catalog
 
+Flink SQL> CREATE CATALOG cat2 WITH (...);
+[INFO] Execute statement succeeded.
+ 
+Flink SQL> SHOW CREATE CATALOG cat2;
+CREATE CATALOG `cat2` WITH (
+  ...
+)
+
 Flink SQL> SHOW DATABASES;
 default_database
 
@@ -504,6 +561,38 @@ SHOW CURRENT CATALOG
 
 显示当前正在使用的 catalog。
 
+## SHOW CREATE CATALOG
+
+```sql
+SHOW CREATE CATALOG catalog_name
+```
+
+展示一个现有 catalog 的创建语句。
+
+该语句的输出内容包括 catalog 的名称和相关属性,使您可以直观地了解相应 catalog 的元数据。
+
+假设 `cat2` 是按如下方式创建的:
+```sql
+create catalog cat2 WITH (
+    'type'='generic_in_memory',
+    'default-database'='db'
+);
+```
+展示 catalog 创建语句。
+```sql
+show create catalog cat2;
++---------------------------------------------------------------------------------------------+
+|                                                                                      result |
++---------------------------------------------------------------------------------------------+
+| CREATE CATALOG `cat2` WITH (
+  'default-database' = 'db',
+  'type' = 'generic_in_memory'
+)
+ |
++---------------------------------------------------------------------------------------------+
+1 row in set
+```
+
 ## SHOW DATABASES
 
 ```sql
diff --git a/docs/content/docs/dev/table/sql/show.md 
b/docs/content/docs/dev/table/sql/show.md
index 73ec91b5ac6..e0b3a9d7fb8 100644
--- a/docs/content/docs/dev/table/sql/show.md
+++ b/docs/content/docs/dev/table/sql/show.md
@@ -33,6 +33,7 @@ SHOW CREATE statements are used to print a DDL statement with 
which a given obje
 Flink SQL supports the following SHOW statements for now:
 - SHOW CATALOGS
 - SHOW CURRENT CATALOG
+- SHOW CREATE CATALOG
 - SHOW DATABASES
 - SHOW CURRENT DATABASE
 - SHOW TABLES
@@ -102,6 +103,22 @@ tEnv.executeSql("SHOW CURRENT CATALOG").print();
 // |      default_catalog |
 // +----------------------+
 
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat2 WITH (...)");
+
+// show create catalog
+tEnv.executeSql("SHOW CREATE CATALOG cat2").print();
+// +---------------------------------------------------------------------------------------------+
+// |                                                                                      result |
+// +---------------------------------------------------------------------------------------------+
+// | CREATE CATALOG `cat2` WITH (
+//   'default-database' = 'db',
+//   'type' = 'generic_in_memory'
+// )
+// |
+// +---------------------------------------------------------------------------------------------+
+// 1 row in set
+
 // show databases
 tEnv.executeSql("SHOW DATABASES").print();
 // +------------------+
@@ -214,6 +231,22 @@ tEnv.executeSql("SHOW CATALOGS").print()
 // | default_catalog |
 // +-----------------+
 
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat2 WITH (...)")
+
+// show create catalog
+tEnv.executeSql("SHOW CREATE CATALOG cat2").print()
+// +---------------------------------------------------------------------------------------------+
+// |                                                                                      result |
+// +---------------------------------------------------------------------------------------------+
+// | CREATE CATALOG `cat2` WITH (
+//   'default-database' = 'db',
+//   'type' = 'generic_in_memory'
+// )
+// |
+// +---------------------------------------------------------------------------------------------+
+// 1 row in set
+
 // show databases
 tEnv.executeSql("SHOW DATABASES").print()
 // +------------------+
@@ -316,6 +349,22 @@ table_env.execute_sql("SHOW CATALOGS").print()
 # | default_catalog |
 # +-----------------+
 
+# create a catalog
+table_env.execute_sql("CREATE CATALOG cat2 WITH (...)")
+
+# show create catalog
+table_env.execute_sql("SHOW CREATE CATALOG cat2").print()
+# +---------------------------------------------------------------------------------------------+
+# |                                                                                      result |
+# +---------------------------------------------------------------------------------------------+
+# | CREATE CATALOG `cat2` WITH (
+#   'default-database' = 'db',
+#   'type' = 'generic_in_memory'
+# )
+#  |
+# +---------------------------------------------------------------------------------------------+
+# 1 row in set
+
 # show databases
 table_env.execute_sql("SHOW DATABASES").print()
 # +------------------+
@@ -411,6 +460,14 @@ table_env.execute_sql("SHOW FULL MODULES").print()
 Flink SQL> SHOW CATALOGS;
 default_catalog
 
+Flink SQL> CREATE CATALOG cat2 WITH (...);
+[INFO] Execute statement succeeded.
+ 
+Flink SQL> SHOW CREATE CATALOG cat2;
+CREATE CATALOG `cat2` WITH (
+  ...
+)
+
 Flink SQL> SHOW DATABASES;
 default_database
 
@@ -504,6 +561,38 @@ SHOW CURRENT CATALOG
 
 Show current catalog.
 
+## SHOW CREATE CATALOG
+
+```sql
+SHOW CREATE CATALOG catalog_name
+```
+
+Show creation statement for an existing catalog.
+
+The output includes the catalog's name and relevant properties, which allows you to gain an intuitive understanding of the underlying catalog's metadata.
+
+Assume that the catalog `cat2` is created as follows:
+```sql
+create catalog cat2 WITH (
+    'type'='generic_in_memory',
+    'default-database'='db'
+);
+```
+Shows the creation statement.
+```sql
+show create catalog cat2;
++---------------------------------------------------------------------------------------------+
+|                                                                                      result |
++---------------------------------------------------------------------------------------------+
+| CREATE CATALOG `cat2` WITH (
+  'default-database' = 'db',
+  'type' = 'generic_in_memory'
+)
+ |
++---------------------------------------------------------------------------------------------+
+1 row in set
+```
+
 ## SHOW DATABASES
 
 ```sql
diff --git 
a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q 
b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
index 11958807220..009712fd2b3 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
@@ -686,3 +686,24 @@ show tables from db1 like 'p_r%';
 +------------+
 1 row in set
 !ok
+
+# ==========================================================================
+# test catalog
+# ==========================================================================
+
+create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db');
+[INFO] Execute statement succeeded.
+!info
+
+show create catalog cat2;
++---------------------------------------------------------------------------------------------+
+|                                                                                      result |
++---------------------------------------------------------------------------------------------+
+| CREATE CATALOG `cat2` WITH (
+  'default-database' = 'db',
+  'type' = 'generic_in_memory'
+)
+ |
++---------------------------------------------------------------------------------------------+
+1 row in set
+!ok
diff --git 
a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q 
b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
index 2b9f0b46aa8..2d94bd32ac5 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
@@ -816,3 +816,25 @@ show tables from db1 like 'p_r%';
 +------------+
 1 row in set
 !ok
+
+# ==========================================================================
+# test catalog
+# ==========================================================================
+
+create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db');
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+show create catalog cat2;
+!output
+CREATE CATALOG `cat2` WITH (
+  'default-database' = 'db',
+  'type' = 'generic_in_memory'
+)
+!ok
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 8c39f41b463..d88a9c0f454 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -111,6 +111,7 @@
     "org.apache.flink.sql.parser.dql.SqlShowCreate"
     "org.apache.flink.sql.parser.dql.SqlShowCreateTable"
     "org.apache.flink.sql.parser.dql.SqlShowCreateView"
+    "org.apache.flink.sql.parser.dql.SqlShowCreateCatalog"
     "org.apache.flink.sql.parser.dql.SqlShowViews"
     "org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
     "org.apache.flink.sql.parser.dql.SqlUnloadModule"
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 58d1a2c6c39..9ac769e18a2 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -583,7 +583,8 @@ SqlShowViews SqlShowViews() :
 }
 
 /**
-* SHOW TABLES FROM [catalog.] database sql call.
+* Parses a show tables statement.
+* SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE pattern ];
 */
 SqlShowTables SqlShowTables() :
 {
@@ -653,7 +654,7 @@ SqlShowColumns SqlShowColumns() :
 }
 
 /**
-* Parse a "Show Create Table" query and "Show Create View" query commands.
+* Parse a "Show Create Table", "Show Create View" or "Show Create Catalog" query command.
 */
 SqlShowCreate SqlShowCreate() :
 {
@@ -676,6 +677,13 @@ SqlShowCreate SqlShowCreate() :
         {
             return new SqlShowCreateView(pos, sqlIdentifier);
         }
+    |
+        <CATALOG>
+        { pos = getPos(); }
+        sqlIdentifier = SimpleIdentifier()
+        {
+            return new SqlShowCreateCatalog(pos, sqlIdentifier);
+        }
     )
 }
 
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateCatalog.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateCatalog.java
new file mode 100644
index 00000000000..93d85cca39d
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateCatalog.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+
+/** SHOW CREATE CATALOG sql call. */
+public class SqlShowCreateCatalog extends SqlShowCreate {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("SHOW CREATE CATALOG", SqlKind.OTHER_DDL);
+
+    public SqlShowCreateCatalog(SqlParserPos pos, SqlIdentifier catalogName) {
+        super(pos, catalogName);
+    }
+
+    public SqlIdentifier getCatalogName() {
+        return sqlIdentifier;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return Collections.singletonList(sqlIdentifier);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("SHOW CREATE CATALOG");
+        sqlIdentifier.unparse(writer, leftPrec, rightPrec);
+    }
+
+    public String catalogName() {
+        return sqlIdentifier.getSimple();
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index d12b9b0d3fa..48b3b6b2e12 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -127,6 +127,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                                 + ")");
     }
 
+    @Test
+    void testShowCreateCatalog() {
+        sql("show create catalog c1").ok("SHOW CREATE CATALOG `C1`");
+    }
+
     @Test
     void testDropCatalog() {
         sql("drop catalog c1").ok("DROP CATALOG `C1`");
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
index 2a9eb90bbe7..c11aca26046 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.internal;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDescriptor;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -33,10 +34,13 @@ import org.apache.flink.table.utils.EncodingUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.utils.EncodingUtils.escapeIdentifier;
+
 /** SHOW CREATE statement Util. */
 @Internal
 public class ShowCreateUtil {
@@ -72,7 +76,7 @@ public class ShowCreateUtil {
                                 sb.append("PARTITIONED BY (")
                                         .append(partitionedInfoFormatted)
                                         .append(")\n"));
-        extractFormattedOptions(table, printIndent)
+        extractFormattedOptions(table.getOptions(), printIndent)
                 .ifPresent(v -> sb.append("WITH 
(\n").append(v).append("\n)\n"));
         return sb.toString();
     }
@@ -111,6 +115,18 @@ public class ShowCreateUtil {
         return stringBuilder.toString();
     }
 
+    public static String buildShowCreateCatalogRow(CatalogDescriptor 
catalogDescriptor) {
+        final String printIndent = "  ";
+        return String.format(
+                "CREATE CATALOG %s WITH (%s%s%s)%s",
+                escapeIdentifier(catalogDescriptor.getCatalogName()),
+                System.lineSeparator(),
+                
extractFormattedOptions(catalogDescriptor.getConfiguration().toMap(), 
printIndent)
+                        .orElse(""),
+                System.lineSeparator(),
+                System.lineSeparator());
+    }
+
     static String buildCreateFormattedPrefix(
             String tableType, boolean isTemporary, ObjectIdentifier 
identifier) {
         return String.format(
@@ -213,13 +229,12 @@ public class ShowCreateUtil {
                         .collect(Collectors.joining(", ")));
     }
 
-    static Optional<String> extractFormattedOptions(
-            ResolvedCatalogBaseTable<?> table, String printIndent) {
-        if (Objects.isNull(table.getOptions()) || 
table.getOptions().isEmpty()) {
+    static Optional<String> extractFormattedOptions(Map<String, String> conf, 
String printIndent) {
+        if (Objects.isNull(conf) || conf.isEmpty()) {
             return Optional.empty();
         }
         return Optional.of(
-                table.getOptions().entrySet().stream()
+                conf.entrySet().stream()
                         .map(
                                 entry ->
                                         String.format(
@@ -227,7 +242,7 @@ public class ShowCreateUtil {
                                                 printIndent,
                                                 
EncodingUtils.escapeSingleQuotes(entry.getKey()),
                                                 
EncodingUtils.escapeSingleQuotes(entry.getValue())))
-                        .collect(Collectors.joining(",\n")));
+                        .collect(Collectors.joining("," + 
System.lineSeparator())));
     }
 
     static String extractFormattedColumnNames(ResolvedCatalogBaseTable<?> 
baseTable) {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index de6fdb8e8db..543105d027a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -138,6 +138,10 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
         return catalogModificationListeners;
     }
 
+    public Optional<CatalogDescriptor> getCatalogDescriptor(String 
catalogName) {
+        return catalogStoreHolder.catalogStore().getCatalog(catalogName);
+    }
+
     public static Builder newBuilder() {
         return new Builder();
     }
@@ -402,8 +406,7 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
         }
 
         // Get catalog from the CatalogStore.
-        Optional<CatalogDescriptor> optionalDescriptor =
-                catalogStoreHolder.catalogStore().getCatalog(catalogName);
+        Optional<CatalogDescriptor> optionalDescriptor = 
getCatalogDescriptor(catalogName);
         return optionalDescriptor.map(
                 descriptor -> {
                     Catalog catalog = initCatalog(catalogName, descriptor);
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
new file mode 100644
index 00000000000..60baecb40a8
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.ShowCreateUtil;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Operation to describe a SHOW CREATE CATALOG statement. */
+@Internal
+public class ShowCreateCatalogOperation implements ShowOperation {
+
+    private final String catalogName;
+
+    public ShowCreateCatalogOperation(String catalogName) {
+        this.catalogName = checkNotNull(catalogName, "Catalog name must not be 
null");
+    }
+
+    public String getCatalogName() {
+        return catalogName;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return String.format("SHOW CREATE CATALOG %s", catalogName);
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        CatalogDescriptor catalogDescriptor =
+                ctx.getCatalogManager()
+                        .getCatalogDescriptor(catalogName)
+                        .orElseThrow(
+                                () ->
+                                        new ValidationException(
+                                                String.format(
+                                                        "Cannot obtain 
metadata information from Catalog %s.",
+                                                        catalogName)));
+        String resultRow = 
ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor);
+
+        return buildStringArrayResult("result", new String[] {resultRow});
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index 78356d16c7d..72d71403dc8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -53,6 +53,7 @@ public class SqlNodeConverters {
         register(new SqlReplaceTableAsConverter());
         register(new SqlProcedureCallConverter());
         register(new SqlShowDatabasesConverter());
+        register(new SqlShowCreateCatalogConverter());
     }
 
     /**
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowCreateCatalogConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowCreateCatalogConverter.java
new file mode 100644
index 00000000000..6078dfc02b7
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowCreateCatalogConverter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.dql.SqlShowCreateCatalog;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowCreateCatalogOperation;
+
+/** A converter for {@link SqlShowCreateCatalog}. */
+public class SqlShowCreateCatalogConverter implements 
SqlNodeConverter<SqlShowCreateCatalog> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlShowCreateCatalog sqlShowCreateCatalog, ConvertContext context) 
{
+        return new 
ShowCreateCatalogOperation(sqlShowCreateCatalog.catalogName());
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
index 48fc6fe6104..4acae04ad9c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.SqlParserException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowCreateCatalogOperation;
 import org.apache.flink.table.operations.ShowDatabasesOperation;
 import org.apache.flink.table.operations.ShowFunctionsOperation;
 import org.apache.flink.table.operations.ShowModulesOperation;
@@ -187,6 +188,13 @@ public class SqlOtherOperationConverterTest extends 
SqlNodeToOperationConversion
         assertThat(showTablesOperation.getPreposition()).isNull();
     }
 
+    @Test
+    void testShowCreateCatalog() {
+        Operation operation = parse("show create catalog cat1");
+        assertThat(operation).isInstanceOf(ShowCreateCatalogOperation.class);
+        assertThat(operation.asSummaryString()).isEqualTo("SHOW CREATE CATALOG 
cat1");
+    }
+
     @Test
     void testShowFullModules() {
         final String sql = "SHOW FULL MODULES";

Reply via email to