This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e22d164ff71 [FLINK-29743][table] CatalogPropertiesUtil supports de/serializing column comment (#22131)
e22d164ff71 is described below

commit e22d164ff710571ff2708568a90a21dee194116a
Author: Jark Wu <[email protected]>
AuthorDate: Sat Mar 11 22:28:38 2023 +0800

    [FLINK-29743][table] CatalogPropertiesUtil supports de/serializing column comment (#22131)
---
 .../catalog/CatalogBaseTableResolutionTest.java    | 15 ++++++++++---
 .../flink/table/catalog/CatalogPropertiesUtil.java | 25 ++++++++++++++++++++--
 2 files changed, 35 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index 7b5e67b4859..07b91013d3f 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -71,10 +71,13 @@ class CatalogBaseTableResolutionTest {
             Schema.newBuilder()
                     .column("id", DataTypes.INT().notNull())
                     .column("region", DataTypes.VARCHAR(200))
+                    .withComment("This is a region column.")
                     .column("county", DataTypes.VARCHAR(200))
                     .columnByMetadata("topic", DataTypes.VARCHAR(200), true)
+                    .withComment("") // empty column comment
                     .columnByMetadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp")
                     .columnByExpression("ts", COMPUTED_SQL)
+                    .withComment("This is a computed column")
                     .watermark("ts", WATERMARK_SQL)
                     .primaryKeyNamed("primary_constraint", "id")
                     .build();
@@ -102,11 +105,14 @@ class CatalogBaseTableResolutionTest {
             new ResolvedSchema(
                     Arrays.asList(
                             Column.physical("id", DataTypes.INT().notNull()),
-                            Column.physical("region", DataTypes.VARCHAR(200)),
+                            Column.physical("region", DataTypes.VARCHAR(200))
+                                    .withComment("This is a region column."),
                             Column.physical("county", DataTypes.VARCHAR(200)),
-                            Column.metadata("topic", DataTypes.VARCHAR(200), null, true),
+                            Column.metadata("topic", DataTypes.VARCHAR(200), null, true)
+                                    .withComment(""), // empty column comment
                             Column.metadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp", false),
-                            Column.computed("ts", COMPUTED_COLUMN_RESOLVED)),
+                            Column.computed("ts", COMPUTED_COLUMN_RESOLVED)
+                                    .withComment("This is a computed column")),
                     Collections.singletonList(WatermarkSpec.of("ts", WATERMARK_RESOLVED)),
                     UniqueConstraint.primaryKey(
                             "primary_constraint", Collections.singletonList("id")));
@@ -217,10 +223,12 @@ class CatalogBaseTableResolutionTest {
         properties.put("schema.0.data-type", "INT NOT NULL");
         properties.put("schema.1.name", "region");
         properties.put("schema.1.data-type", "VARCHAR(200)");
+        properties.put("schema.1.comment", "This is a region column.");
         properties.put("schema.2.name", "county");
         properties.put("schema.2.data-type", "VARCHAR(200)");
         properties.put("schema.3.name", "topic");
         properties.put("schema.3.data-type", "VARCHAR(200)");
+        properties.put("schema.3.comment", "");
         properties.put("schema.3.metadata", "topic");
         properties.put("schema.3.virtual", "true");
         properties.put("schema.4.name", "orig_ts");
@@ -230,6 +238,7 @@ class CatalogBaseTableResolutionTest {
         properties.put("schema.5.name", "ts");
         properties.put("schema.5.data-type", "TIMESTAMP(3)");
         properties.put("schema.5.expr", "orig_ts - INTERVAL '60' MINUTE");
+        properties.put("schema.5.comment", "This is a computed column");
         properties.put("schema.watermark.0.rowtime", "ts");
         properties.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
         properties.put("schema.watermark.0.strategy.expr", "ts - INTERVAL '5' SECOND");
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
index 1ffac582c20..1878c996c25 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
@@ -220,6 +220,7 @@ public final class CatalogPropertiesUtil {
             final String exprKey = compoundKey(SCHEMA, i, EXPR);
             final String metadataKey = compoundKey(SCHEMA, i, METADATA);
             final String virtualKey = compoundKey(SCHEMA, i, VIRTUAL);
+            final String commentKey = compoundKey(SCHEMA, i, COMMENT);
 
             final String name = getValue(map, nameKey);
 
@@ -244,6 +245,12 @@ public final class CatalogPropertiesUtil {
                 final String dataType = getValue(map, dataTypeKey);
                 builder.column(name, dataType);
             }
+
+            // column comment
+            if (map.containsKey(commentKey)) {
+                final String comment = getValue(map, commentKey);
+                builder.withComment(comment);
+            }
         }
     }
 
@@ -303,15 +310,25 @@ public final class CatalogPropertiesUtil {
         final String[] expressions = serializeColumnComputations(columns);
         final String[] metadata = serializeColumnMetadataKeys(columns);
         final String[] virtual = serializeColumnVirtuality(columns);
+        final String[] comments = serializeColumnComments(columns);
 
         final List<List<String>> values = new ArrayList<>();
         for (int i = 0; i < columns.size(); i++) {
             values.add(
-                    Arrays.asList(names[i], dataTypes[i], expressions[i], metadata[i], virtual[i]));
+                    Arrays.asList(
+                            names[i],
+                            dataTypes[i],
+                            expressions[i],
+                            metadata[i],
+                            virtual[i],
+                            comments[i]));
         }
 
         putIndexedProperties(
-                map, SCHEMA, Arrays.asList(NAME, DATA_TYPE, EXPR, METADATA, VIRTUAL), values);
+                map,
+                SCHEMA,
+                Arrays.asList(NAME, DATA_TYPE, EXPR, METADATA, VIRTUAL, COMMENT),
+                values);
     }
 
     private static String serializeResolvedExpression(ResolvedExpression resolvedExpression) {
@@ -396,6 +413,10 @@ public final class CatalogPropertiesUtil {
                 .toArray(String[]::new);
     }
 
+    private static String[] serializeColumnComments(List<Column> columns) {
+        return columns.stream().map(c -> c.getComment().orElse(null)).toArray(String[]::new);
+    }
+
     /**
      * Adds an indexed sequence of properties (with sub-properties) under a common key. It supports
      * the property's value to be null, in which case it would be ignored. The sub-properties should

Reply via email to