wuchong commented on a change in pull request #13011: URL: https://github.com/apache/flink/pull/13011#discussion_r465552827
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/UniqueConstraint.java ########## @@ -75,21 +76,26 @@ public ConstraintType getType() { */ @Override public final String asSummaryString() { - final String typeString; - switch (getType()) { - case PRIMARY_KEY: - typeString = "PRIMARY KEY"; - break; - case UNIQUE_KEY: - typeString = "UNIQUE"; - break; - default: - throw new IllegalStateException("Unknown key type: " + getType()); - } - + final String typeString = getTypeString(); return String.format("CONSTRAINT %s %s (%s)", getName(), typeString, String.join(", ", columns)); } + /** + * Returns constraint's canonical summary. All constraints summary will be formatted as + * <pre> + * CONSTRAINT [constraint-name] [constraint-type] ([constraint-definition]) NOT ENFORCED + * + * E.g CONSTRAINT pk PRIMARY KEY (`f0`, `f1`) NOT ENFORCED + * </pre> + */ + public final String asCanonicalString() { + final String typeString = getTypeString(); + return String.format("CONSTRAINT %s %s (%s) NOT ENFORCED", + getName(), + typeString, + String.join(", ", columns.stream().map(col -> String.format("`%s`", col)).collect(Collectors.toList()))); Review comment: Use `EncodingUtils#escapeIdentifier` to escape identifiers. ########## File path: docs/content/docs/dev/table/sql/show.md ########## @@ -427,6 +458,17 @@ SHOW TABLES Show all tables in the current catalog and the current database. + +## SHOW CREATE TABLE + +```sql +SHOW CREATE TABLE +``` + +Show create table statement for specified table. + +<span class="label label-danger">Attention</span> Currently `SHOW CREATE TABLE` only supports table that is created by Flink SQL. Review comment: minor: ```suggestion <span class="label label-danger">Attention</span> Currently `SHOW CREATE TABLE` only supports table that is created by Flink SQL DDL. ``` ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/UniqueConstraint.java ########## @@ -75,21 +76,26 @@ public ConstraintType getType() { */ @Override public final String asSummaryString() { - final String typeString; - switch (getType()) { - case PRIMARY_KEY: - typeString = "PRIMARY KEY"; - break; - case UNIQUE_KEY: - typeString = "UNIQUE"; - break; - default: - throw new IllegalStateException("Unknown key type: " + getType()); - } - + final String typeString = getTypeString(); return String.format("CONSTRAINT %s %s (%s)", getName(), typeString, String.join(", ", columns)); Review comment: I think we should also add `NOT ENFORCED` for the summary string. Currentlly, it is not correct. Besides, we should add `NOT ENFORCED` according the underlying `enforced` flag, even though it is always flase for now. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ########## @@ -1072,6 +1086,75 @@ private TableResult buildShowResult(String columnName, String[] objects) { Arrays.stream(objects).map((c) -> new String[]{c}).toArray(String[][]::new)); } + private TableResult buildShowCreateTableResult(CatalogBaseTable table, ObjectIdentifier sqlIdentifier) { Review comment: I think this must be a `CatalogTable`, because this is `SHOW CREATE TABLE`. We should throw exception before this if it is a view. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ########## @@ -1015,6 +1018,17 @@ private TableResult executeOperation(Operation operation) { return buildShowResult("database name", listDatabases()); } else if (operation instanceof ShowCurrentDatabaseOperation) { return buildShowResult("current database name", new String[]{catalogManager.getCurrentDatabase()}); + } else if (operation instanceof ShowCreateTableOperation) { + ShowCreateTableOperation showCreateTableOperation = (ShowCreateTableOperation) operation; + Optional<CatalogManager.TableLookupResult> result = + catalogManager.getTable(showCreateTableOperation.getSqlIdentifier()); + if (result.isPresent()) { + return buildShowCreateTableResult(result.get().getTable(), ((ShowCreateTableOperation) operation).getSqlIdentifier()); Review comment: ```suggestion return buildShowCreateTableResult(result.get().getTable(), showCreateTableOperation.getSqlIdentifier()); ``` ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/UniqueConstraint.java ########## @@ -75,21 +76,26 @@ public ConstraintType getType() { */ @Override public final String asSummaryString() { - final String typeString; - switch (getType()) { - case PRIMARY_KEY: - typeString = "PRIMARY KEY"; - break; - case UNIQUE_KEY: - typeString = "UNIQUE"; - break; - default: - throw new IllegalStateException("Unknown key type: " + getType()); - } - + final String typeString = getTypeString(); return String.format("CONSTRAINT %s %s (%s)", getName(), typeString, String.join(", ", columns)); } + /** + * Returns constraint's canonical summary. All constraints summary will be formatted as + * <pre> + * CONSTRAINT [constraint-name] [constraint-type] ([constraint-definition]) NOT ENFORCED + * + * E.g CONSTRAINT pk PRIMARY KEY (`f0`, `f1`) NOT ENFORCED + * </pre> + */ + public final String asCanonicalString() { + final String typeString = getTypeString(); + return String.format("CONSTRAINT %s %s (%s) NOT ENFORCED", Review comment: We should add `NOT ENFORCED` according the underlying `enforced` flag, even though it is always flase for now. ########## File path: flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl ########## @@ -408,6 +408,22 @@ SqlShowTables SqlShowTables() : } } +/** +* Parse a "Show Create Table" query command. +*/ +SqlShowCreateTable SqlShowCreateTable() : Review comment: ```suggestion SqlShowCreateTable SqlShowCreateTable() : ``` ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala ########## @@ -989,6 +989,30 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { assertEquals(false, tableSchema2.getPrimaryKey.isPresent) } + @Test + def testCreateTableAndShowCreateTable(): Unit = { + val createDDL = + """ |CREATE TABLE `TBL1` ( Review comment: 1. Add a test for DDL statement without quoted and upper cased. 2. Add a test for UDFs as computed columns and complex types (e.g. ARRAY, MAP, ROW). ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/UniqueConstraint.java ########## @@ -75,21 +76,26 @@ public ConstraintType getType() { */ @Override public final String asSummaryString() { - final String typeString; - switch (getType()) { - case PRIMARY_KEY: - typeString = "PRIMARY KEY"; - break; - case UNIQUE_KEY: - typeString = "UNIQUE"; - break; - default: - throw new IllegalStateException("Unknown key type: " + getType()); - } - + final String typeString = getTypeString(); return String.format("CONSTRAINT %s %s (%s)", getName(), typeString, String.join(", ", columns)); } + /** + * Returns constraint's canonical summary. All constraints summary will be formatted as + * <pre> + * CONSTRAINT [constraint-name] [constraint-type] ([constraint-definition]) NOT ENFORCED + * + * E.g CONSTRAINT pk PRIMARY KEY (`f0`, `f1`) NOT ENFORCED + * </pre> + */ + public final String asCanonicalString() { Review comment: I would suggest to call it `asSerializableString`, keep align with others, e.g. `ObjectIdentifier#asSerializableString`. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.table.catalog.ObjectIdentifier; + +/** + * Operation to describe a SHOW CREATE TABLE statement. + * */ +public class ShowCreateTableOperation implements ShowOperation{ Review comment: ```suggestion public class ShowCreateTableOperation implements ShowOperation { ``` ########## File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java ########## @@ -987,19 +987,21 @@ public void testCreateTable() throws Exception { final Executor executor = createDefaultExecutor(clusterClient); final SessionContext session = new SessionContext("test-session", new Environment()); String sessionId = executor.openSession(session); - final String ddlTemplate = "create table %s(\n" + - " a int,\n" + - " b bigint,\n" + - " c varchar\n" + - ") with (\n" + - " 'connector.type'='filesystem',\n" + - " 'format.type'='csv',\n" + - " 'connector.path'='xxx'\n" + - ")\n"; + final String ddlTemplate = "CREATE TABLE %s (\n" + + " `a` INT,\n" + + " `b` BIGINT,\n" + + " `c` STRING\n" + + ") WITH (\n" + + " 'connector.type' = 'filesystem',\n" + + " 'connector.path' = 'xxx',\n" + + " 'format.type' = 'csv'\n" + + ")\n"; try { // Test create table with simple name. executor.executeSql(sessionId, "use catalog catalog1"); executor.executeSql(sessionId, String.format(ddlTemplate, "MyTable1")); + assertShowResult(executor.executeSql(sessionId, String.format("show create table %s", "MyTable1")), + Arrays.asList(String.format(ddlTemplate, "`MyTable1`"))); Review comment: ```suggestion Collections.singletonList(String.format(ddlTemplate, "`MyTable1`"))); ``` ########## File path: flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl ########## @@ -252,6 +252,22 @@ SqlShowTables SqlShowTables() : } } +/** +* Parse a "Show Create Table" query command. +*/ +SqlShowCreateTable SqlShowCreateTable() : +{ + SqlIdentifier tableName; + SqlParserPos pos; +} +{ + <SHOW> <CREATE> <TABLE> { pos = getPos();} Review comment: For Hive dialect, I don't think we should print the same result for SHOW CREATE TABLE. Hive also provides a `SHOW CREATE TABLE` statement which should print the Hive DDL. So I would suggest we can postpone SHOW CREATE TABLE for hive to another issue. ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala ########## @@ -812,6 +823,79 @@ abstract class TableEnvImpl( buildResult(Array(columnName), Array(DataTypes.STRING), rows) } + private def buildShowCreateResult( Review comment: You can reuse the show create table util in flink-table-api-java. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ########## @@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, String[] objects) { Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new)); } + private String buildShowCreateTableRow( + ResolvedCatalogBaseTable<?> table, + ObjectIdentifier sqlIdentifier, + boolean isTemporary) { + CatalogBaseTable.TableKind kind = table.getTableKind(); + if (kind == CatalogBaseTable.TableKind.VIEW) { + throw new TableException( + String.format( + "SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.", + sqlIdentifier.asSerializableString())); + } + StringBuilder sb = + new StringBuilder( + String.format( + "CREATE %sTABLE %s (\n", + isTemporary ? "TEMPORARY " : "", + sqlIdentifier.asSerializableString())); + ResolvedSchema schema = table.getResolvedSchema(); + // append columns + sb.append( + schema.getColumns().stream() + .map(column -> String.format("%s%s", printIndent, getColumnString(column))) + .collect(Collectors.joining(",\n"))); + // append watermark spec + if (!schema.getWatermarkSpecs().isEmpty()) { + sb.append(",\n"); + sb.append( + schema.getWatermarkSpecs().stream() + .map( + watermarkSpec -> + String.format( + "%sWATERMARK FOR %s AS %s", + printIndent, + String.join( + ".", Review comment: We don't need this `.`, right? ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ########## @@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, String[] objects) { Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new)); } + private String buildShowCreateTableRow( + ResolvedCatalogBaseTable<?> table, + ObjectIdentifier sqlIdentifier, + boolean isTemporary) { + CatalogBaseTable.TableKind kind = table.getTableKind(); + if (kind == CatalogBaseTable.TableKind.VIEW) { + throw new TableException( + String.format( + "SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.", + sqlIdentifier.asSerializableString())); + } + StringBuilder sb = + new StringBuilder( + String.format( + "CREATE %sTABLE %s (\n", + isTemporary ? "TEMPORARY " : "", + sqlIdentifier.asSerializableString())); + ResolvedSchema schema = table.getResolvedSchema(); + // append columns + sb.append( + schema.getColumns().stream() + .map(column -> String.format("%s%s", printIndent, getColumnString(column))) + .collect(Collectors.joining(",\n"))); + // append watermark spec + if (!schema.getWatermarkSpecs().isEmpty()) { + sb.append(",\n"); + sb.append( + schema.getWatermarkSpecs().stream() + .map( + watermarkSpec -> + String.format( + "%sWATERMARK FOR %s AS %s", + printIndent, + String.join( + ".", + EncodingUtils.escapeIdentifier( + watermarkSpec + .getRowtimeAttribute())), + watermarkSpec + .getWatermarkExpression() + .asSummaryString())) Review comment: Use `.asSerializableString()` which will escapes udf expressions. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ########## @@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, String[] objects) { Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new)); } + private String buildShowCreateTableRow( + ResolvedCatalogBaseTable<?> table, + ObjectIdentifier sqlIdentifier, + boolean isTemporary) { + CatalogBaseTable.TableKind kind = table.getTableKind(); + if (kind == CatalogBaseTable.TableKind.VIEW) { + throw new TableException( + String.format( + "SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.", + sqlIdentifier.asSerializableString())); + } + StringBuilder sb = + new StringBuilder( + String.format( + "CREATE %sTABLE %s (\n", + isTemporary ? "TEMPORARY " : "", + sqlIdentifier.asSerializableString())); + ResolvedSchema schema = table.getResolvedSchema(); + // append columns + sb.append( + schema.getColumns().stream() + .map(column -> String.format("%s%s", printIndent, getColumnString(column))) + .collect(Collectors.joining(",\n"))); + // append watermark spec + if (!schema.getWatermarkSpecs().isEmpty()) { + sb.append(",\n"); + sb.append( + schema.getWatermarkSpecs().stream() + .map( + watermarkSpec -> + String.format( + "%sWATERMARK FOR %s AS %s", + printIndent, + String.join( + ".", + EncodingUtils.escapeIdentifier( + watermarkSpec + .getRowtimeAttribute())), + watermarkSpec + .getWatermarkExpression() + .asSummaryString())) + .collect(Collectors.joining("\n"))); + } + // append constraint + if (schema.getPrimaryKey().isPresent()) { + sb.append(",\n"); + sb.append(String.format("%s%s", printIndent, schema.getPrimaryKey().get())); + } + sb.append("\n) "); + // append comment + String comment = table.getComment(); + if (StringUtils.isNotEmpty(comment)) { + sb.append(String.format("COMMENT '%s'\n", comment)); + } + // append partitions + ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table; + if (catalogTable.isPartitioned()) { + sb.append("PARTITIONED BY (") + .append( + catalogTable.getPartitionKeys().stream() + .map(key -> String.format("`%s`", key)) Review comment: Use `EncodingUtils.escapeIdentifier` to escape which can handle "`" in field names. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ########## @@ -178,6 +185,7 @@ private final ModuleManager moduleManager; private final OperationTreeBuilder operationTreeBuilder; private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>(); + private final String printIndent = " "; Review comment: nit: can be a local variable in `buildShowCreateTableRow` method? ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ########## @@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, String[] objects) { Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new)); } + private String buildShowCreateTableRow( + ResolvedCatalogBaseTable<?> table, + ObjectIdentifier sqlIdentifier, + boolean isTemporary) { + CatalogBaseTable.TableKind kind = table.getTableKind(); + if (kind == CatalogBaseTable.TableKind.VIEW) { + throw new TableException( + String.format( + "SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.", + sqlIdentifier.asSerializableString())); + } + StringBuilder sb = + new StringBuilder( + String.format( + "CREATE %sTABLE %s (\n", + isTemporary ? "TEMPORARY " : "", + sqlIdentifier.asSerializableString())); + ResolvedSchema schema = table.getResolvedSchema(); + // append columns + sb.append( + schema.getColumns().stream() + .map(column -> String.format("%s%s", printIndent, getColumnString(column))) + .collect(Collectors.joining(",\n"))); + // append watermark spec + if (!schema.getWatermarkSpecs().isEmpty()) { + sb.append(",\n"); + sb.append( + schema.getWatermarkSpecs().stream() + .map( + watermarkSpec -> + String.format( + "%sWATERMARK FOR %s AS %s", + printIndent, + String.join( + ".", + EncodingUtils.escapeIdentifier( + watermarkSpec + .getRowtimeAttribute())), + watermarkSpec + .getWatermarkExpression() + .asSummaryString())) + .collect(Collectors.joining("\n"))); + } + // append constraint + if (schema.getPrimaryKey().isPresent()) { + sb.append(",\n"); + sb.append(String.format("%s%s", printIndent, schema.getPrimaryKey().get())); + } + sb.append("\n) "); + // append comment + String comment = table.getComment(); + if (StringUtils.isNotEmpty(comment)) { + sb.append(String.format("COMMENT '%s'\n", comment)); + } + // append partitions + ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table; + if (catalogTable.isPartitioned()) { + sb.append("PARTITIONED BY (") + .append( + catalogTable.getPartitionKeys().stream() + .map(key -> String.format("`%s`", key)) + .collect(Collectors.joining(", "))) + .append(")\n"); + } + // append `with` properties + Map<String, String> options = table.getOptions(); + sb.append("WITH (\n") + .append( + options.entrySet().stream() + .map( + entry -> + String.format( + "%s'%s' = '%s'", + printIndent, + entry.getKey(), + entry.getValue())) + .collect(Collectors.joining(",\n"))) + .append("\n)\n"); + return sb.toString(); + } + + private String getColumnString(Column column) { + final StringBuilder sb = new StringBuilder(); + sb.append(EncodingUtils.escapeIdentifier(column.getName())); + sb.append(" "); + // skip data type for computed column + if (column instanceof Column.ComputedColumn) { + sb.append( + column.explainExtras() + .orElseThrow( + () -> + new TableException( + String.format( + "Column expression can not be null for computed column '%s'", + column.getName())))); + } else { + DataType dataType = column.getDataType(); + String type = dataType.toString(); Review comment: Use `dataType#asSerializableString` instead of `toString`/`asSummaryString`. `asSerializableString` doesn't show timestamp kind and we don't need the following special logic then. Actually, we should always use `asSerializableString`, rather than `asSummaryString`, because only the serialized string is fully supported to be parsed, see `LogicalTypeParser`. Besides, it would be better to avoid using `toString` and explicitly use `asSerializableString`, otherwise, it's hard to trace where the `asSerializableString`/`asSummaryString` are used. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java ########## @@ -56,6 +56,17 @@ private UniqueConstraint( return columns; } + private final String getTypeString() { Review comment: nit: don't need `final` for private method. ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala ########## @@ -985,6 +984,83 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { assertEquals(false, tableSchema2.getPrimaryKey.isPresent) } + @Test + def testCreateTableAndShowCreateTable(): Unit = { + val executedDDL = + """ + |create temporary table TBL1 ( + | a bigint not null, Review comment: Add some special field names, e.g. ``` `__source__` varchar(255), ```myname``` timestamp_ltz(3) metadata from 'timestamp', ``` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ########## @@ -1072,6 +1086,75 @@ private TableResult buildShowResult(String columnName, String[] objects) { Arrays.stream(objects).map((c) -> new String[]{c}).toArray(String[][]::new)); } + private TableResult buildShowCreateTableResult(CatalogBaseTable table, ObjectIdentifier sqlIdentifier) { Review comment: Personally, I don't like to add more and more methods in the `TableEnvironmentImpl`, this makes the class fat and hard to maintain. I think such methods can be utilities in a separate class, e.g. `ShowStatementUtils` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.table.catalog.ObjectIdentifier; + +/** Operation to describe a SHOW CREATE TABLE statement. */ +public class ShowCreateTableOperation implements ShowOperation { + + private final ObjectIdentifier sqlIdentifier; Review comment: Have a meaningful name here, e.g. `tableIdentifier`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org