openinx commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r631515294
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -61,7 +65,22 @@ public static Schema convert(TableSchema schema) {
RowType root = (RowType) schemaType;
Type converted = root.accept(new FlinkTypeToType(root));
- return new Schema(converted.asStructType().fields());
+ Schema iSchema = new Schema(converted.asStructType().fields());
+ return freshIdentifierFieldIds(iSchema, schema);
+ }
+
+ private static Schema freshIdentifierFieldIds(Schema iSchema, TableSchema schema) {
Review comment:
I don't think we can reuse
`TypeUtil.refreshIdentifierFields(Types.StructType freshSchema, Schema
baseSchema)` here, because that method expects `baseSchema` to already carry
an identifier field id list. In this case the identifier fields actually come
from Flink's `TableSchema`: converting a Flink `TableSchema` with primary keys
into an Iceberg `Schema` with an identifier field id list is exactly what this
method is trying to accomplish.
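
To make the mapping concrete, here is a minimal, dependency-free sketch of the
idea, not the code in this PR. The class `IdentifierFieldIdSketch`, the helper
`identifierFieldIds`, and the plain `Map`/`List` inputs are hypothetical
stand-ins: the map plays the role of the field ids assigned to the converted
Iceberg columns, and the list plays the role of the primary key column names
taken from Flink's `TableSchema`.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class IdentifierFieldIdSketch {

  // Hypothetical helper: resolve each primary key column name to the field id
  // that the type conversion assigned to the column with the same name.
  static Set<Integer> identifierFieldIds(
      Map<String, Integer> fieldIdsByName, List<String> primaryKeyColumns) {
    Set<Integer> ids = new LinkedHashSet<>();
    for (String column : primaryKeyColumns) {
      Integer id = fieldIdsByName.get(column);
      if (id == null) {
        // A primary key column that is missing from the converted schema
        // cannot be turned into an identifier field.
        throw new IllegalArgumentException(
            "Primary key column not found in converted schema: " + column);
      }
      ids.add(id);
    }
    return ids;
  }

  public static void main(String[] args) {
    // Assumed example data: three converted columns with their field ids.
    Map<String, Integer> fieldIds = new LinkedHashMap<>();
    fieldIds.put("id", 1);
    fieldIds.put("data", 2);
    fieldIds.put("region", 3);

    // Assumed primary key declared on the Flink side.
    List<String> primaryKey = List.of("id", "region");

    System.out.println(identifierFieldIds(fieldIds, primaryKey));
  }
}
```

The point of the sketch is that the only inputs are column names from the
Flink side and freshly assigned ids from the conversion; there is no
pre-existing identifier field id list to refresh, which is why
`refreshIdentifierFields` does not fit.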