wuchong commented on a change in pull request #13011:
URL: https://github.com/apache/flink/pull/13011#discussion_r616298791



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, 
String[] objects) {
                 Arrays.stream(objects).map((c) -> new String[] 
{c}).toArray(String[][]::new));
     }
 
+    private String buildShowCreateTableRow(
+            ResolvedCatalogBaseTable<?> table,
+            ObjectIdentifier sqlIdentifier,
+            boolean isTemporary) {
+        CatalogBaseTable.TableKind kind = table.getTableKind();
+        if (kind == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException(
+                    String.format(
+                            "SHOW CREATE TABLE does not support showing CREATE 
VIEW statement with identifier %s.",
+                            sqlIdentifier.asSerializableString()));
+        }
+        StringBuilder sb =
+                new StringBuilder(
+                        String.format(
+                                "CREATE %sTABLE %s (\n",
+                                isTemporary ? "TEMPORARY " : "",
+                                sqlIdentifier.asSerializableString()));
+        ResolvedSchema schema = table.getResolvedSchema();
+        // append columns
+        sb.append(
+                schema.getColumns().stream()
+                        .map(column -> String.format("%s%s", printIndent, 
getColumnString(column)))
+                        .collect(Collectors.joining(",\n")));
+        // append watermark spec
+        if (!schema.getWatermarkSpecs().isEmpty()) {
+            sb.append(",\n");
+            sb.append(
+                    schema.getWatermarkSpecs().stream()
+                            .map(
+                                    watermarkSpec ->
+                                            String.format(
+                                                    "%sWATERMARK FOR %s AS %s",
+                                                    printIndent,
+                                                    String.join(
+                                                            ".",
+                                                            
EncodingUtils.escapeIdentifier(
+                                                                    
watermarkSpec
+                                                                            
.getRowtimeAttribute())),
+                                                    watermarkSpec
+                                                            
.getWatermarkExpression()
+                                                            
.asSummaryString()))
+                            .collect(Collectors.joining("\n")));
+        }
+        // append constraint
+        if (schema.getPrimaryKey().isPresent()) {
+            sb.append(",\n");
+            sb.append(String.format("%s%s", printIndent, 
schema.getPrimaryKey().get()));
+        }
+        sb.append("\n) ");
+        // append comment
+        String comment = table.getComment();
+        if (StringUtils.isNotEmpty(comment)) {
+            sb.append(String.format("COMMENT '%s'\n", comment));
+        }
+        // append partitions
+        ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table;
+        if (catalogTable.isPartitioned()) {
+            sb.append("PARTITIONED BY (")
+                    .append(
+                            catalogTable.getPartitionKeys().stream()
+                                    .map(key -> String.format("`%s`", key))
+                                    .collect(Collectors.joining(", ")))
+                    .append(")\n");
+        }
+        // append `with` properties
+        Map<String, String> options = table.getOptions();
+        sb.append("WITH (\n")
+                .append(
+                        options.entrySet().stream()
+                                .map(
+                                        entry ->
+                                                String.format(
+                                                        "%s'%s' = '%s'",
+                                                        printIndent,
+                                                        entry.getKey(),
+                                                        entry.getValue()))
+                                .collect(Collectors.joining(",\n")))
+                .append("\n)\n");
+        return sb.toString();
+    }
+
+    private String getColumnString(Column column) {
+        final StringBuilder sb = new StringBuilder();
+        sb.append(EncodingUtils.escapeIdentifier(column.getName()));
+        sb.append(" ");
+        // skip data type for computed column
+        if (column instanceof Column.ComputedColumn) {
+            sb.append(
+                    column.explainExtras()
+                            .orElseThrow(
+                                    () ->
+                                            new TableException(
+                                                    String.format(
+                                                            "Column expression 
can not be null for computed column '%s'",
+                                                            
column.getName()))));
+        } else {
+            DataType dataType = column.getDataType();
+            String type = dataType.toString();

Review comment:
       I think we can leave this into another issue. `STRING`, `BYTES`, 
`TIMESTAMP_LTZ` are just abbreviations, it's fine to just display 
`VARCHAR(MAX)`, `TIMESTAMP(%d) WITH LOCAL TIME ZONE`, because they are single 
source of truth.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to