This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2cde5613129 [FLINK-36038][table] Output for `SHOW CREATE VIEW` should follow the `CREATE VIEW` syntax
2cde5613129 is described below
commit 2cde5613129b1343bf473c55511b56719e18559b
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Aug 13 17:54:07 2024 +0200
[FLINK-36038][table] Output for `SHOW CREATE VIEW` should follow the `CREATE VIEW` syntax
---
.../src/test/resources/sql/catalog_database.q | 7 +-
.../src/test/resources/sql/table.q | 42 ++--
.../flink-sql-client/src/test/resources/sql/view.q | 63 +++--
.../src/test/resources/sql/catalog_database.q | 7 +-
.../src/test/resources/sql/table.q | 15 +-
.../src/test/resources/sql/view.q | 30 ++-
.../flink/table/api/internal/ShowCreateUtil.java | 100 ++++----
.../table/api/internal/ShowCreateUtilTest.java | 275 +++++++++++++++++++++
.../test/program/TableTestProgramRunnerTest.java | 6 +-
.../table/planner/catalog/CatalogTableITCase.scala | 9 +-
.../table/planner/catalog/CatalogViewITCase.scala | 74 ++++--
11 files changed, 497 insertions(+), 131 deletions(-)
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
index 93c219f4e52..0ddaa1a46ae 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
@@ -101,7 +101,9 @@ show create catalog cat_comment;
+--------------------------------------------------------------------------------------------------------------------------------+
|
result |
+--------------------------------------------------------------------------------------------------------------------------------+
-| CREATE CATALOG `cat_comment` COMMENT 'hello ''catalog''' WITH (
+| CREATE CATALOG `cat_comment`
+COMMENT 'hello ''catalog'''
+WITH (
'default-database' = 'db',
'type' = 'generic_in_memory'
)
@@ -150,7 +152,8 @@ show create catalog cat2;
+---------------------------------------------------------------------------------------------+
|
result |
+---------------------------------------------------------------------------------------------+
-| CREATE CATALOG `cat2` WITH (
+| CREATE CATALOG `cat2`
+WITH (
'default-database' = 'db',
'type' = 'generic_in_memory'
)
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 5201df9b325..02168b1c6ac 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -90,7 +90,8 @@ show create table orders;
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `PK_user` PRIMARY KEY (`user`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
|
@@ -384,7 +385,8 @@ show create table orders2;
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `PK_user` PRIMARY KEY (`user`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
)
@@ -411,7 +413,8 @@ show create table orders2;
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `PK_user` PRIMARY KEY (`user`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'datagen',
'scan.startup.mode' = 'earliest-offset'
)
@@ -480,7 +483,8 @@ show create table orders2;
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `PK_user` PRIMARY KEY (`user`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
|
@@ -515,7 +519,8 @@ show create table orders2;
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `PK_user` PRIMARY KEY (`user`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
|
@@ -560,7 +565,8 @@ show create table orders2;
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `PK_user` PRIMARY KEY (`user`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
|
@@ -590,7 +596,8 @@ show create table orders2;
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `PK_user` PRIMARY KEY (`user`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
|
@@ -623,7 +630,8 @@ show create table orders2;
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `order_constraint` PRIMARY KEY (`trade_order_id`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
|
@@ -653,7 +661,8 @@ show create table orders2;
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE,
CONSTRAINT `order_constraint` PRIMARY KEY (`trade_order_id`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
|
@@ -683,7 +692,8 @@ show create table orders2;
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE,
CONSTRAINT `order_constraint` PRIMARY KEY (`trade_order_id`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
|
@@ -712,7 +722,8 @@ show create table orders2;
`product_id` BIGINT NOT NULL,
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
|
@@ -740,7 +751,8 @@ show create table orders2;
`user_email` VARCHAR(2147483647) NOT NULL,
`product_id` BIGINT NOT NULL,
`ptime` AS PROCTIME()
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
|
@@ -838,7 +850,8 @@ show create table tbl1;
`user` BIGINT NOT NULL,
`product` VARCHAR(32),
`amount` INT
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
|
@@ -970,7 +983,8 @@ show create table orders3;
`ptime` AS PROCTIME() COMMENT 'notice: computed column, named ''ptime''.',
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `PK_user` PRIMARY KEY (`user`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
)
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/view.q b/flink-table/flink-sql-client/src/test/resources/sql/view.q
index 915f0b6b3a6..261a43b7bc5 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/view.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/view.q
@@ -59,25 +59,39 @@ create temporary view if not exists v2 as select * from v1;
# test show create a temporary view
show create view v1;
-+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-|
result |
-+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1`(`user`,
`product`, `amount`, `ts`, `ptime`) as
-SELECT *
-FROM `default_catalog`.`default_database`.`orders` |
-+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+|
result |
++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1` (
+ `user`,
+ `product`,
+ `amount`,
+ `ts`,
+ `ptime`
+)
+AS SELECT *
+FROM `default_catalog`.`default_database`.`orders`
+ |
++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set
!ok
# test show create a temporary view reference another view
show create view v2;
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-|
result |
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v2`(`user`,
`product`, `amount`, `ts`, `ptime`) as
-SELECT *
-FROM `default_catalog`.`default_database`.`v1` |
-+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+|
result |
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v2` (
+ `user`,
+ `product`,
+ `amount`,
+ `ts`,
+ `ptime`
+)
+AS SELECT *
+FROM `default_catalog`.`default_database`.`v1`
+ |
++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set
!ok
@@ -128,13 +142,20 @@ create view permanent_v1 as select * from orders;
# test show create a permanent view
show create view permanent_v1;
-+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-|
result |
-+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| CREATE VIEW `default_catalog`.`default_database`.`permanent_v1`(`user`,
`product`, `amount`, `ts`, `ptime`) as
-SELECT *
-FROM `default_catalog`.`default_database`.`orders` |
-+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+|
result |
++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| CREATE VIEW `default_catalog`.`default_database`.`permanent_v1` (
+ `user`,
+ `product`,
+ `amount`,
+ `ts`,
+ `ptime`
+)
+AS SELECT *
+FROM `default_catalog`.`default_database`.`orders`
+ |
++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set
!ok
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
index 644c42774e9..c79ed374590 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
@@ -121,7 +121,9 @@ create catalog cat_comment comment 'hello ''catalog''' WITH
('type'='generic_in_
show create catalog cat_comment;
!output
-CREATE CATALOG `cat_comment` COMMENT 'hello ''catalog''' WITH (
+CREATE CATALOG `cat_comment`
+COMMENT 'hello ''catalog'''
+WITH (
'default-database' = 'db',
'type' = 'generic_in_memory'
)
@@ -179,7 +181,8 @@ create catalog cat2 WITH ('type'='generic_in_memory',
'default-database'='db');
show create catalog cat2;
!output
-CREATE CATALOG `cat2` WITH (
+CREATE CATALOG `cat2`
+WITH (
'default-database' = 'db',
'type' = 'generic_in_memory'
)
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/table.q b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
index 597281a6e33..0b201217f8f 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
@@ -91,7 +91,8 @@ CREATE TABLE `default_catalog`.`default_database`.`orders` (
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `PK_user` PRIMARY KEY (`user`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
!ok
@@ -341,7 +342,8 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2`
(
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `PK_user` PRIMARY KEY (`user`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
)
@@ -369,7 +371,8 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2`
(
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `PK_user` PRIMARY KEY (`user`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'datagen',
'scan.startup.mode' = 'earliest-offset'
)
@@ -439,7 +442,8 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2`
(
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
CONSTRAINT `PK_user` PRIMARY KEY (`user`) NOT ENFORCED
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
!ok
@@ -558,7 +562,8 @@ CREATE TEMPORARY TABLE
`default_catalog`.`default_database`.`tbl1` (
`user` BIGINT NOT NULL,
`product` VARCHAR(32),
`amount` INT
-) WITH (
+)
+WITH (
'connector' = 'datagen'
)
!ok
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/view.q b/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
index dc20a1cb70f..3837fc8b64f 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/view.q
@@ -84,16 +84,28 @@ create temporary view if not exists v2 as select * from v1;
# test show create a temporary view
show create view v1;
!output
-CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1`(`user`,
`product`, `amount`, `ts`, `ptime`) as
-SELECT *
+CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1` (
+ `user`,
+ `product`,
+ `amount`,
+ `ts`,
+ `ptime`
+)
+AS SELECT *
FROM `default_catalog`.`default_database`.`orders`
!ok
# test show create a temporary view reference another view
show create view v2;
!output
-CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v2`(`user`,
`product`, `amount`, `ts`, `ptime`) as
-SELECT *
+CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v2` (
+ `user`,
+ `product`,
+ `amount`,
+ `ts`,
+ `ptime`
+)
+AS SELECT *
FROM `default_catalog`.`default_database`.`v1`
!ok
@@ -159,8 +171,14 @@ create view permanent_v1 as select * from orders;
# test show create a permanent view
show create view permanent_v1;
!output
-CREATE VIEW `default_catalog`.`default_database`.`permanent_v1`(`user`,
`product`, `amount`, `ts`, `ptime`) as
-SELECT *
+CREATE VIEW `default_catalog`.`default_database`.`permanent_v1` (
+ `user`,
+ `product`,
+ `amount`,
+ `ts`,
+ `ptime`
+)
+AS SELECT *
FROM `default_catalog`.`default_database`.`orders`
!ok
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
index b3a5d8e6f02..451ad94d6b0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
@@ -39,12 +39,12 @@ import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
-import static org.apache.flink.table.utils.EncodingUtils.escapeIdentifier;
-
/** SHOW CREATE statement Util. */
@Internal
public class ShowCreateUtil {
+ private static final String PRINT_INDENT = " ";
+
private ShowCreateUtil() {}
public static String buildShowCreateTableRow(
@@ -57,18 +57,16 @@ public class ShowCreateUtil {
"SHOW CREATE TABLE is only supported for tables,
but %s is a view. Please use SHOW CREATE VIEW instead.",
tableIdentifier.asSerializableString()));
}
- final String printIndent = " ";
StringBuilder sb =
new StringBuilder()
.append(buildCreateFormattedPrefix("TABLE",
isTemporary, tableIdentifier));
- sb.append(extractFormattedColumns(table, printIndent));
- extractFormattedWatermarkSpecs(table, printIndent)
+ sb.append(extractFormattedColumns(table, PRINT_INDENT));
+ extractFormattedWatermarkSpecs(table, PRINT_INDENT)
.ifPresent(watermarkSpecs ->
sb.append(",\n").append(watermarkSpecs));
- extractFormattedPrimaryKey(table, printIndent).ifPresent(pk ->
sb.append(",\n").append(pk));
- sb.append("\n) ");
- extractFormattedComment(table)
- .ifPresent(
- c -> sb.append(String.format("COMMENT '%s'%s", c,
System.lineSeparator())));
+ extractFormattedPrimaryKey(table, PRINT_INDENT)
+ .ifPresent(pk -> sb.append(",\n").append(pk));
+ sb.append("\n)\n");
+ extractComment(table).ifPresent(c ->
sb.append(formatComment(c)).append("\n"));
extractFormattedDistributedInfo((ResolvedCatalogTable)
table).ifPresent(sb::append);
extractFormattedPartitionedInfo((ResolvedCatalogTable) table)
.ifPresent(
@@ -76,7 +74,7 @@ public class ShowCreateUtil {
sb.append("PARTITIONED BY (")
.append(partitionedInfoFormatted)
.append(")\n"));
- extractFormattedOptions(table.getOptions(), printIndent)
+ extractFormattedOptions(table.getOptions(), PRINT_INDENT)
.ifPresent(v -> sb.append("WITH
(\n").append(v).append("\n)\n"));
return sb.toString();
}
@@ -92,43 +90,30 @@ public class ShowCreateUtil {
"SHOW CREATE VIEW is only supported for views, but
%s is a table. Please use SHOW CREATE TABLE instead.",
viewIdentifier.asSerializableString()));
}
- StringBuilder stringBuilder = new StringBuilder();
if (view.getOrigin() instanceof QueryOperationCatalogView) {
throw new TableException(
"SHOW CREATE VIEW is not supported for views registered by
Table API.");
- } else {
- stringBuilder.append(
- String.format(
- "CREATE %sVIEW %s%s as%s%s",
- isTemporary ? "TEMPORARY " : "",
- viewIdentifier.asSerializableString(),
- String.format("(%s)",
extractFormattedColumnNames(view)),
- System.lineSeparator(),
- ((CatalogView)
view.getOrigin()).getExpandedQuery()));
}
- extractFormattedComment(view)
- .ifPresent(
- c ->
- stringBuilder.append(
- String.format(
- " COMMENT '%s'%s", c,
System.lineSeparator())));
- return stringBuilder.toString();
+ StringBuilder sb =
+ new StringBuilder()
+ .append(buildCreateFormattedPrefix("VIEW",
isTemporary, viewIdentifier));
+ sb.append(extractFormattedColumnNames(view,
PRINT_INDENT)).append("\n)\n");
+ extractComment(view).ifPresent(c ->
sb.append(formatComment(c)).append("\n"));
+ sb.append("AS ").append(((CatalogView)
view.getOrigin()).getExpandedQuery()).append("\n");
+
+ return sb.toString();
}
public static String buildShowCreateCatalogRow(CatalogDescriptor
catalogDescriptor) {
- final String printIndent = " ";
- final String comment = catalogDescriptor.getComment().orElse(null);
- return String.format(
- "CREATE CATALOG %s %sWITH (%s%s%s)%s",
- escapeIdentifier(catalogDescriptor.getCatalogName()),
- StringUtils.isNotEmpty(comment)
- ? String.format("COMMENT '%s' ",
EncodingUtils.escapeSingleQuotes(comment))
- : "",
- System.lineSeparator(),
-
extractFormattedOptions(catalogDescriptor.getConfiguration().toMap(),
printIndent)
- .orElse(""),
- System.lineSeparator(),
- System.lineSeparator());
+ final Optional<String> comment = catalogDescriptor.getComment();
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE CATALOG ")
+
.append(EncodingUtils.escapeIdentifier(catalogDescriptor.getCatalogName()))
+ .append("\n");
+ comment.ifPresent(c -> sb.append(formatComment(c)).append("\n"));
+ extractFormattedOptions(catalogDescriptor.getConfiguration().toMap(),
PRINT_INDENT)
+ .ifPresent(o -> sb.append("WITH
(\n").append(o).append("\n)\n"));
+ return sb.toString();
}
static String buildCreateFormattedPrefix(
@@ -176,10 +161,7 @@ public class ShowCreateUtil {
comment -> {
if (StringUtils.isNotEmpty(comment)) {
sb.append(" ");
- sb.append(
- String.format(
- "COMMENT '%s'",
-
EncodingUtils.escapeSingleQuotes(comment)));
+ sb.append(formatComment(comment));
}
});
return sb.toString();
@@ -211,12 +193,14 @@ public class ShowCreateUtil {
.collect(Collectors.joining("\n")));
}
- static Optional<String>
extractFormattedComment(ResolvedCatalogBaseTable<?> table) {
- String comment = table.getComment();
- if (StringUtils.isNotEmpty(comment)) {
- return Optional.of(EncodingUtils.escapeSingleQuotes(comment));
- }
- return Optional.empty();
+ private static String formatComment(String comment) {
+ return String.format("COMMENT '%s'",
EncodingUtils.escapeSingleQuotes(comment));
+ }
+
+ static Optional<String> extractComment(ResolvedCatalogBaseTable<?> table) {
+ return StringUtils.isEmpty(table.getComment())
+ ? Optional.empty()
+ : Optional.of(table.getComment());
}
static Optional<String>
extractFormattedDistributedInfo(ResolvedCatalogTable catalogTable) {
@@ -246,13 +230,19 @@ public class ShowCreateUtil {
printIndent,
EncodingUtils.escapeSingleQuotes(entry.getKey()),
EncodingUtils.escapeSingleQuotes(entry.getValue())))
+ .sorted()
.collect(Collectors.joining("," +
System.lineSeparator())));
}
- static String extractFormattedColumnNames(ResolvedCatalogBaseTable<?>
baseTable) {
+ static String extractFormattedColumnNames(
+ ResolvedCatalogBaseTable<?> baseTable, String printIndent) {
return baseTable.getResolvedSchema().getColumns().stream()
- .map(Column::getName)
- .map(EncodingUtils::escapeIdentifier)
- .collect(Collectors.joining(", "));
+ .map(
+ column ->
+ String.format(
+ "%s%s",
+ printIndent,
+
EncodingUtils.escapeIdentifier(column.getName())))
+ .collect(Collectors.joining(",\n"));
}
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
new file mode 100644
index 00000000000..844d9a2d7a2
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableDistribution;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test {@link ShowCreateUtil}. */
+public class ShowCreateUtilTest {
+ private static final ObjectIdentifier TABLE_IDENTIFIER =
+ ObjectIdentifier.of("catalogName", "dbName", "tableName");
+ private static final ObjectIdentifier VIEW_IDENTIFIER =
+ ObjectIdentifier.of("catalogName", "dbName", "viewName");
+
+ private static final ResolvedSchema ONE_COLUMN_SCHEMA =
+ ResolvedSchema.of(Column.physical("id", DataTypes.INT()));
+
+ private static final ResolvedSchema TWO_COLUMNS_SCHEMA =
+ ResolvedSchema.of(
+ Column.physical("id", DataTypes.INT()),
+ Column.physical("name", DataTypes.STRING()));
+
+ @ParameterizedTest(name = "{index}: {1}")
+ @MethodSource("argsForShowCreateTable")
+ void showCreateTable(ResolvedCatalogTable resolvedCatalogTable, String
expected) {
+ final String createTableString =
+ ShowCreateUtil.buildShowCreateTableRow(
+ resolvedCatalogTable, TABLE_IDENTIFIER, false);
+ assertThat(createTableString).isEqualTo(expected);
+ }
+
+ @ParameterizedTest(name = "{index}: {1}")
+ @MethodSource("argsForShowCreateView")
+ void showCreateView(ResolvedCatalogView resolvedCatalogView, String
expected) {
+ final String createViewString =
+ ShowCreateUtil.buildShowCreateViewRow(resolvedCatalogView,
VIEW_IDENTIFIER, false);
+ assertThat(createViewString).isEqualTo(expected);
+ }
+
+ @ParameterizedTest(name = "{index}: {1}")
+ @MethodSource("argsForShowCreateCatalog")
+ void showCreateCatalog(CatalogDescriptor catalogDescriptor, String
expected) {
+ final String createCatalogString =
+ ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor);
+ assertThat(createCatalogString).isEqualTo(expected);
+ }
+
+ private static Collection<Arguments> argsForShowCreateCatalog() {
+ Collection<Arguments> argList = new ArrayList<>();
+ Map<String, String> options = new HashMap<>();
+ options.put("k_a", "v_a");
+ options.put("k_b", "v_b");
+ options.put("k_c", "v_c");
+ final Configuration configuration = Configuration.fromMap(options);
+ argList.add(
+ Arguments.of(
+ CatalogDescriptor.of("catalogName", configuration),
+ "CREATE CATALOG `catalogName`\n"
+ + "WITH (\n"
+ + " 'k_a' = 'v_a',\n"
+ + " 'k_b' = 'v_b',\n"
+ + " 'k_c' = 'v_c'\n"
+ + ")\n"));
+
+ argList.add(
+ Arguments.of(
+ CatalogDescriptor.of("catalogName", configuration)
+ .setComment("Catalog comment"),
+ "CREATE CATALOG `catalogName`\n"
+ + "COMMENT 'Catalog comment'\n"
+ + "WITH (\n"
+ + " 'k_a' = 'v_a',\n"
+ + " 'k_b' = 'v_b',\n"
+ + " 'k_c' = 'v_c'\n"
+ + ")\n"));
+
+ return argList;
+ }
+
+ private static Collection<Arguments> argsForShowCreateView() {
+ Collection<Arguments> argList = new ArrayList<>();
+ argList.add(
+ Arguments.of(
+ createResolvedView(ONE_COLUMN_SCHEMA, "SELECT 1",
"SELECT 1", null),
+ "CREATE VIEW `catalogName`.`dbName`.`viewName` (\n"
+ + " `id`\n"
+ + ")\n"
+ + "AS SELECT 1\n"));
+
+ argList.add(
+ Arguments.of(
+ createResolvedView(
+ TWO_COLUMNS_SCHEMA,
+ "SELECT id, name FROM tbl_a",
+ "SELECT id, name FROM
`catalogName`.`dbName`.`tbl_a`",
+ "View comment"),
+ "CREATE VIEW `catalogName`.`dbName`.`viewName` (\n"
+ + " `id`,\n"
+ + " `name`\n"
+ + ")\n"
+ + "COMMENT 'View comment'\n"
+ + "AS SELECT id, name FROM
`catalogName`.`dbName`.`tbl_a`\n"));
+ return argList;
+ }
+
+ private static Collection<Arguments> argsForShowCreateTable() {
+ Collection<Arguments> argList = new ArrayList<>();
+ argList.add(
+ Arguments.of(
+ createResolvedTable(
+ ONE_COLUMN_SCHEMA,
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ TableDistribution.of(
+ TableDistribution.Kind.HASH,
+ 2,
+ Arrays.asList("key1", "key2")),
+ null),
+ "CREATE TABLE `catalogName`.`dbName`.`tableName` (\n"
+ + " `id` INT\n"
+ + ")\n"
+ + "DISTRIBUTED BY HASH(`key1`, `key2`) INTO 2
BUCKETS\n"));
+
+ argList.add(
+ Arguments.of(
+ createResolvedTable(
+ ONE_COLUMN_SCHEMA,
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ TableDistribution.of(
+ TableDistribution.Kind.RANGE, 2,
Arrays.asList("1", "10")),
+ "Table comment"),
+ "CREATE TABLE `catalogName`.`dbName`.`tableName` (\n"
+ + " `id` INT\n"
+ + ")\n"
+ + "COMMENT 'Table comment'\n"
+ + "DISTRIBUTED BY RANGE(`1`, `10`) INTO 2
BUCKETS\n"));
+
+ final Map<String, String> options = new HashMap<>();
+ options.put("option_key_a", "option_value_a");
+ options.put("option_key_b", "option_value_b");
+ options.put("option_key_c", "option_value_c");
+
+ argList.add(
+ Arguments.of(
+ createResolvedTable(
+ TWO_COLUMNS_SCHEMA,
+ options,
+ Collections.emptyList(),
+ null,
+ "Another table comment"),
+ "CREATE TABLE `catalogName`.`dbName`.`tableName` (\n"
+ + " `id` INT,\n"
+ + " `name` VARCHAR(2147483647)\n"
+ + ")\n"
+ + "COMMENT 'Another table comment'\n"
+ + "WITH (\n"
+ + " 'option_key_a' = 'option_value_a',\n"
+ + " 'option_key_b' = 'option_value_b',\n"
+ + " 'option_key_c' = 'option_value_c'\n"
+ + ")\n"));
+
+ argList.add(
+ Arguments.of(
+ createResolvedTable(
+ ONE_COLUMN_SCHEMA,
+ Collections.emptyMap(),
+ Arrays.asList("key1", "key2"),
+ null,
+ "comment"),
+ "CREATE TABLE `catalogName`.`dbName`.`tableName` (\n"
+ + " `id` INT\n"
+ + ")\n"
+ + "COMMENT 'comment'\n"
+ + "PARTITIONED BY (`key1`, `key2`)\n"));
+
+ argList.add(
+ Arguments.of(
+ createResolvedTable(
+ TWO_COLUMNS_SCHEMA,
+ options,
+ Arrays.asList("key1", "key2"),
+ TableDistribution.of(
+ TableDistribution.Kind.UNKNOWN,
+ 3,
+ Arrays.asList("1", "2", "3")),
+ "table comment"),
+ "CREATE TABLE `catalogName`.`dbName`.`tableName` (\n"
+ + " `id` INT,\n"
+ + " `name` VARCHAR(2147483647)\n"
+ + ")\n"
+ + "COMMENT 'table comment'\n"
+ + "DISTRIBUTED BY (`1`, `2`, `3`) INTO 3
BUCKETS\n"
+ + "PARTITIONED BY (`key1`, `key2`)\n"
+ + "WITH (\n"
+ + " 'option_key_a' = 'option_value_a',\n"
+ + " 'option_key_b' = 'option_value_b',\n"
+ + " 'option_key_c' = 'option_value_c'\n"
+ + ")\n"));
+ return argList;
+ }
+
+ private static ResolvedCatalogTable createResolvedTable(
+ ResolvedSchema resolvedSchema,
+ Map<String, String> options,
+ List<String> partitionKeys,
+ TableDistribution tableDistribution,
+ String comment) {
+ CatalogTable.Builder tableBuilder =
+ CatalogTable.newBuilder()
+
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+ .options(options)
+ .comment(comment)
+ .partitionKeys(partitionKeys);
+ if (tableDistribution != null) {
+ tableBuilder.distribution(tableDistribution);
+ }
+ return new ResolvedCatalogTable(tableBuilder.build(), resolvedSchema);
+ }
+
+ private static ResolvedCatalogView createResolvedView(
+ ResolvedSchema resolvedSchema,
+ String originalQuery,
+ String expandedQuery,
+ String comment) {
+ return new ResolvedCatalogView(
+ CatalogView.of(
+
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+ comment,
+ originalQuery,
+ expandedQuery,
+ Collections.emptyMap()),
+ resolvedSchema);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java
index a0d114c1178..f5b46b6eb6e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java
@@ -120,7 +120,8 @@ public class TableTestProgramRunnerTest {
.isEqualTo(
"CREATE TABLE
`default_catalog`.`default_database`.`MyTableSource` (\n"
+ " `i` INT\n"
- + ") WITH (\n"
+ + ")\n"
+ + "WITH (\n"
+ " 'connector' = 'datagen',\n"
+ " 'number-of-rows' = '3'\n"
+ ")\n");
@@ -128,7 +129,8 @@ public class TableTestProgramRunnerTest {
.isEqualTo(
"CREATE TABLE `default_catalog`.`default_database`.`MyTableSink` (\n"
+ " `i` INT\n"
- + ") WITH (\n"
+ + ")\n"
+ + "WITH (\n"
+ " 'connector' = 'blackhole'\n"
+ ")\n");
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 200d151596e..ddce04b210f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -1043,7 +1043,8 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends TableITCaseBase {
| `proc` AS PROCTIME(),
| WATERMARK FOR `ts1` AS CAST(TIMESTAMPADD(HOUR, 8, `ts1`) AS TIMESTAMP(3)),
| CONSTRAINT `test_constraint` PRIMARY KEY (`a`, `b`) NOT ENFORCED
- |) COMMENT 'test show create table statement'
+ |)
+ |COMMENT 'test show create table statement'
|DISTRIBUTED BY (`a`)
|PARTITIONED BY (`b`, `h`)
|WITH (
@@ -1105,7 +1106,8 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends TableITCaseBase {
| `proc` AS PROCTIME(),
| WATERMARK FOR `ts1` AS CAST(TIMESTAMPADD(HOUR, 8, `ts1`) AS TIMESTAMP(3)),
| CONSTRAINT `test_constraint` PRIMARY KEY (`pk1`, `pk2`) NOT ENFORCED
- |) COMMENT 'test show create table statement'
+ |)
+ |COMMENT 'test show create table statement'
|DISTRIBUTED INTO 5 BUCKETS
|PARTITIONED BY (`h`)
|WITH (
@@ -1140,7 +1142,8 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends TableITCaseBase {
| `a` BIGINT NOT NULL,
| `h` VARCHAR(2147483647),
| `b` VARCHAR(2147483647) NOT NULL
- |) COMMENT 'test show create table statement'
+ |)
+ |COMMENT 'test show create table statement'
|DISTRIBUTED BY RANGE(`a`) INTO 7 BUCKETS
|PARTITIONED BY (`b`, `h`)
|WITH (
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
index de9b61ea5da..b4c87359782 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogViewITCase.scala
@@ -348,9 +348,14 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends TableITCaseBase {
tView1ShowCreateResult,
Lists.newArrayList(
Row.of(
- s"""CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`t_v1`(`a`, `b`, `c`) as
- |SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
- |FROM `default_catalog`.`default_database`.`T1`""".stripMargin
+ s"""CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`t_v1` (
+ | `a`,
+ | `b`,
+ | `c`
+ |)
+ |AS SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
+ |FROM `default_catalog`.`default_database`.`T1`
+ |""".stripMargin
)
)
)
@@ -366,9 +371,14 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends TableITCaseBase {
tView2ShowCreateResult,
Lists.newArrayList(
Row.of(
- s"""CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`t_v2`(`d`, `e`, `f`) as
- |SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
- |FROM `default_catalog`.`default_database`.`T1`""".stripMargin
+ s"""CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`t_v2` (
+ | `d`,
+ | `e`,
+ | `f`
+ |)
+ |AS SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
+ |FROM `default_catalog`.`default_database`.`T1`
+ |""".stripMargin
)
)
)
@@ -390,9 +400,14 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends TableITCaseBase {
view1ShowCreateResult,
Lists.newArrayList(
Row.of(
- s"""CREATE VIEW `default_catalog`.`default_database`.`v1`(`a`, `b`, `c`) as
- |SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
- |FROM `default_catalog`.`default_database`.`T1`""".stripMargin
+ s"""CREATE VIEW `default_catalog`.`default_database`.`v1` (
+ | `a`,
+ | `b`,
+ | `c`
+ |)
+ |AS SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
+ |FROM `default_catalog`.`default_database`.`T1`
+ |""".stripMargin
)
)
)
@@ -408,9 +423,14 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends TableITCaseBase {
view2ShowCreateResult,
Lists.newArrayList(
Row.of(
- s"""CREATE VIEW `default_catalog`.`default_database`.`v2`(`x`, `y`, `z`) as
- |SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
- |FROM `default_catalog`.`default_database`.`T1`""".stripMargin
+ s"""CREATE VIEW `default_catalog`.`default_database`.`v2` (
+ | `x`,
+ | `y`,
+ | `z`
+ |)
+ |AS SELECT `T1`.`a`, `T1`.`b`, `T1`.`c`
+ |FROM `default_catalog`.`default_database`.`T1`
+ |""".stripMargin
)
)
)
@@ -435,10 +455,13 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends TableITCaseBase {
showCreateLeftJoinGroupByViewResult,
Lists.newArrayList(
Row.of(
- s"""CREATE VIEW `default_catalog`.`default_database`.`viewLeftJoinGroupBy`(`max_value`) as
- |SELECT MAX(`t1`.`a`) AS `max_value`
+ s"""CREATE VIEW `default_catalog`.`default_database`.`viewLeftJoinGroupBy` (
+ | `max_value`
+ |)
+ |AS SELECT MAX(`t1`.`a`) AS `max_value`
|FROM `default_catalog`.`default_database`.`t1`
- |LEFT JOIN `default_catalog`.`default_database`.`t2` ON `t1`.`c` = `t2`.`c`""".stripMargin
+ |LEFT JOIN `default_catalog`.`default_database`.`t2` ON `t1`.`c` = `t2`.`c`
+ |""".stripMargin
)
)
)
@@ -468,10 +491,15 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends TableITCaseBase {
showCreateCrossJoinViewResult,
Lists.newArrayList(
Row.of(
- s"""CREATE VIEW `default_catalog`.`default_database`.`viewWithCrossJoin`(`a`, `a1`, `b2`) as
- |SELECT `udfEqualsOne`() AS `a`, `t1`.`a` AS `a1`, `t2`.`b` AS `b2`
+ s"""CREATE VIEW `default_catalog`.`default_database`.`viewWithCrossJoin` (
+ | `a`,
+ | `a1`,
+ | `b2`
+ |)
+ |AS SELECT `udfEqualsOne`() AS `a`, `t1`.`a` AS `a1`, `t2`.`b` AS `b2`
|FROM `default_catalog`.`default_database`.`t1`
- |CROSS JOIN `default_catalog`.`default_database`.`t2`""".stripMargin
+ |CROSS JOIN `default_catalog`.`default_database`.`t2`
+ |""".stripMargin
)
)
)
@@ -497,10 +525,14 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends TableITCaseBase {
showCreateInnerJoinViewResult,
Lists.newArrayList(
Row.of(
- s"""CREATE VIEW `default_catalog`.`default_database`.`innerJoinView`(`a1`, `b2`) as
- |SELECT `t1`.`a` AS `a1`, `t2`.`b` AS `b2`
+ s"""CREATE VIEW `default_catalog`.`default_database`.`innerJoinView` (
+ | `a1`,
+ | `b2`
+ |)
+ |AS SELECT `t1`.`a` AS `a1`, `t2`.`b` AS `b2`
|FROM `default_catalog`.`default_database`.`t1`
- |INNER JOIN `default_catalog`.`default_database`.`t2` ON `t1`.`c` = `t2`.`c`""".stripMargin
+ |INNER JOIN `default_catalog`.`default_database`.`t2` ON `t1`.`c` = `t2`.`c`
+ |""".stripMargin
)
)
)