fsk119 commented on a change in pull request #13011:
URL: https://github.com/apache/flink/pull/13011#discussion_r613178761



##########
File path: flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
##########
@@ -252,6 +252,22 @@ SqlShowTables SqlShowTables() :
     }
 }
 
+/**
+* Parse a "Show Create Table" query command.
+*/
+SqlShowCreateTable  SqlShowCreateTable() :

Review comment:
       There are two spaces between `SqlShowCreateTable` and `SqlShowCreateTable()`; it should be a single space.
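
   i.e. the declaration should read:

   ```
   SqlShowCreateTable SqlShowCreateTable() :
   ```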

##########
File path: flink-table/flink-sql-client/src/test/resources/sql/table.q
##########
@@ -73,6 +73,27 @@ show tables;
 1 row in set
 !ok
 
+# test SHOW CREATE TABLE
+show create table orders;
++-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+|                                                                              
                                                                                
                                                                                
                                                                          
create table |
++-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| CREATE TABLE `default_catalog`.`default_database`.`orders` (
+  `user` BIGINT NOT NULL,
+  `product` VARCHAR(32),
+  `amount` INT,
+  `ts` TIMESTAMP(3),
+  `ptime` AS PROCTIME(),
+  WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
+  CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
+) WITH (
+  'connector' = 'datagen'
+)
+ |
++-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Review comment:
       I think we don't need to print the chart. You can take a look at how the SQL client prints `EXPLAIN` results.
   
   It's very inconvenient to copy the DDL if we print it with the chart.
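
   For illustration, a chart-free rendering (similar to how `EXPLAIN` results are printed) could look like this in `table.q` — this framing is only a sketch, not the actual sql-client output format:

   ```
   show create table orders;
   CREATE TABLE `default_catalog`.`default_database`.`orders` (
     `user` BIGINT NOT NULL,
     `product` VARCHAR(32),
     `amount` INT,
     `ts` TIMESTAMP(3),
     `ptime` AS PROCTIME(),
     WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
     CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
   ) WITH (
     'connector' = 'datagen'
   )
   !ok
   ```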

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1293,6 +1320,125 @@ private TableResult buildShowResult(String columnName, String[] objects) {
                 Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new));
     }
 
+    private String[] buildShowCreateTableRow(
+            ResolvedCatalogBaseTable<?> table,
+            ObjectIdentifier sqlIdentifier,
+            boolean isTemporary) {

Review comment:
       The Hive dialect has its own grammar for creating tables. Do we need to build the table with the Hive dialect if the table is permanent?

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
##########
@@ -985,6 +985,80 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
     assertEquals(false, tableSchema2.getPrimaryKey.isPresent)
   }
 
+  @Test

Review comment:
       If the table is created via the Table API, the SHOW CREATE TABLE result is strange, e.g.
   
   ```
   Table t =
                   tEnv.fromDataStream(
                           env.fromCollection(
                                   Arrays.asList(
                                           new Tuple2<>(1, "1"),
                                           new Tuple2<>(1, "1"),
                                           new Tuple2<>(2, "3"),
                                           new Tuple2<>(2, "5"),
                                           new Tuple2<>(3, "5"),
                                           new Tuple2<>(3, "8"))),
                           $("id1"),
                           $("id2"),
                           $("proctime").proctime());
   
           tEnv.createTemporaryView("T", t);
   ```
   
   ```
   CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`T` AS
   DataStream: (id: [default_catalog.default_database.T], fields: [id1, id2, proctime])
   ```
   
   

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1293,6 +1320,125 @@ private TableResult buildShowResult(String columnName, String[] objects) {
                 Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new));
     }
 
+    private String[] buildShowCreateTableRow(
+            ResolvedCatalogBaseTable<?> table,
+            ObjectIdentifier sqlIdentifier,
+            boolean isTemporary) {
+        CatalogBaseTable.TableKind kind = table.getTableKind();
+        StringBuilder sb =
+                new StringBuilder(
+                        String.format(
+                                "CREATE%s%s%s",
+                                isTemporary ? " TEMPORARY" : "",
+                                kind == CatalogBaseTable.TableKind.TABLE ? " TABLE " : " VIEW ",
+                                sqlIdentifier.asSerializableString()));
+        if (kind == CatalogBaseTable.TableKind.TABLE) {
+            sb.append(" (\n");
+            ResolvedSchema schema = table.getResolvedSchema();
+            // append columns
+            sb.append(
+                    schema.getColumns().stream()
+                            .map(
+                                    column ->
+                                            String.format(
+                                                    "%s%s", printIndent, getColumnString(column)))
+                            .collect(Collectors.joining(",\n")));
+            // append watermark spec
+            if (!schema.getWatermarkSpecs().isEmpty()) {
+                sb.append(",\n");
+                sb.append(
+                        schema.getWatermarkSpecs().stream()
+                                .map(
+                                        watermarkSpec ->
+                                                String.format(
+                                                        "%sWATERMARK FOR %s AS %s",
+                                                        printIndent,
+                                                        String.join(
+                                                                ".",
+                                                                EncodingUtils.escapeIdentifier(
+                                                                        watermarkSpec.getRowtimeAttribute())),
+                                                        watermarkSpec.getWatermarkExpression().asSummaryString()))
+                                .collect(Collectors.joining("\n")));
+            }
+            // append constraint
+            if (schema.getPrimaryKey().isPresent()) {
+                sb.append(",\n");
+                sb.append(String.format("%s%s", printIndent, schema.getPrimaryKey().get()));
+            }
+            sb.append("\n) ");
+            // append comment
+            String comment = table.getComment();
+            if (StringUtils.isNotEmpty(comment)) {
+                sb.append(String.format("COMMENT '%s'\n", comment));
+            }
+
+            // append partitions
+            ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table;
+            if (catalogTable.isPartitioned()) {
+                sb.append("PARTITIONED BY (")
+                        .append(
+                                catalogTable.getPartitionKeys().stream()
+                                        .map(key -> String.format("`%s`", key))
+                                        .collect(Collectors.joining(", ")))
+                        .append(")\n");
+            }
+            // append `with` properties
+            Map<String, String> options = table.getOptions();
+            sb.append("WITH (\n")
+                    .append(
+                            options.entrySet().stream()
+                                    .map(
+                                            entry ->
+                                                    String.format(
+                                                            "%s'%s' = '%s'",
+                                                            printIndent,
+                                                            entry.getKey(),
+                                                            entry.getValue()))
+                                    .collect(Collectors.joining(",\n")))
+                    .append("\n)\n");
+        } else {
+            sb.append(" AS\n");
+            sb.append(((ResolvedCatalogView) table).getExpandedQuery()).append("\n");
+        }
+        return new String[] {sb.toString()};
+    }
+
+    private String getColumnString(Column column) {

Review comment:
       Please add "TODO: Print the comment when FLINK-18958 is fixed".
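
   To make the assembly pattern concrete, here is a standalone sketch of the DDL string-building approach used in `buildShowCreateTableRow`, with no Flink dependencies — `SimpleColumn` and `buildDdl` are hypothetical names for illustration, and the real method also handles watermarks, constraints, partitions and comments. It also shows where the suggested TODO would go:

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   public class ShowCreateTableSketch {
       private static final String INDENT = "  ";

       // Hypothetical stand-in for Flink's Column: just a name and a serialized type.
       record SimpleColumn(String name, String type) {}

       // Simplified version of the DDL assembly in the diff above.
       static String buildDdl(
               String identifier, List<SimpleColumn> columns, Map<String, String> options) {
           StringBuilder sb = new StringBuilder("CREATE TABLE " + identifier + " (\n");
           // TODO: Print the column comment when FLINK-18958 is fixed
           sb.append(
                   columns.stream()
                           .map(c -> String.format("%s`%s` %s", INDENT, c.name(), c.type()))
                           .collect(Collectors.joining(",\n")));
           sb.append("\n) WITH (\n");
           sb.append(
                   options.entrySet().stream()
                           .map(e -> String.format("%s'%s' = '%s'", INDENT, e.getKey(), e.getValue()))
                           .collect(Collectors.joining(",\n")));
           sb.append("\n)\n");
           return sb.toString();
       }

       public static void main(String[] args) {
           Map<String, String> options = new LinkedHashMap<>();
           options.put("connector", "datagen");
           System.out.print(
                   buildDdl(
                           "`default_catalog`.`default_database`.`orders`",
                           List.of(
                                   new SimpleColumn("user", "BIGINT NOT NULL"),
                                   new SimpleColumn("product", "VARCHAR(32)")),
                           options));
       }
   }
   ```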




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

