wuwenchi commented on code in PR #4625:
URL: https://github.com/apache/iceberg/pull/4625#discussion_r865682252
##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java:
##########
@@ -172,4 +217,110 @@ public static TableSchema toSchema(Schema schema) {
return builder.build();
}
+
+
+  /**
+   * Convert an Iceberg {@link Schema} to a Flink {@link org.apache.flink.table.api.Schema}.
+   *
+   * @param schema Iceberg schema to convert.
+   * @param properties table properties that may carry Flink watermark and computed column definitions.
+   * @return Flink Schema.
+   */
+  public static org.apache.flink.table.api.Schema toSchema(Schema schema, Map<String, String> properties) {
+
+    org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder();
+
+ // get watermark and computed columns
+ Map<String, String> watermarkMap = Maps.newHashMap();
+ Map<String, String> computedColumnsMap = Maps.newHashMap();
+ properties.keySet().stream()
+ .filter(k -> k.startsWith(FLINK_PREFIX) && properties.get(k) != null)
+ .forEach(k -> {
+ final String name = k.substring(k.lastIndexOf('.') + 1);
+ String expr = properties.get(k);
+ if (k.startsWith(WATERMARK_PREFIX)) {
+ watermarkMap.put(name, expr);
+ } else if (k.startsWith(COMPUTED_COLUMNS_PREFIX)) {
+ computedColumnsMap.put(name, expr);
+ }
+ });
+
+ // add physical columns.
+ for (RowType.RowField field : convert(schema).getFields()) {
+      builder.column(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
+ }
+
+ // add computed columns.
+ computedColumnsMap.forEach(builder::columnByExpression);
+
+ // add watermarks.
+ watermarkMap.forEach(builder::watermark);
+
+ // add primary key.
+ List<String> primaryKey = getPrimaryKeyFromSchema(schema);
+ if (!primaryKey.isEmpty()) {
+ builder.primaryKey(primaryKey.toArray(new String[0]));
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Convert a {@link CatalogTable} to a {@link ResolvedSchema}.
+ *
+   * @param table Flink catalog table whose unresolved schema will be resolved.
+ * @return Flink ResolvedSchema.
+ */
+ public static ResolvedSchema convertToResolvedSchema(CatalogTable table) {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+    StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
+    CatalogManager catalogManager = ((TableEnvironmentImpl) streamTableEnvironment).getCatalogManager();
+ SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
+ return table.getUnresolvedSchema().resolve(schemaResolver);
Review Comment:
This method is indeed better!
But the description is taken from the table's comment:
```
@Override
public Optional<String> getDescription() {
return Optional.of(getComment());
}
```
The comment does not necessarily exist, and even when it does, it is not
necessarily the path of the table, so `tableEnvironment.from` may not be
able to resolve the table...
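
To illustrate the concern, here is a minimal sketch of the failure mode;
the table name `demo`, the `datagen` connector, and the comment text are
all made up for this example and are not part of the PR:
```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

public class DescriptionIsNotAPath {
  public static void main(String[] args) throws Exception {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // The COMMENT clause is free-form text, not an object path.
    tEnv.executeSql(
        "CREATE TABLE demo (id INT) COMMENT 'just a human readable note' "
            + "WITH ('connector' = 'datagen')");

    CatalogBaseTable table = tEnv.getCatalog(tEnv.getCurrentCatalog()).get()
        .getTable(new ObjectPath(tEnv.getCurrentDatabase(), "demo"));

    // getDescription() falls back to getComment(), so this prints the comment text.
    System.out.println(table.getDescription());

    // Expected to fail: 'just a human readable note' is not a valid table path.
    tEnv.from(table.getDescription().orElse(""));
  }
}
```
Resolving the schema through the catalog manager's `SchemaResolver`, as this
PR does, avoids relying on the description entirely.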
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]