This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 4365bcfb7b6a7102851f59d10c04394aaaeee0bc
Author: yuzelin <[email protected]>
AuthorDate: Sun Apr 23 13:58:59 2023 +0800

    [flink] Support showing physical column comments for DESC statement (#964)
---
 .../org/apache/paimon/flink/DataCatalogTable.java  | 65 ++++++++++++++++++++++
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 22 +++++++-
 .../apache/paimon/flink/ReadWriteTableITCase.java  | 35 ++++++++++++
 3 files changed, 121 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
index 1dd62cb2c..37d1f9fa3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
@@ -20,8 +20,12 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
 
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -30,6 +34,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /** A {@link CatalogTableImpl} to wrap {@link FileStoreTable}. */
 public class DataCatalogTable extends CatalogTableImpl {
@@ -50,6 +55,66 @@ public class DataCatalogTable extends CatalogTableImpl {
         return table;
     }
 
+    @Override
+    public Schema getUnresolvedSchema() {
+        // Gather non-null Paimon field descriptions by name to use as physical column comments.
+        Map<String, String> columnComments =
+                table.rowType().getFields().stream()
+                        .filter(dataField -> dataField.description() != null)
+                        .collect(Collectors.toMap(DataField::name, DataField::description));
+
+        return toSchema(getSchema(), columnComments);
+    }
+
+    /** Copied from {@link TableSchema#toSchema(Map)} to support versions lower than 1.17. */
+    private Schema toSchema(TableSchema tableSchema, Map<String, String> comments) {
+        final Schema.Builder builder = Schema.newBuilder();
+
+        tableSchema
+                .getTableColumns()
+                .forEach(
+                        column -> {
+                            if (column instanceof TableColumn.PhysicalColumn) {
+                                final TableColumn.PhysicalColumn c =
+                                        (TableColumn.PhysicalColumn) column;
+                                builder.column(c.getName(), c.getType());
+                            } else if (column instanceof TableColumn.MetadataColumn) {
+                                final TableColumn.MetadataColumn c =
+                                        (TableColumn.MetadataColumn) column;
+                                builder.columnByMetadata(
+                                        c.getName(),
+                                        c.getType(),
+                                        c.getMetadataAlias().orElse(null),
+                                        c.isVirtual());
+                            } else if (column instanceof TableColumn.ComputedColumn) {
+                                final TableColumn.ComputedColumn c =
+                                        (TableColumn.ComputedColumn) column;
+                                builder.columnByExpression(c.getName(), c.getExpression());
+                            } else {
+                                throw new IllegalArgumentException(
+                                        "Unsupported column type: " + column);
+                            }
+                            String colName = column.getName(); // comments are keyed by name
+                            if (comments.containsKey(colName)) {
+                                builder.withComment(comments.get(colName)); // tags the last column
+                            }
+                        });
+
+        tableSchema
+                .getWatermarkSpecs()
+                .forEach(
+                        spec ->
+                                builder.watermark(
+                                        spec.getRowtimeAttribute(), spec.getWatermarkExpr()));
+
+        if (tableSchema.getPrimaryKey().isPresent()) {
+            UniqueConstraint primaryKey = tableSchema.getPrimaryKey().get();
+            builder.primaryKeyNamed(primaryKey.getName(), primaryKey.getColumns());
+        }
+
+        return builder.build();
+    }
+
     @Override
     public CatalogBaseTable copy() {
         return new DataCatalogTable(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 988b307b2..800f8b235 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -25,7 +25,9 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
 
+import org.apache.flink.table.api.Schema.UnresolvedColumn;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -59,6 +61,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
@@ -367,13 +370,30 @@ public class FlinkCatalog extends AbstractCatalog {
         }
 
         return new Schema(
-                toDataType(rowType).getFields(),
+                addColumnComments(toDataType(rowType).getFields(), getColumnComments(catalogTable)),
                 catalogTable.getPartitionKeys(),
                 primaryKeys,
                 options,
                 catalogTable.getComment());
     }
 
+    private static Map<String, String> getColumnComments(CatalogTable catalogTable) {
+        return catalogTable.getUnresolvedSchema().getColumns().stream()
+                .filter(c -> c.getComment().isPresent()) // uncommented columns are left out
+                .collect(Collectors.toMap(UnresolvedColumn::getName, c -> c.getComment().get()));
+    }
+
+    private static List<DataField> addColumnComments(
+            List<DataField> fields, Map<String, String> columnComments) {
+        return fields.stream()
+                .map(
+                        field -> {
+                            String comment = columnComments.get(field.name()); // null if absent
+                            return comment == null ? field : field.newDescription(comment);
+                        })
+                .collect(Collectors.toList());
+    }
+
     public static Identifier toIdentifier(ObjectPath path) {
         return new Identifier(path.getDatabaseName(), path.getObjectName());
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 5c1699e9c..df09ad98e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -54,7 +55,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.paimon.CoreOptions.BUCKET;
@@ -1331,6 +1334,38 @@ public class ReadWriteTableITCase extends AbstractTestBase {
                 .hasMessage("Paimon doesn't support streaming INSERT OVERWRITE.");
     }
 
+    @Test
+    public void testPhysicalColumnComments() {
+        String ddl = "CREATE TABLE T(a INT COMMENT 'comment of a', b INT);";
+        bEnv.executeSql(ddl);
+
+        List<String> result =
+                CollectionUtil.iteratorToList(bEnv.executeSql("DESC T").collect()).stream()
+                        .map(Objects::toString)
+                        .collect(Collectors.toList());
+
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        "+I[a, INT, true, null, null, null, comment of a]", // ends with comment
+                        "+I[b, INT, true, null, null, null, null]"); // b has no comment
+    }
+
+    @Test
+    public void testUnsupportedComputedColumnComments() {
+        String ddl = "CREATE TABLE T(a INT , b INT, c AS a + b COMMENT 'computed');";
+        bEnv.executeSql(ddl);
+
+        List<String> result =
+                CollectionUtil.iteratorToList(bEnv.executeSql("DESC T").collect()).stream()
+                        .map(Objects::toString)
+                        .collect(Collectors.toList());
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        "+I[a, INT, true, null, null, null]",
+                        "+I[b, INT, true, null, null, null]",
+                        "+I[c, INT, true, null, AS `a` + `b`, null]"); // 'computed' not shown
+    }
+
     // 
----------------------------------------------------------------------------------------------------------------
     // Tools
     // 
----------------------------------------------------------------------------------------------------------------

Reply via email to