Airblader commented on a change in pull request #15096:
URL: https://github.com/apache/flink/pull/15096#discussion_r590129102
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
##########
@@ -298,6 +303,40 @@ public DataType toPersistedRowDataType() {
return Optional.ofNullable(primaryKey);
}
+ /** Helps to migrate to the new {@link Schema} class. */
+ public Schema toSchema() {
+ final Schema.Builder builder = Schema.newBuilder();
+
+ columns.forEach(
+ column -> {
+ if (column instanceof PhysicalColumn) {
+ final PhysicalColumn c = (PhysicalColumn) column;
+ builder.column(c.getName(), c.getType());
+ } else if (column instanceof MetadataColumn) {
+ final MetadataColumn c = (MetadataColumn) column;
+ builder.columnByMetadata(
+ c.getName(),
+ c.getType(),
+ c.getMetadataAlias().orElse(null),
+ c.isVirtual());
+ } else if (column instanceof ComputedColumn) {
+ final ComputedColumn c = (ComputedColumn) column;
+ builder.columnByExpression(c.getName(),
c.getExpression());
+ } else {
+ throw new IllegalArgumentException("Unsupported column
type: " + column);
+ }
+ });
+
+ watermarkSpecs.forEach(
+ spec -> builder.watermark(spec.getRowtimeAttribute(),
spec.getWatermarkExpr()));
+
+ if (primaryKey != null) {
+ builder.primaryKeyNamed(primaryKey.getName(),
primaryKey.getColumns());
Review comment:
This is a bit of a side note, but parts of Flink have the enforced flag
for constraints already, but other parts don't allow setting it, and we don't
handle it here either. It might be worth thinking about consistently (not)
having this?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]