This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.4 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 4365bcfb7b6a7102851f59d10c04394aaaeee0bc Author: yuzelin <[email protected]> AuthorDate: Sun Apr 23 13:58:59 2023 +0800 [flink] Support showing physical column comments for DESC statement (#964) --- .../org/apache/paimon/flink/DataCatalogTable.java | 65 ++++++++++++++++++++++ .../java/org/apache/paimon/flink/FlinkCatalog.java | 22 +++++++- .../apache/paimon/flink/ReadWriteTableITCase.java | 35 ++++++++++++ 3 files changed, 121 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java index 1dd62cb2c..37d1f9fa3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java @@ -20,8 +20,12 @@ package org.apache.paimon.flink; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; @@ -30,6 +34,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** A {@link CatalogTableImpl} to wrap {@link FileStoreTable}. */ public class DataCatalogTable extends CatalogTableImpl { @@ -50,6 +55,66 @@ public class DataCatalogTable extends CatalogTableImpl { return table; } + @Override + public Schema getUnresolvedSchema() { + // add physical column comments + Map<String, String> columnComments = + table.rowType().getFields().stream() + .filter(dataField -> dataField.description() != null) + .collect(Collectors.toMap(DataField::name, DataField::description)); + + return toSchema(getSchema(), columnComments); + } + + /** Copied from {@link TableSchema#toSchema(Map)} to support versions lower than 1.17. */ + private Schema toSchema(TableSchema tableSchema, Map<String, String> comments) { + final Schema.Builder builder = Schema.newBuilder(); + + tableSchema + .getTableColumns() + .forEach( + column -> { + if (column instanceof TableColumn.PhysicalColumn) { + final TableColumn.PhysicalColumn c = + (TableColumn.PhysicalColumn) column; + builder.column(c.getName(), c.getType()); + } else if (column instanceof TableColumn.MetadataColumn) { + final TableColumn.MetadataColumn c = + (TableColumn.MetadataColumn) column; + builder.columnByMetadata( + c.getName(), + c.getType(), + c.getMetadataAlias().orElse(null), + c.isVirtual()); + } else if (column instanceof TableColumn.ComputedColumn) { + final TableColumn.ComputedColumn c = + (TableColumn.ComputedColumn) column; + builder.columnByExpression(c.getName(), c.getExpression()); + } else { + throw new IllegalArgumentException( + "Unsupported column type: " + column); + } + String colName = column.getName(); + if (comments.containsKey(colName)) { + builder.withComment(comments.get(colName)); + } + }); + + tableSchema + .getWatermarkSpecs() + .forEach( + spec -> + builder.watermark( + spec.getRowtimeAttribute(), spec.getWatermarkExpr())); + + if (tableSchema.getPrimaryKey().isPresent()) { + UniqueConstraint primaryKey = tableSchema.getPrimaryKey().get(); + builder.primaryKeyNamed(primaryKey.getName(), primaryKey.getColumns()); + } + + return builder.build(); + } + @Override public CatalogBaseTable copy() { return new DataCatalogTable( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 988b307b2..800f8b235 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -25,7 +25,9 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; +import org.apache.flink.table.api.Schema.UnresolvedColumn; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -59,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import static org.apache.flink.table.descriptors.Schema.SCHEMA; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; @@ -367,13 +370,30 @@ public class FlinkCatalog extends AbstractCatalog { } return new Schema( - toDataType(rowType).getFields(), + addColumnComments(toDataType(rowType).getFields(), getColumnComments(catalogTable)), catalogTable.getPartitionKeys(), primaryKeys, options, catalogTable.getComment()); } + private static Map<String, String> getColumnComments(CatalogTable catalogTable) { + return catalogTable.getUnresolvedSchema().getColumns().stream() + .filter(c -> c.getComment().isPresent()) + .collect(Collectors.toMap(UnresolvedColumn::getName, c -> c.getComment().get())); + } + + private static List<DataField> addColumnComments( + List<DataField> fields, Map<String, String> columnComments) { + return fields.stream() + .map( + field -> { + String comment = columnComments.get(field.name()); + return comment == null ? field : field.newDescription(comment); + }) + .collect(Collectors.toList()); + } + public static Identifier toIdentifier(ObjectPath path) { return new Identifier(path.getDatabaseName(), path.getObjectName()); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java index 5c1699e9c..df09ad98e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java @@ -45,6 +45,7 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -54,7 +55,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; +import java.util.stream.Collectors; import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; import static org.apache.paimon.CoreOptions.BUCKET; @@ -1331,6 +1334,38 @@ public class ReadWriteTableITCase extends AbstractTestBase { .hasMessage("Paimon doesn't support streaming INSERT OVERWRITE."); } + @Test + public void testPhysicalColumnComments() { + String ddl = "CREATE TABLE T(a INT COMMENT 'comment of a', b INT);"; + bEnv.executeSql(ddl); + + List<String> result = + CollectionUtil.iteratorToList(bEnv.executeSql("DESC T").collect()).stream() + .map(Objects::toString) + .collect(Collectors.toList()); + + assertThat(result) + .containsExactlyInAnyOrder( + "+I[a, INT, true, null, null, null, comment of a]", + "+I[b, INT, true, null, null, null, null]"); + } + + @Test + public void testUnsupportedComputedColumnComments() { + String ddl = "CREATE TABLE T(a INT , b INT, c AS a + b COMMENT 'computed');"; + bEnv.executeSql(ddl); + + List<String> result = + CollectionUtil.iteratorToList(bEnv.executeSql("DESC T").collect()).stream() + .map(Objects::toString) + .collect(Collectors.toList()); + assertThat(result) + .containsExactlyInAnyOrder( + "+I[a, INT, true, null, null, null]", + "+I[b, INT, true, null, null, null]", + "+I[c, INT, true, null, AS `a` + `b`, null]"); + } + // ---------------------------------------------------------------------------------------------------------------- // Tools // ----------------------------------------------------------------------------------------------------------------
