This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e22d164ff71 [FLINK-29743][table] CatalogPropertiesUtil supports
de/serializing column comment (#22131)
e22d164ff71 is described below
commit e22d164ff710571ff2708568a90a21dee194116a
Author: Jark Wu <[email protected]>
AuthorDate: Sat Mar 11 22:28:38 2023 +0800
[FLINK-29743][table] CatalogPropertiesUtil supports de/serializing column
comment (#22131)
---
.../catalog/CatalogBaseTableResolutionTest.java | 15 ++++++++++---
.../flink/table/catalog/CatalogPropertiesUtil.java | 25 ++++++++++++++++++++--
2 files changed, 35 insertions(+), 5 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index 7b5e67b4859..07b91013d3f 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -71,10 +71,13 @@ class CatalogBaseTableResolutionTest {
Schema.newBuilder()
.column("id", DataTypes.INT().notNull())
.column("region", DataTypes.VARCHAR(200))
+ .withComment("This is a region column.")
.column("county", DataTypes.VARCHAR(200))
.columnByMetadata("topic", DataTypes.VARCHAR(200), true)
+ .withComment("") // empty column comment
.columnByMetadata("orig_ts", DataTypes.TIMESTAMP(3),
"timestamp")
.columnByExpression("ts", COMPUTED_SQL)
+ .withComment("This is a computed column")
.watermark("ts", WATERMARK_SQL)
.primaryKeyNamed("primary_constraint", "id")
.build();
@@ -102,11 +105,14 @@ class CatalogBaseTableResolutionTest {
new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.INT().notNull()),
- Column.physical("region", DataTypes.VARCHAR(200)),
+ Column.physical("region", DataTypes.VARCHAR(200))
+ .withComment("This is a region column."),
Column.physical("county", DataTypes.VARCHAR(200)),
- Column.metadata("topic", DataTypes.VARCHAR(200),
null, true),
+ Column.metadata("topic", DataTypes.VARCHAR(200),
null, true)
+ .withComment(""), // empty column comment
Column.metadata("orig_ts", DataTypes.TIMESTAMP(3),
"timestamp", false),
- Column.computed("ts", COMPUTED_COLUMN_RESOLVED)),
+ Column.computed("ts", COMPUTED_COLUMN_RESOLVED)
+ .withComment("This is a computed column")),
Collections.singletonList(WatermarkSpec.of("ts",
WATERMARK_RESOLVED)),
UniqueConstraint.primaryKey(
"primary_constraint",
Collections.singletonList("id")));
@@ -217,10 +223,12 @@ class CatalogBaseTableResolutionTest {
properties.put("schema.0.data-type", "INT NOT NULL");
properties.put("schema.1.name", "region");
properties.put("schema.1.data-type", "VARCHAR(200)");
+ properties.put("schema.1.comment", "This is a region column.");
properties.put("schema.2.name", "county");
properties.put("schema.2.data-type", "VARCHAR(200)");
properties.put("schema.3.name", "topic");
properties.put("schema.3.data-type", "VARCHAR(200)");
+ properties.put("schema.3.comment", "");
properties.put("schema.3.metadata", "topic");
properties.put("schema.3.virtual", "true");
properties.put("schema.4.name", "orig_ts");
@@ -230,6 +238,7 @@ class CatalogBaseTableResolutionTest {
properties.put("schema.5.name", "ts");
properties.put("schema.5.data-type", "TIMESTAMP(3)");
properties.put("schema.5.expr", "orig_ts - INTERVAL '60' MINUTE");
+ properties.put("schema.5.comment", "This is a computed column");
properties.put("schema.watermark.0.rowtime", "ts");
properties.put("schema.watermark.0.strategy.data-type",
"TIMESTAMP(3)");
properties.put("schema.watermark.0.strategy.expr", "ts - INTERVAL '5'
SECOND");
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
index 1ffac582c20..1878c996c25 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
@@ -220,6 +220,7 @@ public final class CatalogPropertiesUtil {
final String exprKey = compoundKey(SCHEMA, i, EXPR);
final String metadataKey = compoundKey(SCHEMA, i, METADATA);
final String virtualKey = compoundKey(SCHEMA, i, VIRTUAL);
+ final String commentKey = compoundKey(SCHEMA, i, COMMENT);
final String name = getValue(map, nameKey);
@@ -244,6 +245,12 @@ public final class CatalogPropertiesUtil {
final String dataType = getValue(map, dataTypeKey);
builder.column(name, dataType);
}
+
+ // column comment
+ if (map.containsKey(commentKey)) {
+ final String comment = getValue(map, commentKey);
+ builder.withComment(comment);
+ }
}
}
@@ -303,15 +310,25 @@ public final class CatalogPropertiesUtil {
final String[] expressions = serializeColumnComputations(columns);
final String[] metadata = serializeColumnMetadataKeys(columns);
final String[] virtual = serializeColumnVirtuality(columns);
+ final String[] comments = serializeColumnComments(columns);
final List<List<String>> values = new ArrayList<>();
for (int i = 0; i < columns.size(); i++) {
values.add(
- Arrays.asList(names[i], dataTypes[i], expressions[i],
metadata[i], virtual[i]));
+ Arrays.asList(
+ names[i],
+ dataTypes[i],
+ expressions[i],
+ metadata[i],
+ virtual[i],
+ comments[i]));
}
putIndexedProperties(
- map, SCHEMA, Arrays.asList(NAME, DATA_TYPE, EXPR, METADATA,
VIRTUAL), values);
+ map,
+ SCHEMA,
+ Arrays.asList(NAME, DATA_TYPE, EXPR, METADATA, VIRTUAL,
COMMENT),
+ values);
}
private static String serializeResolvedExpression(ResolvedExpression
resolvedExpression) {
@@ -396,6 +413,10 @@ public final class CatalogPropertiesUtil {
.toArray(String[]::new);
}
+ private static String[] serializeColumnComments(List<Column> columns) {
+ return columns.stream().map(c ->
c.getComment().orElse(null)).toArray(String[]::new);
+ }
+
/**
* Adds an indexed sequence of properties (with sub-properties) under a
common key. It supports
* the property's value to be null, in which case it would be ignored. The
sub-properties should