This is an automated email from the ASF dual-hosted git repository.
jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2747a5814bc [FLINK-24939][table] Support `SHOW CREATE CATALOG` syntax
2747a5814bc is described below
commit 2747a5814bcc5cd45f15c023beba9b0644fe1ead
Author: Yubin Li <[email protected]>
AuthorDate: Sun Apr 7 10:46:19 2024 +0800
[FLINK-24939][table] Support `SHOW CREATE CATALOG` syntax
This closes #24555
---
docs/content.zh/docs/dev/table/sql/show.md | 89 ++++++++++++++++++++++
docs/content/docs/dev/table/sql/show.md | 89 ++++++++++++++++++++++
.../src/test/resources/sql/catalog_database.q | 21 +++++
.../src/test/resources/sql/catalog_database.q | 22 ++++++
.../src/main/codegen/data/Parser.tdd | 1 +
.../src/main/codegen/includes/parserImpls.ftl | 12 ++-
.../flink/sql/parser/dql/SqlShowCreateCatalog.java | 65 ++++++++++++++++
.../flink/sql/parser/FlinkSqlParserImplTest.java | 5 ++
.../flink/table/api/internal/ShowCreateUtil.java | 27 +++++--
.../apache/flink/table/catalog/CatalogManager.java | 7 +-
.../operations/ShowCreateCatalogOperation.java | 64 ++++++++++++++++
.../operations/converters/SqlNodeConverters.java | 1 +
.../converters/SqlShowCreateCatalogConverter.java | 33 ++++++++
.../operations/SqlOtherOperationConverterTest.java | 8 ++
14 files changed, 434 insertions(+), 10 deletions(-)
diff --git a/docs/content.zh/docs/dev/table/sql/show.md
b/docs/content.zh/docs/dev/table/sql/show.md
index fc6cb8799ea..2c24cbb6759 100644
--- a/docs/content.zh/docs/dev/table/sql/show.md
+++ b/docs/content.zh/docs/dev/table/sql/show.md
@@ -34,6 +34,7 @@ SHOW CREATE 语句用于打印给定对象的创建 DDL 语句。当前的 SHOW
目前 Flink SQL 支持下列 SHOW 语句:
- SHOW CATALOGS
- SHOW CURRENT CATALOG
+- SHOW CREATE CATALOG
- SHOW DATABASES
- SHOW CURRENT DATABASE
- SHOW TABLES
@@ -102,6 +103,22 @@ tEnv.executeSql("SHOW CURRENT CATALOG").print();
// | default_catalog |
// +----------------------+
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat2 WITH (...)");
+
+// show create catalog
+tEnv.executeSql("SHOW CREATE CATALOG cat2").print();
+//
+---------------------------------------------------------------------------------------------+
+// |
result |
+//
+---------------------------------------------------------------------------------------------+
+// | CREATE CATALOG `cat2` WITH (
+// 'default-database' = 'db',
+// 'type' = 'generic_in_memory'
+// )
+// |
+//
+---------------------------------------------------------------------------------------------+
+// 1 row in set
+
// show databases
tEnv.executeSql("SHOW DATABASES").print();
// +------------------+
@@ -214,6 +231,22 @@ tEnv.executeSql("SHOW CATALOGS").print()
// | default_catalog |
// +-----------------+
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat2 WITH (...)")
+
+// show create catalog
+tEnv.executeSql("SHOW CREATE CATALOG cat2").print()
+//
+---------------------------------------------------------------------------------------------+
+// |
result |
+//
+---------------------------------------------------------------------------------------------+
+// | CREATE CATALOG `cat2` WITH (
+// 'default-database' = 'db',
+// 'type' = 'generic_in_memory'
+// )
+// |
+//
+---------------------------------------------------------------------------------------------+
+// 1 row in set
+
// show databases
tEnv.executeSql("SHOW DATABASES").print()
// +------------------+
@@ -316,6 +349,22 @@ table_env.execute_sql("SHOW CATALOGS").print()
# | default_catalog |
# +-----------------+
+# create a catalog
+table_env.execute_sql("CREATE CATALOG cat2 WITH (...)")
+
+# show create catalog
+table_env.execute_sql("SHOW CREATE CATALOG cat2").print()
+#
+---------------------------------------------------------------------------------------------+
+# |
result |
+#
+---------------------------------------------------------------------------------------------+
+# | CREATE CATALOG `cat2` WITH (
+# 'default-database' = 'db',
+# 'type' = 'generic_in_memory'
+# )
+# |
+#
+---------------------------------------------------------------------------------------------+
+# 1 row in set
+
# show databases
table_env.execute_sql("SHOW DATABASES").print()
# +------------------+
@@ -411,6 +460,14 @@ table_env.execute_sql("SHOW FULL MODULES").print()
Flink SQL> SHOW CATALOGS;
default_catalog
+Flink SQL> CREATE CATALOG cat2 WITH (...);
+[INFO] Execute statement succeeded.
+
+Flink SQL> SHOW CREATE CATALOG cat2;
+CREATE CATALOG `cat2` WITH (
+ ...
+)
+
Flink SQL> SHOW DATABASES;
default_database
@@ -504,6 +561,38 @@ SHOW CURRENT CATALOG
显示当前正在使用的 catalog。
+## SHOW CREATE CATALOG
+
+```sql
+SHOW CREATE CATALOG catalog_name
+```
+
+展示一个现有 catalog 的创建语句。
+
+该语句的输出内容包括 catalog 的名称和相关属性,使您可以直观地了解相应 catalog 的元数据。
+
+假设 `cat2` 是按如下方式创建的:
+```sql
+create catalog cat2 WITH (
+ 'type'='generic_in_memory',
+ 'default-database'='db'
+);
+```
+展示 catalog 创建语句。
+```sql
+show create catalog cat2;
++---------------------------------------------------------------------------------------------+
+|
result |
++---------------------------------------------------------------------------------------------+
+| CREATE CATALOG `cat2` WITH (
+ 'default-database' = 'db',
+ 'type' = 'generic_in_memory'
+)
+ |
++---------------------------------------------------------------------------------------------+
+1 row in set
+```
+
## SHOW DATABASES
```sql
diff --git a/docs/content/docs/dev/table/sql/show.md
b/docs/content/docs/dev/table/sql/show.md
index 73ec91b5ac6..e0b3a9d7fb8 100644
--- a/docs/content/docs/dev/table/sql/show.md
+++ b/docs/content/docs/dev/table/sql/show.md
@@ -33,6 +33,7 @@ SHOW CREATE statements are used to print a DDL statement with
which a given obje
Flink SQL supports the following SHOW statements for now:
- SHOW CATALOGS
- SHOW CURRENT CATALOG
+- SHOW CREATE CATALOG
- SHOW DATABASES
- SHOW CURRENT DATABASE
- SHOW TABLES
@@ -102,6 +103,22 @@ tEnv.executeSql("SHOW CURRENT CATALOG").print();
// | default_catalog |
// +----------------------+
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat2 WITH (...)");
+
+// show create catalog
+tEnv.executeSql("SHOW CREATE CATALOG cat2").print();
+//
+---------------------------------------------------------------------------------------------+
+// |
result |
+//
+---------------------------------------------------------------------------------------------+
+// | CREATE CATALOG `cat2` WITH (
+// 'default-database' = 'db',
+// 'type' = 'generic_in_memory'
+// )
+// |
+//
+---------------------------------------------------------------------------------------------+
+// 1 row in set
+
// show databases
tEnv.executeSql("SHOW DATABASES").print();
// +------------------+
@@ -214,6 +231,22 @@ tEnv.executeSql("SHOW CATALOGS").print()
// | default_catalog |
// +-----------------+
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat2 WITH (...)")
+
+// show create catalog
+tEnv.executeSql("SHOW CREATE CATALOG cat2").print()
+//
+---------------------------------------------------------------------------------------------+
+// |
result |
+//
+---------------------------------------------------------------------------------------------+
+// | CREATE CATALOG `cat2` WITH (
+// 'default-database' = 'db',
+// 'type' = 'generic_in_memory'
+// )
+// |
+//
+---------------------------------------------------------------------------------------------+
+// 1 row in set
+
// show databases
tEnv.executeSql("SHOW DATABASES").print()
// +------------------+
@@ -316,6 +349,22 @@ table_env.execute_sql("SHOW CATALOGS").print()
# | default_catalog |
# +-----------------+
+# create a catalog
+table_env.execute_sql("CREATE CATALOG cat2 WITH (...)")
+
+# show create catalog
+table_env.execute_sql("SHOW CREATE CATALOG cat2").print()
+#
+---------------------------------------------------------------------------------------------+
+# |
result |
+#
+---------------------------------------------------------------------------------------------+
+# | CREATE CATALOG `cat2` WITH (
+# 'default-database' = 'db',
+# 'type' = 'generic_in_memory'
+# )
+# |
+#
+---------------------------------------------------------------------------------------------+
+# 1 row in set
+
# show databases
table_env.execute_sql("SHOW DATABASES").print()
# +------------------+
@@ -411,6 +460,14 @@ table_env.execute_sql("SHOW FULL MODULES").print()
Flink SQL> SHOW CATALOGS;
default_catalog
+Flink SQL> CREATE CATALOG cat2 WITH (...);
+[INFO] Execute statement succeeded.
+
+Flink SQL> SHOW CREATE CATALOG cat2;
+CREATE CATALOG `cat2` WITH (
+ ...
+)
+
Flink SQL> SHOW DATABASES;
default_database
@@ -504,6 +561,38 @@ SHOW CURRENT CATALOG
Show current catalog.
+## SHOW CREATE CATALOG
+
+```sql
+SHOW CREATE CATALOG catalog_name
+```
+
+Show creation statement for an existing catalog.
+
+The output includes the catalog's name and relevant properties, which allows you to gain an intuitive understanding of the underlying catalog's metadata.
+
+Assume that the catalog `cat2` is created as follows:
+```sql
+create catalog cat2 WITH (
+ 'type'='generic_in_memory',
+ 'default-database'='db'
+);
+```
+Shows the creation statement.
+```sql
+show create catalog cat2;
++---------------------------------------------------------------------------------------------+
+|
result |
++---------------------------------------------------------------------------------------------+
+| CREATE CATALOG `cat2` WITH (
+ 'default-database' = 'db',
+ 'type' = 'generic_in_memory'
+)
+ |
++---------------------------------------------------------------------------------------------+
+1 row in set
+```
+
## SHOW DATABASES
```sql
diff --git
a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
index 11958807220..009712fd2b3 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
@@ -686,3 +686,24 @@ show tables from db1 like 'p_r%';
+------------+
1 row in set
!ok
+
+# ==========================================================================
+# test catalog
+# ==========================================================================
+
+create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db');
+[INFO] Execute statement succeeded.
+!info
+
+show create catalog cat2;
++---------------------------------------------------------------------------------------------+
+|
result |
++---------------------------------------------------------------------------------------------+
+| CREATE CATALOG `cat2` WITH (
+ 'default-database' = 'db',
+ 'type' = 'generic_in_memory'
+)
+ |
++---------------------------------------------------------------------------------------------+
+1 row in set
+!ok
diff --git
a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
index 2b9f0b46aa8..2d94bd32ac5 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
@@ -816,3 +816,25 @@ show tables from db1 like 'p_r%';
+------------+
1 row in set
!ok
+
+# ==========================================================================
+# test catalog
+# ==========================================================================
+
+create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db');
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+show create catalog cat2;
+!output
+CREATE CATALOG `cat2` WITH (
+ 'default-database' = 'db',
+ 'type' = 'generic_in_memory'
+)
+!ok
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 8c39f41b463..d88a9c0f454 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -111,6 +111,7 @@
"org.apache.flink.sql.parser.dql.SqlShowCreate"
"org.apache.flink.sql.parser.dql.SqlShowCreateTable"
"org.apache.flink.sql.parser.dql.SqlShowCreateView"
+ "org.apache.flink.sql.parser.dql.SqlShowCreateCatalog"
"org.apache.flink.sql.parser.dql.SqlShowViews"
"org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
"org.apache.flink.sql.parser.dql.SqlUnloadModule"
diff --git
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 58d1a2c6c39..9ac769e18a2 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -583,7 +583,8 @@ SqlShowViews SqlShowViews() :
}
/**
-* SHOW TABLES FROM [catalog.] database sql call.
+* Parses a show tables statement.
+* SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE pattern ];
*/
SqlShowTables SqlShowTables() :
{
@@ -653,7 +654,7 @@ SqlShowColumns SqlShowColumns() :
}
/**
-* Parse a "Show Create Table" query and "Show Create View" query commands.
+* Parses the "Show Create Table", "Show Create View" and "Show Create Catalog" query commands.
*/
SqlShowCreate SqlShowCreate() :
{
@@ -676,6 +677,13 @@ SqlShowCreate SqlShowCreate() :
{
return new SqlShowCreateView(pos, sqlIdentifier);
}
+ |
+ <CATALOG>
+ { pos = getPos(); }
+ sqlIdentifier = SimpleIdentifier()
+ {
+ return new SqlShowCreateCatalog(pos, sqlIdentifier);
+ }
)
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateCatalog.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateCatalog.java
new file mode 100644
index 00000000000..93d85cca39d
--- /dev/null
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateCatalog.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+
+/** SHOW CREATE CATALOG sql call. */
+public class SqlShowCreateCatalog extends SqlShowCreate {
+
+ public static final SqlSpecialOperator OPERATOR =
+ new SqlSpecialOperator("SHOW CREATE CATALOG", SqlKind.OTHER_DDL);
+
+ public SqlShowCreateCatalog(SqlParserPos pos, SqlIdentifier catalogName) {
+ super(pos, catalogName);
+ }
+
+ public SqlIdentifier getCatalogName() {
+ return sqlIdentifier;
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ return Collections.singletonList(sqlIdentifier);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("SHOW CREATE CATALOG");
+ sqlIdentifier.unparse(writer, leftPrec, rightPrec);
+ }
+
+ public String catalogName() {
+ return sqlIdentifier.getSimple();
+ }
+}
diff --git
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index d12b9b0d3fa..48b3b6b2e12 100644
---
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -127,6 +127,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
+ ")");
}
+ @Test
+ void testShowCreateCatalog() {
+ sql("show create catalog c1").ok("SHOW CREATE CATALOG `C1`");
+ }
+
@Test
void testDropCatalog() {
sql("drop catalog c1").ok("DROP CATALOG `C1`");
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
index 2a9eb90bbe7..c11aca26046 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -33,10 +34,13 @@ import org.apache.flink.table.utils.EncodingUtils;
import org.apache.commons.lang3.StringUtils;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.flink.table.utils.EncodingUtils.escapeIdentifier;
+
/** SHOW CREATE statement Util. */
@Internal
public class ShowCreateUtil {
@@ -72,7 +76,7 @@ public class ShowCreateUtil {
sb.append("PARTITIONED BY (")
.append(partitionedInfoFormatted)
.append(")\n"));
- extractFormattedOptions(table, printIndent)
+ extractFormattedOptions(table.getOptions(), printIndent)
.ifPresent(v -> sb.append("WITH
(\n").append(v).append("\n)\n"));
return sb.toString();
}
@@ -111,6 +115,18 @@ public class ShowCreateUtil {
return stringBuilder.toString();
}
+ public static String buildShowCreateCatalogRow(CatalogDescriptor
catalogDescriptor) {
+ final String printIndent = " ";
+ return String.format(
+ "CREATE CATALOG %s WITH (%s%s%s)%s",
+ escapeIdentifier(catalogDescriptor.getCatalogName()),
+ System.lineSeparator(),
+
extractFormattedOptions(catalogDescriptor.getConfiguration().toMap(),
printIndent)
+ .orElse(""),
+ System.lineSeparator(),
+ System.lineSeparator());
+ }
+
static String buildCreateFormattedPrefix(
String tableType, boolean isTemporary, ObjectIdentifier
identifier) {
return String.format(
@@ -213,13 +229,12 @@ public class ShowCreateUtil {
.collect(Collectors.joining(", ")));
}
- static Optional<String> extractFormattedOptions(
- ResolvedCatalogBaseTable<?> table, String printIndent) {
- if (Objects.isNull(table.getOptions()) ||
table.getOptions().isEmpty()) {
+ static Optional<String> extractFormattedOptions(Map<String, String> conf,
String printIndent) {
+ if (Objects.isNull(conf) || conf.isEmpty()) {
return Optional.empty();
}
return Optional.of(
- table.getOptions().entrySet().stream()
+ conf.entrySet().stream()
.map(
entry ->
String.format(
@@ -227,7 +242,7 @@ public class ShowCreateUtil {
printIndent,
EncodingUtils.escapeSingleQuotes(entry.getKey()),
EncodingUtils.escapeSingleQuotes(entry.getValue())))
- .collect(Collectors.joining(",\n")));
+ .collect(Collectors.joining("," +
System.lineSeparator())));
}
static String extractFormattedColumnNames(ResolvedCatalogBaseTable<?>
baseTable) {
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index de6fdb8e8db..543105d027a 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -138,6 +138,10 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
return catalogModificationListeners;
}
+ public Optional<CatalogDescriptor> getCatalogDescriptor(String
catalogName) {
+ return catalogStoreHolder.catalogStore().getCatalog(catalogName);
+ }
+
public static Builder newBuilder() {
return new Builder();
}
@@ -402,8 +406,7 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
}
// Get catalog from the CatalogStore.
- Optional<CatalogDescriptor> optionalDescriptor =
- catalogStoreHolder.catalogStore().getCatalog(catalogName);
+ Optional<CatalogDescriptor> optionalDescriptor =
getCatalogDescriptor(catalogName);
return optionalDescriptor.map(
descriptor -> {
Catalog catalog = initCatalog(catalogName, descriptor);
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
new file mode 100644
index 00000000000..60baecb40a8
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.ShowCreateUtil;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+
+import static
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Operation to describe a SHOW CREATE CATALOG statement. */
+@Internal
+public class ShowCreateCatalogOperation implements ShowOperation {
+
+ private final String catalogName;
+
+ public ShowCreateCatalogOperation(String catalogName) {
+ this.catalogName = checkNotNull(catalogName, "Catalog name must not be
null");
+ }
+
+ public String getCatalogName() {
+ return catalogName;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return String.format("SHOW CREATE CATALOG %s", catalogName);
+ }
+
+ @Override
+ public TableResultInternal execute(Context ctx) {
+ CatalogDescriptor catalogDescriptor =
+ ctx.getCatalogManager()
+ .getCatalogDescriptor(catalogName)
+ .orElseThrow(
+ () ->
+ new ValidationException(
+ String.format(
+ "Cannot obtain
metadata information from Catalog %s.",
+ catalogName)));
+ String resultRow =
ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor);
+
+ return buildStringArrayResult("result", new String[] {resultRow});
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index 78356d16c7d..72d71403dc8 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -53,6 +53,7 @@ public class SqlNodeConverters {
register(new SqlReplaceTableAsConverter());
register(new SqlProcedureCallConverter());
register(new SqlShowDatabasesConverter());
+ register(new SqlShowCreateCatalogConverter());
}
/**
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowCreateCatalogConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowCreateCatalogConverter.java
new file mode 100644
index 00000000000..6078dfc02b7
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlShowCreateCatalogConverter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.dql.SqlShowCreateCatalog;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowCreateCatalogOperation;
+
+/** A converter for {@link SqlShowCreateCatalog}. */
+public class SqlShowCreateCatalogConverter implements
SqlNodeConverter<SqlShowCreateCatalog> {
+
+ @Override
+ public Operation convertSqlNode(
+ SqlShowCreateCatalog sqlShowCreateCatalog, ConvertContext context)
{
+ return new
ShowCreateCatalogOperation(sqlShowCreateCatalog.catalogName());
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
index 48fc6fe6104..4acae04ad9c 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowCreateCatalogOperation;
import org.apache.flink.table.operations.ShowDatabasesOperation;
import org.apache.flink.table.operations.ShowFunctionsOperation;
import org.apache.flink.table.operations.ShowModulesOperation;
@@ -187,6 +188,13 @@ public class SqlOtherOperationConverterTest extends
SqlNodeToOperationConversion
assertThat(showTablesOperation.getPreposition()).isNull();
}
+ @Test
+ void testShowCreateCatalog() {
+ Operation operation = parse("show create catalog cat1");
+ assertThat(operation).isInstanceOf(ShowCreateCatalogOperation.class);
+ assertThat(operation.asSummaryString()).isEqualTo("SHOW CREATE CATALOG
cat1");
+ }
+
@Test
void testShowFullModules() {
final String sql = "SHOW FULL MODULES";