wuchong commented on a change in pull request #13011:
URL: https://github.com/apache/flink/pull/13011#discussion_r465552827



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/UniqueConstraint.java
##########
@@ -75,21 +76,26 @@ public ConstraintType getType() {
         */
        @Override
        public final String asSummaryString() {
-               final String typeString;
-               switch (getType()) {
-                       case PRIMARY_KEY:
-                               typeString = "PRIMARY KEY";
-                               break;
-                       case UNIQUE_KEY:
-                               typeString = "UNIQUE";
-                               break;
-                       default:
-                               throw new IllegalStateException("Unknown key type: " + getType());
-               }
-
+               final String typeString = getTypeString();
                return String.format("CONSTRAINT %s %s (%s)", getName(), typeString, String.join(", ", columns));
        }
 
+       /**
+        * Returns constraint's canonical summary. All constraints summary will be formatted as
+        * <pre>
+        * CONSTRAINT [constraint-name] [constraint-type] ([constraint-definition]) NOT ENFORCED
+        *
+        * E.g CONSTRAINT pk PRIMARY KEY (`f0`, `f1`) NOT ENFORCED
+        * </pre>
+        */
+       public final String asCanonicalString() {
+               final String typeString = getTypeString();
+               return String.format("CONSTRAINT %s %s (%s) NOT ENFORCED",
+                       getName(),
+                       typeString,
+                       String.join(", ", columns.stream().map(col -> String.format("`%s`", col)).collect(Collectors.toList())));

Review comment:
       Use `EncodingUtils#escapeIdentifier` to escape identifiers. 
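   For illustration, a minimal sketch of that change inside `asCanonicalString` (only `EncodingUtils.escapeIdentifier` is the existing utility; the surrounding lines mirror the diff above):
   
   ```java
   // assumes: import org.apache.flink.table.utils.EncodingUtils;
   // Escape each column with the shared utility instead of hand-rolled
   // backticks, so names that themselves contain "`" stay parseable.
   String columnList = columns.stream()
           .map(EncodingUtils::escapeIdentifier)
           .collect(Collectors.joining(", "));
   return String.format("CONSTRAINT %s %s (%s) NOT ENFORCED",
           getName(), typeString, columnList);
   ```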

##########
File path: docs/content/docs/dev/table/sql/show.md
##########
@@ -427,6 +458,17 @@ SHOW TABLES
 
 Show all tables in the current catalog and the current database.
 
+
+## SHOW CREATE TABLE
+
+```sql
+SHOW CREATE TABLE
+```
+
+Show create table statement for specified table.
+
+<span class="label label-danger">Attention</span> Currently `SHOW CREATE TABLE` only supports table that is created by Flink SQL. 

Review comment:
       minor:
   
   ```suggestion
   <span class="label label-danger">Attention</span> Currently `SHOW CREATE TABLE` only supports table that is created by Flink SQL DDL. 
   ```

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/UniqueConstraint.java
##########
@@ -75,21 +76,26 @@ public ConstraintType getType() {
         */
        @Override
        public final String asSummaryString() {
-               final String typeString;
-               switch (getType()) {
-                       case PRIMARY_KEY:
-                               typeString = "PRIMARY KEY";
-                               break;
-                       case UNIQUE_KEY:
-                               typeString = "UNIQUE";
-                               break;
-                       default:
-                               throw new IllegalStateException("Unknown key type: " + getType());
-               }
-
+               final String typeString = getTypeString();
                return String.format("CONSTRAINT %s %s (%s)", getName(), typeString, String.join(", ", columns));

Review comment:
       I think we should also add `NOT ENFORCED` to the summary string. Currently, it is not correct.
   Besides, we should add `NOT ENFORCED` according to the underlying `enforced` flag, even though it is always false for now.
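   For illustration, a hedged sketch of gating the clause on the flag (`isEnforced()` is assumed to be the accessor backing `enforced`):
   
   ```java
   // Derive the enforcement clause from the flag instead of hard-coding it;
   // today this always yields "NOT ENFORCED", but it stays correct if the
   // flag ever becomes configurable.
   String enforcement = isEnforced() ? "ENFORCED" : "NOT ENFORCED";
   return String.format("CONSTRAINT %s %s (%s) %s",
           getName(), typeString, String.join(", ", columns), enforcement);
   ```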

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1072,6 +1086,75 @@ private TableResult buildShowResult(String columnName, String[] objects) {
                        Arrays.stream(objects).map((c) -> new String[]{c}).toArray(String[][]::new));
        }
 
+       private TableResult buildShowCreateTableResult(CatalogBaseTable table, ObjectIdentifier sqlIdentifier) {

Review comment:
       I think this must be a `CatalogTable`, because this is `SHOW CREATE TABLE`. We should throw an exception before this point if it is a view.
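   A minimal sketch of such a guard, assuming the looked-up table is in scope as `table` (the message wording is illustrative):
   
   ```java
   if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
       throw new TableException(
               String.format(
                       "SHOW CREATE TABLE is not supported for views: %s.",
                       sqlIdentifier.asSerializableString()));
   }
   // The cast is safe only after the kind check above.
   CatalogTable catalogTable = (CatalogTable) table;
   ```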

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1015,6 +1018,17 @@ private TableResult executeOperation(Operation operation) {
                        return buildShowResult("database name", listDatabases());
                } else if (operation instanceof ShowCurrentDatabaseOperation) {
                        return buildShowResult("current database name", new String[]{catalogManager.getCurrentDatabase()});
+               } else if (operation instanceof ShowCreateTableOperation) {
+                       ShowCreateTableOperation showCreateTableOperation = (ShowCreateTableOperation) operation;
+                       Optional<CatalogManager.TableLookupResult> result =
+                               catalogManager.getTable(showCreateTableOperation.getSqlIdentifier());
+                       if (result.isPresent()) {
+                               return buildShowCreateTableResult(result.get().getTable(), ((ShowCreateTableOperation) operation).getSqlIdentifier());

Review comment:
       ```suggestion
                                return buildShowCreateTableResult(result.get().getTable(), showCreateTableOperation.getSqlIdentifier());
   ```

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/UniqueConstraint.java
##########
@@ -75,21 +76,26 @@ public ConstraintType getType() {
         */
        @Override
        public final String asSummaryString() {
-               final String typeString;
-               switch (getType()) {
-                       case PRIMARY_KEY:
-                               typeString = "PRIMARY KEY";
-                               break;
-                       case UNIQUE_KEY:
-                               typeString = "UNIQUE";
-                               break;
-                       default:
-                               throw new IllegalStateException("Unknown key type: " + getType());
-               }
-
+               final String typeString = getTypeString();
                return String.format("CONSTRAINT %s %s (%s)", getName(), typeString, String.join(", ", columns));
        }
 
+       /**
+        * Returns constraint's canonical summary. All constraints summary will be formatted as
+        * <pre>
+        * CONSTRAINT [constraint-name] [constraint-type] ([constraint-definition]) NOT ENFORCED
+        *
+        * E.g CONSTRAINT pk PRIMARY KEY (`f0`, `f1`) NOT ENFORCED
+        * </pre>
+        */
+       public final String asCanonicalString() {
+               final String typeString = getTypeString();
+               return String.format("CONSTRAINT %s %s (%s) NOT ENFORCED",

Review comment:
       We should add `NOT ENFORCED` according to the underlying `enforced` flag, even though it is always false for now.

##########
File path: 
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
##########
@@ -408,6 +408,22 @@ SqlShowTables SqlShowTables() :
     }
 }
 
+/**
+* Parse a "Show Create Table" query command.
+*/
+SqlShowCreateTable  SqlShowCreateTable() :

Review comment:
       ```suggestion
   SqlShowCreateTable SqlShowCreateTable() :
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
##########
@@ -989,6 +989,30 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
     assertEquals(false, tableSchema2.getPrimaryKey.isPresent)
   }
 
+  @Test
+  def testCreateTableAndShowCreateTable(): Unit = {
+    val createDDL =
+    """ |CREATE TABLE `TBL1` (

Review comment:
       1. Add a test for a DDL statement whose identifiers are not quoted and not upper-cased.
   2. Add a test for UDFs as computed columns and for complex types (e.g. ARRAY, MAP, ROW); a sketch of such a DDL follows below.
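   A hedged sketch of the kind of DDL the second test could cover (table name, UDF, and connector choice are illustrative, and the UDF is assumed to be registered):
   
   ```java
   // assumes a TableEnvironment `tEnv` and a registered scalar function `my_udf`
   tEnv.executeSql(
           "CREATE TABLE t1 (\n"
                   + "  a ARRAY<BIGINT>,\n"
                   + "  b MAP<STRING, INT>,\n"
                   + "  c ROW<x INT, y STRING>,\n"
                   + "  d AS my_udf(a)\n" // computed column backed by a UDF
                   + ") WITH (\n"
                   + "  'connector' = 'datagen'\n"
                   + ")");
   tEnv.executeSql("SHOW CREATE TABLE t1").print();
   ```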

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/UniqueConstraint.java
##########
@@ -75,21 +76,26 @@ public ConstraintType getType() {
         */
        @Override
        public final String asSummaryString() {
-               final String typeString;
-               switch (getType()) {
-                       case PRIMARY_KEY:
-                               typeString = "PRIMARY KEY";
-                               break;
-                       case UNIQUE_KEY:
-                               typeString = "UNIQUE";
-                               break;
-                       default:
-                               throw new IllegalStateException("Unknown key type: " + getType());
-               }
-
+               final String typeString = getTypeString();
                return String.format("CONSTRAINT %s %s (%s)", getName(), typeString, String.join(", ", columns));
        }
 
+       /**
+        * Returns constraint's canonical summary. All constraints summary will be formatted as
+        * <pre>
+        * CONSTRAINT [constraint-name] [constraint-type] ([constraint-definition]) NOT ENFORCED
+        *
+        * E.g CONSTRAINT pk PRIMARY KEY (`f0`, `f1`) NOT ENFORCED
+        * </pre>
+        */
+       public final String asCanonicalString() {

Review comment:
       I would suggest calling it `asSerializableString`, to keep it aligned with others, e.g. `ObjectIdentifier#asSerializableString`.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/**
+ * Operation to describe a SHOW CREATE TABLE statement.
+ * */
+public class ShowCreateTableOperation implements ShowOperation{

Review comment:
       ```suggestion
   public class ShowCreateTableOperation implements ShowOperation {
   ```

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
##########
@@ -987,19 +987,21 @@ public void testCreateTable() throws Exception {
                final Executor executor = createDefaultExecutor(clusterClient);
                final SessionContext session = new SessionContext("test-session", new Environment());
                String sessionId = executor.openSession(session);
-               final String ddlTemplate = "create table %s(\n" +
-                               "  a int,\n" +
-                               "  b bigint,\n" +
-                               "  c varchar\n" +
-                               ") with (\n" +
-                               "  'connector.type'='filesystem',\n" +
-                               "  'format.type'='csv',\n" +
-                               "  'connector.path'='xxx'\n" +
-                               ")\n";
+               final String ddlTemplate = "CREATE TABLE %s (\n" +
+                               "  `a` INT,\n" +
+                               "  `b` BIGINT,\n" +
+                               "  `c` STRING\n" +
+                               ") WITH (\n" +
+                               "  'connector.type' = 'filesystem',\n" +
+                               "  'connector.path' = 'xxx',\n" +
+                               "  'format.type' = 'csv'\n" +
+                       ")\n";
                try {
                        // Test create table with simple name.
                        executor.executeSql(sessionId, "use catalog catalog1");
                        executor.executeSql(sessionId, String.format(ddlTemplate, "MyTable1"));
+                       assertShowResult(executor.executeSql(sessionId, String.format("show create table %s", "MyTable1")),
+                               Arrays.asList(String.format(ddlTemplate, "`MyTable1`")));

Review comment:
       ```suggestion
                                Collections.singletonList(String.format(ddlTemplate, "`MyTable1`")));
   ```

##########
File path: 
flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
##########
@@ -252,6 +252,22 @@ SqlShowTables SqlShowTables() :
     }
 }
 
+/**
+* Parse a "Show Create Table" query command.
+*/
+SqlShowCreateTable  SqlShowCreateTable() :
+{
+    SqlIdentifier tableName;
+    SqlParserPos pos;
+}
+{
+    <SHOW> <CREATE> <TABLE> { pos = getPos();}

Review comment:
       For the Hive dialect, I don't think we should print the same result for SHOW CREATE TABLE.
   
   Hive also provides a `SHOW CREATE TABLE` statement, which should print the Hive DDL. So I would suggest postponing SHOW CREATE TABLE for Hive to another issue.

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
##########
@@ -812,6 +823,79 @@ abstract class TableEnvImpl(
     buildResult(Array(columnName), Array(DataTypes.STRING), rows)
   }
 
+  private def buildShowCreateResult(

Review comment:
       You can reuse the show create table util in flink-table-api-java.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, String[] objects) {
                 Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new));
     }
 
+    private String buildShowCreateTableRow(
+            ResolvedCatalogBaseTable<?> table,
+            ObjectIdentifier sqlIdentifier,
+            boolean isTemporary) {
+        CatalogBaseTable.TableKind kind = table.getTableKind();
+        if (kind == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException(
+                    String.format(
+                            "SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.",
+                            sqlIdentifier.asSerializableString()));
+        }
+        StringBuilder sb =
+                new StringBuilder(
+                        String.format(
+                                "CREATE %sTABLE %s (\n",
+                                isTemporary ? "TEMPORARY " : "",
+                                sqlIdentifier.asSerializableString()));
+        ResolvedSchema schema = table.getResolvedSchema();
+        // append columns
+        sb.append(
+                schema.getColumns().stream()
+                        .map(column -> String.format("%s%s", printIndent, getColumnString(column)))
+                        .collect(Collectors.joining(",\n")));
+        // append watermark spec
+        if (!schema.getWatermarkSpecs().isEmpty()) {
+            sb.append(",\n");
+            sb.append(
+                    schema.getWatermarkSpecs().stream()
+                            .map(
+                                    watermarkSpec ->
+                                            String.format(
+                                                    "%sWATERMARK FOR %s AS %s",
+                                                    printIndent,
+                                                    String.join(
+                                                            ".",

Review comment:
       We don't need this `.`, right? 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, 
String[] objects) {
                 Arrays.stream(objects).map((c) -> new String[] 
{c}).toArray(String[][]::new));
     }
 
+    private String buildShowCreateTableRow(
+            ResolvedCatalogBaseTable<?> table,
+            ObjectIdentifier sqlIdentifier,
+            boolean isTemporary) {
+        CatalogBaseTable.TableKind kind = table.getTableKind();
+        if (kind == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException(
+                    String.format(
+                            "SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.",
+                            sqlIdentifier.asSerializableString()));
+        }
+        StringBuilder sb =
+                new StringBuilder(
+                        String.format(
+                                "CREATE %sTABLE %s (\n",
+                                isTemporary ? "TEMPORARY " : "",
+                                sqlIdentifier.asSerializableString()));
+        ResolvedSchema schema = table.getResolvedSchema();
+        // append columns
+        sb.append(
+                schema.getColumns().stream()
+                        .map(column -> String.format("%s%s", printIndent, getColumnString(column)))
+                        .collect(Collectors.joining(",\n")));
+        // append watermark spec
+        if (!schema.getWatermarkSpecs().isEmpty()) {
+            sb.append(",\n");
+            sb.append(
+                    schema.getWatermarkSpecs().stream()
+                            .map(
+                                    watermarkSpec ->
+                                            String.format(
+                                                    "%sWATERMARK FOR %s AS %s",
+                                                    printIndent,
+                                                    String.join(
+                                                            ".",
+                                                            EncodingUtils.escapeIdentifier(
+                                                                    watermarkSpec
+                                                                            .getRowtimeAttribute())),
+                                                    watermarkSpec
+                                                            .getWatermarkExpression()
+                                                            .asSummaryString()))

Review comment:
       Use `.asSerializableString()`, which escapes UDF expressions.
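   Combined with the `String.join` comment above, the watermark line could then be sketched as (assuming the rowtime attribute is a single, non-nested column; the surrounding names mirror the diff):
   
   ```java
   String watermarkLine = String.format(
           "%sWATERMARK FOR %s AS %s",
           printIndent,
           EncodingUtils.escapeIdentifier(watermarkSpec.getRowtimeAttribute()),
           // escapes UDF expressions, unlike asSummaryString()
           watermarkSpec.getWatermarkExpression().asSerializableString());
   ```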

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, String[] objects) {
                 Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new));
     }
 
+    private String buildShowCreateTableRow(
+            ResolvedCatalogBaseTable<?> table,
+            ObjectIdentifier sqlIdentifier,
+            boolean isTemporary) {
+        CatalogBaseTable.TableKind kind = table.getTableKind();
+        if (kind == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException(
+                    String.format(
+                            "SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.",
+                            sqlIdentifier.asSerializableString()));
+        }
+        StringBuilder sb =
+                new StringBuilder(
+                        String.format(
+                                "CREATE %sTABLE %s (\n",
+                                isTemporary ? "TEMPORARY " : "",
+                                sqlIdentifier.asSerializableString()));
+        ResolvedSchema schema = table.getResolvedSchema();
+        // append columns
+        sb.append(
+                schema.getColumns().stream()
+                        .map(column -> String.format("%s%s", printIndent, getColumnString(column)))
+                        .collect(Collectors.joining(",\n")));
+        // append watermark spec
+        if (!schema.getWatermarkSpecs().isEmpty()) {
+            sb.append(",\n");
+            sb.append(
+                    schema.getWatermarkSpecs().stream()
+                            .map(
+                                    watermarkSpec ->
+                                            String.format(
+                                                    "%sWATERMARK FOR %s AS %s",
+                                                    printIndent,
+                                                    String.join(
+                                                            ".",
+                                                            EncodingUtils.escapeIdentifier(
+                                                                    watermarkSpec
+                                                                            .getRowtimeAttribute())),
+                                                    watermarkSpec
+                                                            .getWatermarkExpression()
+                                                            .asSummaryString()))
+                            .collect(Collectors.joining("\n")));
+        }
+        // append constraint
+        if (schema.getPrimaryKey().isPresent()) {
+            sb.append(",\n");
+            sb.append(String.format("%s%s", printIndent, schema.getPrimaryKey().get()));
+        }
+        sb.append("\n) ");
+        // append comment
+        String comment = table.getComment();
+        if (StringUtils.isNotEmpty(comment)) {
+            sb.append(String.format("COMMENT '%s'\n", comment));
+        }
+        // append partitions
+        ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table;
+        if (catalogTable.isPartitioned()) {
+            sb.append("PARTITIONED BY (")
+                    .append(
+                            catalogTable.getPartitionKeys().stream()
+                                    .map(key -> String.format("`%s`", key))

Review comment:
       Use `EncodingUtils.escapeIdentifier` to escape identifiers; it also handles "`" in field names.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -178,6 +185,7 @@
     private final ModuleManager moduleManager;
     private final OperationTreeBuilder operationTreeBuilder;
    private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
+    private final String printIndent = "  ";

Review comment:
       nit: could this be a local variable in the `buildShowCreateTableRow` method?

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, String[] objects) {
                 Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new));
     }
 
+    private String buildShowCreateTableRow(
+            ResolvedCatalogBaseTable<?> table,
+            ObjectIdentifier sqlIdentifier,
+            boolean isTemporary) {
+        CatalogBaseTable.TableKind kind = table.getTableKind();
+        if (kind == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException(
+                    String.format(
+                            "SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.",
+                            sqlIdentifier.asSerializableString()));
+        }
+        StringBuilder sb =
+                new StringBuilder(
+                        String.format(
+                                "CREATE %sTABLE %s (\n",
+                                isTemporary ? "TEMPORARY " : "",
+                                sqlIdentifier.asSerializableString()));
+        ResolvedSchema schema = table.getResolvedSchema();
+        // append columns
+        sb.append(
+                schema.getColumns().stream()
+                        .map(column -> String.format("%s%s", printIndent, getColumnString(column)))
+                        .collect(Collectors.joining(",\n")));
+        // append watermark spec
+        if (!schema.getWatermarkSpecs().isEmpty()) {
+            sb.append(",\n");
+            sb.append(
+                    schema.getWatermarkSpecs().stream()
+                            .map(
+                                    watermarkSpec ->
+                                            String.format(
+                                                    "%sWATERMARK FOR %s AS %s",
+                                                    printIndent,
+                                                    String.join(
+                                                            ".",
+                                                            EncodingUtils.escapeIdentifier(
+                                                                    watermarkSpec
+                                                                            .getRowtimeAttribute())),
+                                                    watermarkSpec
+                                                            .getWatermarkExpression()
+                                                            .asSummaryString()))
+                            .collect(Collectors.joining("\n")));
+        }
+        // append constraint
+        if (schema.getPrimaryKey().isPresent()) {
+            sb.append(",\n");
+            sb.append(String.format("%s%s", printIndent, schema.getPrimaryKey().get()));
+        }
+        sb.append("\n) ");
+        // append comment
+        String comment = table.getComment();
+        if (StringUtils.isNotEmpty(comment)) {
+            sb.append(String.format("COMMENT '%s'\n", comment));
+        }
+        // append partitions
+        ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table;
+        if (catalogTable.isPartitioned()) {
+            sb.append("PARTITIONED BY (")
+                    .append(
+                            catalogTable.getPartitionKeys().stream()
+                                    .map(key -> String.format("`%s`", key))
+                                    .collect(Collectors.joining(", ")))
+                    .append(")\n");
+        }
+        // append `with` properties
+        Map<String, String> options = table.getOptions();
+        sb.append("WITH (\n")
+                .append(
+                        options.entrySet().stream()
+                                .map(
+                                        entry ->
+                                                String.format(
+                                                        "%s'%s' = '%s'",
+                                                        printIndent,
+                                                        entry.getKey(),
+                                                        entry.getValue()))
+                                .collect(Collectors.joining(",\n")))
+                .append("\n)\n");
+        return sb.toString();
+    }
+
+    private String getColumnString(Column column) {
+        final StringBuilder sb = new StringBuilder();
+        sb.append(EncodingUtils.escapeIdentifier(column.getName()));
+        sb.append(" ");
+        // skip data type for computed column
+        if (column instanceof Column.ComputedColumn) {
+            sb.append(
+                    column.explainExtras()
+                            .orElseThrow(
+                                    () ->
+                                            new TableException(
+                                                    String.format(
+                                                            "Column expression can not be null for computed column '%s'",
+                                                            column.getName()))));
+        } else {
+            DataType dataType = column.getDataType();
+            String type = dataType.toString();

Review comment:
       Use `dataType#asSerializableString` instead of `toString`/`asSummaryString`. `asSerializableString` doesn't show the timestamp kind, so we don't need the special-case logic that follows.
   
   Actually, we should always use `asSerializableString` rather than `asSummaryString`, because only the serialized string is fully supported to be parsed back, see `LogicalTypeParser`.
   
   Besides, it would be better to avoid using `toString` and to explicitly use `asSerializableString`; otherwise, it's hard to trace where `asSerializableString`/`asSummaryString` are used.
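   A small illustration of the difference (the exact summary rendering is an assumption):
   
   ```java
   // For a rowtime attribute, the summary string may carry the kind marker,
   // e.g. "TIMESTAMP(3) *ROWTIME*", which LogicalTypeParser cannot parse back.
   // The serializable string drops the marker and round-trips cleanly.
   LogicalType logicalType = column.getDataType().getLogicalType();
   String summary = logicalType.asSummaryString();           // may include *ROWTIME*
   String serializable = logicalType.asSerializableString(); // plain "TIMESTAMP(3)"
   ```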

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java
##########
@@ -56,6 +56,17 @@ private UniqueConstraint(
         return columns;
     }
 
+    private final String getTypeString() {

Review comment:
       nit: `final` is not needed on a private method.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
##########
@@ -985,6 +984,83 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
     assertEquals(false, tableSchema2.getPrimaryKey.isPresent)
   }
 
+  @Test
+  def testCreateTableAndShowCreateTable(): Unit = {
+    val executedDDL =
+      """
+        |create temporary table TBL1 (
+        |  a bigint not null,

Review comment:
       Add some special field names, e.g.
   
   ```
   `__source__` varchar(255),
   ```myname``` timestamp_ltz(3) metadata from 'timestamp',
   ```

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1072,6 +1086,75 @@ private TableResult buildShowResult(String columnName, String[] objects) {
                        Arrays.stream(objects).map((c) -> new String[]{c}).toArray(String[][]::new));
        }
 
+       private TableResult buildShowCreateTableResult(CatalogBaseTable table, ObjectIdentifier sqlIdentifier) {

Review comment:
       Personally, I don't like adding more and more methods to `TableEnvironmentImpl`; it makes the class fat and hard to maintain. I think such methods could be utilities in a separate class, e.g. `ShowStatementUtils`.
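   For illustration, a structural sketch of that extraction (`ShowStatementUtils` is the suggested, not-yet-existing class; the method body would move over unchanged):
   
   ```java
   /** Utilities for building SHOW ... results, keeping TableEnvironmentImpl lean. */
   final class ShowStatementUtils {
   
       private ShowStatementUtils() {}
   
       static TableResult buildShowCreateTableResult(
               CatalogBaseTable table, ObjectIdentifier sqlIdentifier) {
           // body moved unchanged from TableEnvironmentImpl
           throw new UnsupportedOperationException("sketch only");
       }
   }
   ```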

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/** Operation to describe a SHOW CREATE TABLE statement. */
+public class ShowCreateTableOperation implements ShowOperation {
+
+    private final ObjectIdentifier sqlIdentifier;

Review comment:
       Have a meaningful name here, e.g. `tableIdentifier`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

