Airblader commented on a change in pull request #17352:
URL: https://github.com/apache/flink/pull/17352#discussion_r716428189



##########
File path: docs/content/docs/dev/table/sql/show.md
##########
@@ -26,7 +26,7 @@ under the License.
 
 # SHOW Statements
 
-SHOW statements are used to list all catalogs, or list all databases in the 
current catalog, or list all tables/views in the current catalog and the 
current database, or show current catalog and database, or show create 
statement for specified table, or list all functions including system functions 
and user-defined functions in the current catalog and current database, or list 
only user-defined functions in the current catalog and current database, or 
list enabled module names, or list all loaded modules with enabled status in 
the current session, or list the columns of the table or the view with the 
given name and the optional like clause.
+SHOW statements are used to list all catalogs, or list all databases in the 
current catalog, or list all tables/views in the current catalog and the 
current database, or show current catalog and database, or show create 
statement for specified table or specified view, or list all functions 
including system functions and user-defined functions in the current catalog 
and current database, or list only user-defined functions in the current 
catalog and current database, or list enabled module names, or list all loaded 
modules with enabled status in the current session, or list the columns of the 
table or the view with the given name and the optional like clause.

Review comment:
       This isn't entirely related to your PR, but the sentence here is very 
lengthy and pretty much unreadable at this point. Can we maybe rephrase this a 
bit?
   
   > `SHOW` statements are used to list objects within their corresponding 
parent, such as catalogs, databases, tables and views, columns, functions, and 
modules. See the individual commands for more details and additional options.
   > `SHOW CREATE` statements are used to print a DDL statement with which a 
given object can be created.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1403,44 +1429,120 @@ private String buildShowCreateTableRow(
                                                             
.asSerializableString()))
                             .collect(Collectors.joining("\n")));
         }
-        // append constraint
-        if (schema.getPrimaryKey().isPresent()) {
-            sb.append(",\n");
-            sb.append(String.format("%s%s", printIndent, 
schema.getPrimaryKey().get()));
-        }
-        sb.append("\n) ");
-        // append comment
+        return Optional.empty();
+    }
+
+    private Optional<String> 
extractFormattedComment(ResolvedCatalogBaseTable<?> table) {
         String comment = table.getComment();
         if (StringUtils.isNotEmpty(comment)) {
-            sb.append(String.format("COMMENT '%s'\n", comment));
+            return Optional.of(EncodingUtils.escapeSingleQuotes(comment));
         }
-        // append partitions
-        ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table;
-        if (catalogTable.isPartitioned()) {
-            sb.append("PARTITIONED BY (")
-                    .append(
-                            catalogTable.getPartitionKeys().stream()
-                                    .map(EncodingUtils::escapeIdentifier)
-                                    .collect(Collectors.joining(", ")))
-                    .append(")\n");
+        return Optional.empty();
+    }
+
+    private Optional<String> 
extractFormattedPartitionedInfo(ResolvedCatalogTable catalogTable) {
+        if (!catalogTable.isPartitioned()) {
+            return Optional.empty();
         }
+        return Optional.of(
+                catalogTable.getPartitionKeys().stream()
+                        .map(EncodingUtils::escapeIdentifier)
+                        .collect(Collectors.joining(", ")));
+    }
+
+    private Optional<String> extractFormattedOptions(
+            ResolvedCatalogBaseTable<?> table, String printIndent) {
+        if (Objects.isNull(table.getOptions()) || 
table.getOptions().isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(
+                table.getOptions().entrySet().stream()
+                        .map(
+                                entry ->
+                                        String.format(
+                                                "%s'%s' = '%s'",
+                                                printIndent, entry.getKey(), 
entry.getValue()))
+                        .collect(Collectors.joining(",\n")));
+    }
+
+    private String extractFormattedColumnNames(ResolvedCatalogBaseTable<?> 
baseTable) {
+        return baseTable.getResolvedSchema().getColumns().stream()
+                .map(Column::getName)
+                .map(EncodingUtils::escapeIdentifier)
+                .collect(Collectors.joining(", "));
+    }
+
+    private String buildShowCreateTableRow(
+            ResolvedCatalogBaseTable<?> table,
+            ObjectIdentifier tableIdentifier,
+            boolean isTemporary) {
+        if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException(
+                    String.format(
+                            "SHOW CREATE TABLE is only supported for tables, 
but '%s' is a view. Please use SHOW CREATE VIEW instead.",
+                            tableIdentifier.asSerializableString()));
+        }
+        final String printIndent = "  ";
+        StringBuilder sb =
+                new StringBuilder()
+                        .append(buildCreateFormattedPrefix("TABLE", 
isTemporary, tableIdentifier));
+        // append columns
+        sb.append(extractFormattedColumns(table, printIndent));
+        // append watermark spec
+        extractFormattedWatermarkSpecs(table, printIndent)
+                .ifPresent(watermarkSpecs -> 
sb.append(",\n").append(watermarkSpecs));
+        // append constraint
+        extractFormattedPrimaryKey(table, printIndent).ifPresent(pk -> 
sb.append(",\n").append(pk));
+        sb.append("\n) ");
+        // append comment
+        extractFormattedComment(table)
+                .ifPresent(c -> sb.append(String.format("COMMENT '%s'\n", c)));
+        // append partitions
+        extractFormattedPartitionedInfo((ResolvedCatalogTable) table)
+                .ifPresent(
+                        partitionedInfoFormatted ->
+                                sb.append("PARTITIONED BY (")
+                                        .append(partitionedInfoFormatted)
+                                        .append(")\n"));
         // append `with` properties
-        Map<String, String> options = table.getOptions();
-        sb.append("WITH (\n")
-                .append(
-                        options.entrySet().stream()
-                                .map(
-                                        entry ->
-                                                String.format(
-                                                        "%s'%s' = '%s'",
-                                                        printIndent,
-                                                        entry.getKey(),
-                                                        entry.getValue()))
-                                .collect(Collectors.joining(",\n")))
-                .append("\n)\n");
+        extractFormattedOptions(table, printIndent)
+                .ifPresent(v -> sb.append("WITH 
(\n").append(v).append("\n)\n"));
         return sb.toString();
     }
 
+    /** Show create view statement only for views. */
+    private String buildShowCreateViewRow(
+            ResolvedCatalogBaseTable<?> view,
+            ObjectIdentifier viewIdentifier,
+            boolean isTemporary) {
+        if (view.getTableKind() != CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException(
+                    String.format(
+                            "SHOW CREATE VIEW is only supported for views, but 
'%s' is a table. Please use SHOW CREATE TABLE instead.",

Review comment:
       nit: I think this was my mistake, sorry, but removing the single quotes 
around the `%s` here would probably be better since the identifier will be 
fully escaped anyway.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1403,44 +1429,120 @@ private String buildShowCreateTableRow(
                                                             
.asSerializableString()))
                             .collect(Collectors.joining("\n")));
         }
-        // append constraint
-        if (schema.getPrimaryKey().isPresent()) {
-            sb.append(",\n");
-            sb.append(String.format("%s%s", printIndent, 
schema.getPrimaryKey().get()));
-        }
-        sb.append("\n) ");
-        // append comment
+        return Optional.empty();
+    }
+
+    private Optional<String> 
extractFormattedComment(ResolvedCatalogBaseTable<?> table) {
         String comment = table.getComment();
         if (StringUtils.isNotEmpty(comment)) {
-            sb.append(String.format("COMMENT '%s'\n", comment));
+            return Optional.of(EncodingUtils.escapeSingleQuotes(comment));
         }
-        // append partitions
-        ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table;
-        if (catalogTable.isPartitioned()) {
-            sb.append("PARTITIONED BY (")
-                    .append(
-                            catalogTable.getPartitionKeys().stream()
-                                    .map(EncodingUtils::escapeIdentifier)
-                                    .collect(Collectors.joining(", ")))
-                    .append(")\n");
+        return Optional.empty();
+    }
+
+    private Optional<String> 
extractFormattedPartitionedInfo(ResolvedCatalogTable catalogTable) {
+        if (!catalogTable.isPartitioned()) {
+            return Optional.empty();
         }
+        return Optional.of(
+                catalogTable.getPartitionKeys().stream()
+                        .map(EncodingUtils::escapeIdentifier)
+                        .collect(Collectors.joining(", ")));
+    }
+
+    private Optional<String> extractFormattedOptions(
+            ResolvedCatalogBaseTable<?> table, String printIndent) {
+        if (Objects.isNull(table.getOptions()) || 
table.getOptions().isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(
+                table.getOptions().entrySet().stream()
+                        .map(
+                                entry ->
+                                        String.format(
+                                                "%s'%s' = '%s'",
+                                                printIndent, entry.getKey(), 
entry.getValue()))

Review comment:
       The key and value need to have their single quotes escaped as well, the 
same way the comment is escaped via `EncodingUtils.escapeSingleQuotes`.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1362,35 +1392,31 @@ private TableResult buildShowResult(String columnName, 
String[] objects) {
                 Arrays.stream(objects).map((c) -> new String[] 
{c}).toArray(String[][]::new));
     }
 
-    private String buildShowCreateTableRow(
-            ResolvedCatalogBaseTable<?> table,
-            ObjectIdentifier tableIdentifier,
-            boolean isTemporary) {
-        final String printIndent = "  ";
-        CatalogBaseTable.TableKind kind = table.getTableKind();
-        if (kind == CatalogBaseTable.TableKind.VIEW) {
-            throw new TableException(
-                    String.format(
-                            "SHOW CREATE TABLE does not support showing CREATE 
VIEW statement with identifier %s.",
-                            tableIdentifier.asSerializableString()));
-        }
-        StringBuilder sb =
-                new StringBuilder(
-                        String.format(
-                                "CREATE %sTABLE %s (\n",
-                                isTemporary ? "TEMPORARY " : "",
-                                tableIdentifier.asSerializableString()));
-        ResolvedSchema schema = table.getResolvedSchema();
-        // append columns
-        sb.append(
-                schema.getColumns().stream()
-                        .map(column -> String.format("%s%s", printIndent, 
getColumnString(column)))
-                        .collect(Collectors.joining(",\n")));
-        // append watermark spec
-        if (!schema.getWatermarkSpecs().isEmpty()) {
-            sb.append(",\n");
-            sb.append(
-                    schema.getWatermarkSpecs().stream()
+    private String buildCreateFormattedPrefix(

Review comment:
       Since this class is already very big, and this formatting logic isn't 
really related to the table environment anymore, maybe it makes sense to move 
it into some (`@Internal`, of course) utility class to declutter this class?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to