godfreyhe commented on a change in pull request #11892:
URL: https://github.com/apache/flink/pull/11892#discussion_r422050608



##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
##########
@@ -766,7 +768,29 @@ abstract class TableEnvImpl(
       case descOperation: DescribeTableOperation =>
         val result = catalogManager.getTable(descOperation.getSqlIdentifier)
         if (result.isPresent) {
-          buildShowResult(Array(result.get().getTable.getSchema.toString))
+          val schema = result.get.getTable.getSchema
+          val fieldToWatermark =
+            schema
+              .getWatermarkSpecs
+              .map(w => (w.getRowtimeAttribute, w.getWatermarkExpr)).toMap
+          val fieldToPrimaryKey = new JHashMap[String, String]()
+          if (schema.getPrimaryKey.isPresent) {
+            val columns = schema.getPrimaryKey.get.getColumns.asScala
+            columns.foreach(c => fieldToPrimaryKey.put(c, 
s"PRI(${columns.mkString(",")})"))
+          }
+          val data = Array.ofDim[String](schema.getFieldCount, 6)
+          schema.getTableColumns.asScala.zipWithIndex.foreach {
+            case (c, i) => {
+              val logicalType = c.getType.getLogicalType
+              data(i)(0) = c.getName
+              data(i)(1) = StringUtils.removeEnd(logicalType.toString, " NOT 
NULL")
+              data(i)(2) = if (logicalType.isNullable) "true" else "false"
+              data(i)(3) = fieldToPrimaryKey.getOrDefault(c.getName, "(NULL)")
+              data(i)(4) = c.getExpr.orElse("(NULL)")
+              data(i)(5) = fieldToWatermark.getOrDefault(c.getName, "(NULL)")
+            }
+          }
+          buildShowResult(Array("name", "type", "null", "key", "compute 
column", "watermark"), data)

Review comment:
       ditto

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -861,23 +865,32 @@ private TableResult executeOperation(Operation operation) 
{
                                        
catalogManager.getTable(describeTableOperation.getSqlIdentifier());
                        if (result.isPresent()) {
                                TableSchema schema = 
result.get().getTable().getSchema();
-                               String[][] rows = Stream.concat(
+                               Map<String, String> fieldToWatermark =
+                                       schema.getWatermarkSpecs()
+                                               .stream()
+                                                       
.collect(Collectors.toMap(WatermarkSpec::getRowtimeAttribute, 
WatermarkSpec::getWatermarkExpr));
+
+                               Map<String, String> fieldToPrimaryKey = new 
HashMap<>();
+                               schema.getPrimaryKey().ifPresent((p) -> {
+                                       List<String> columns = p.getColumns();
+                                       columns.forEach((c) -> 
fieldToPrimaryKey.put(c, String.format("PRI(%s)", String.join(",", columns))));
+                               });
+
+                               String[][] rows =
                                        schema.getTableColumns()
                                                .stream()
-                                                       .map((c) -> new 
String[]{
+                                                       .map((c) -> {
+                                                               LogicalType 
logicalType = c.getType().getLogicalType();
+                                                               return new 
String[]{
                                                                c.getName(),
-                                                               
c.getType().getLogicalType().toString(),
-                                                               
c.getExpr().orElse("(NULL)")}),
-                                       schema.getWatermarkSpecs()
-                                               .stream()
-                                                       .map((w) -> new 
String[]{
-                                                               "WATERMARK",
-                                                               "(NULL)",
-                                                               
w.getWatermarkExpr()
-                                                       })
-                               ).toArray(String[][]::new);
-
-                               return buildShowResult(new String[]{"name", 
"type", "expr"}, rows);
+                                                               
StringUtils.removeEnd(logicalType.toString(), " NOT NULL"),
+                                                               
logicalType.isNullable() ? "true" : "false",
+                                                               
fieldToPrimaryKey.getOrDefault(c.getName(), "(NULL)"),
+                                                               
c.getExpr().orElse("(NULL)"),
+                                                               
fieldToWatermark.getOrDefault(c.getName(), "(NULL)")};
+                                                       
}).toArray(String[][]::new);
+
+                               return buildShowResult(new String[]{"name", 
"type", "null", "key", "compute column", "watermark"}, rows);

Review comment:
       move these code to a separated method ?

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -861,23 +865,32 @@ private TableResult executeOperation(Operation operation) 
{
                                        
catalogManager.getTable(describeTableOperation.getSqlIdentifier());
                        if (result.isPresent()) {
                                TableSchema schema = 
result.get().getTable().getSchema();
-                               String[][] rows = Stream.concat(
+                               Map<String, String> fieldToWatermark =
+                                       schema.getWatermarkSpecs()
+                                               .stream()
+                                                       
.collect(Collectors.toMap(WatermarkSpec::getRowtimeAttribute, 
WatermarkSpec::getWatermarkExpr));
+
+                               Map<String, String> fieldToPrimaryKey = new 
HashMap<>();
+                               schema.getPrimaryKey().ifPresent((p) -> {
+                                       List<String> columns = p.getColumns();
+                                       columns.forEach((c) -> 
fieldToPrimaryKey.put(c, String.format("PRI(%s)", String.join(",", columns))));
+                               });
+
+                               String[][] rows =
                                        schema.getTableColumns()
                                                .stream()
-                                                       .map((c) -> new 
String[]{
+                                                       .map((c) -> {
+                                                               LogicalType 
logicalType = c.getType().getLogicalType();
+                                                               return new 
String[]{
                                                                c.getName(),
-                                                               
c.getType().getLogicalType().toString(),
-                                                               
c.getExpr().orElse("(NULL)")}),
-                                       schema.getWatermarkSpecs()
-                                               .stream()
-                                                       .map((w) -> new 
String[]{
-                                                               "WATERMARK",
-                                                               "(NULL)",
-                                                               
w.getWatermarkExpr()
-                                                       })
-                               ).toArray(String[][]::new);
-
-                               return buildShowResult(new String[]{"name", 
"type", "expr"}, rows);
+                                                               
StringUtils.removeEnd(logicalType.toString(), " NOT NULL"),
+                                                               
logicalType.isNullable() ? "true" : "false",
+                                                               
fieldToPrimaryKey.getOrDefault(c.getName(), "(NULL)"),

Review comment:
       we should use `null` instead of `(NULL)`, `(NULL)` just a display form 
defined in `PrintUtils`.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -861,23 +865,32 @@ private TableResult executeOperation(Operation operation) 
{
                                        
catalogManager.getTable(describeTableOperation.getSqlIdentifier());
                        if (result.isPresent()) {
                                TableSchema schema = 
result.get().getTable().getSchema();
-                               String[][] rows = Stream.concat(
+                               Map<String, String> fieldToWatermark =
+                                       schema.getWatermarkSpecs()
+                                               .stream()
+                                                       
.collect(Collectors.toMap(WatermarkSpec::getRowtimeAttribute, 
WatermarkSpec::getWatermarkExpr));
+
+                               Map<String, String> fieldToPrimaryKey = new 
HashMap<>();
+                               schema.getPrimaryKey().ifPresent((p) -> {
+                                       List<String> columns = p.getColumns();
+                                       columns.forEach((c) -> 
fieldToPrimaryKey.put(c, String.format("PRI(%s)", String.join(",", columns))));
+                               });
+
+                               String[][] rows =
                                        schema.getTableColumns()
                                                .stream()
-                                                       .map((c) -> new 
String[]{
+                                                       .map((c) -> {
+                                                               LogicalType 
logicalType = c.getType().getLogicalType();
+                                                               return new 
String[]{
                                                                c.getName(),
-                                                               
c.getType().getLogicalType().toString(),
-                                                               
c.getExpr().orElse("(NULL)")}),
-                                       schema.getWatermarkSpecs()
-                                               .stream()
-                                                       .map((w) -> new 
String[]{
-                                                               "WATERMARK",
-                                                               "(NULL)",
-                                                               
w.getWatermarkExpr()
-                                                       })
-                               ).toArray(String[][]::new);
-
-                               return buildShowResult(new String[]{"name", 
"type", "expr"}, rows);
+                                                               
StringUtils.removeEnd(logicalType.toString(), " NOT NULL"),
+                                                               
logicalType.isNullable() ? "true" : "false",

Review comment:
       should be boolean type instead of String




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to