twalthr commented on a change in pull request #15096:
URL: https://github.com/apache/flink/pull/15096#discussion_r590197050



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
##########
@@ -298,6 +303,40 @@ public DataType toPersistedRowDataType() {
         return Optional.ofNullable(primaryKey);
     }
 
+    /** Helps to migrate to the new {@link Schema} class. */
+    public Schema toSchema() {
+        final Schema.Builder builder = Schema.newBuilder();
+
+        columns.forEach(
+                column -> {
+                    if (column instanceof PhysicalColumn) {
+                        final PhysicalColumn c = (PhysicalColumn) column;
+                        builder.column(c.getName(), c.getType());
+                    } else if (column instanceof MetadataColumn) {
+                        final MetadataColumn c = (MetadataColumn) column;
+                        builder.columnByMetadata(
+                                c.getName(),
+                                c.getType(),
+                                c.getMetadataAlias().orElse(null),
+                                c.isVirtual());
+                    } else if (column instanceof ComputedColumn) {
+                        final ComputedColumn c = (ComputedColumn) column;
+                        builder.columnByExpression(c.getName(), 
c.getExpression());
+                    } else {
+                        throw new IllegalArgumentException("Unsupported column 
type: " + column);
+                    }
+                });
+
+        watermarkSpecs.forEach(
+                spec -> builder.watermark(spec.getRowtimeAttribute(), 
spec.getWatermarkExpr()));
+
+        if (primaryKey != null) {
+            builder.primaryKeyNamed(primaryKey.getName(), 
primaryKey.getColumns());

Review comment:
       We will most likely never support enforced primary keys. However, to be 
SQL compliant we added the `NOT ENFORCED` keywords. The API doesn't offer this 
because we don't support enforcing anyways. It is true that there is some 
inconsistency between Table API and SQL. But otherwise the method name would be 
`primaryKeyNotEnforcedNamed()`. So in the future we can overload this method 
with a boolean flag if necessary.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to