liyubin117 commented on code in PR #21182:
URL: https://github.com/apache/flink/pull/21182#discussion_r1031327314
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java:
##########
@@ -346,6 +346,41 @@ public Schema toSchema() {
return builder.build();
}
+ /** Helps to migrate to the new {@link Schema} class, retain comments when
needed. */
+ public Schema toSchema(Map<String, String> comments) {
+ final Schema.Builder builder = Schema.newBuilder();
+
+ columns.forEach(
+ column -> {
+ if (column instanceof PhysicalColumn) {
+ final PhysicalColumn c = (PhysicalColumn) column;
+ builder.column(c.getName(), c.getType());
+ } else if (column instanceof MetadataColumn) {
+ final MetadataColumn c = (MetadataColumn) column;
+ builder.columnByMetadata(
+ c.getName(),
+ c.getType(),
+ c.getMetadataAlias().orElse(null),
+ c.isVirtual());
+ } else if (column instanceof ComputedColumn) {
+ final ComputedColumn c = (ComputedColumn) column;
+ builder.columnByExpression(c.getName(),
c.getExpression());
+ } else {
+ throw new IllegalArgumentException("Unsupported column
type: " + column);
+ }
+ builder.withComment(comments.get(column.getName()));
Review Comment:
Thanks for your kind reminds, we don't need to invoke `withComment` when
comment is null, I have fixed it.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1600,23 +1619,38 @@ private Object[][] buildTableColumns(ResolvedSchema
schema) {
"PRI(%s)",
String.join(", ",
columns))));
});
-
+ boolean nonComments = isSchemaNonColumnComments(schema);
return schema.getColumns().stream()
.map(
(c) -> {
final LogicalType logicalType =
c.getDataType().getLogicalType();
+ if (nonComments) {
+ return new Object[] {
+ c.getName(),
+ logicalType.copy(true).asSummaryString(),
+ logicalType.isNullable(),
+
fieldToPrimaryKey.getOrDefault(c.getName(), null),
+ c.explainExtras().orElse(null),
+ fieldToWatermark.getOrDefault(c.getName(),
null)
+ };
+ }
return new Object[] {
c.getName(),
logicalType.copy(true).asSummaryString(),
logicalType.isNullable(),
fieldToPrimaryKey.getOrDefault(c.getName(),
null),
c.explainExtras().orElse(null),
- fieldToWatermark.getOrDefault(c.getName(),
null)
+ fieldToWatermark.getOrDefault(c.getName(),
null),
+ c.getComment().orElse(null)
};
})
.toArray(Object[][]::new);
}
+ private boolean isSchemaNonColumnComments(ResolvedSchema schema) {
+ return schema.getColumns().stream().noneMatch(col ->
col.getComment().isPresent());
Review Comment:
done :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]