pvary commented on code in PR #14728:
URL: https://github.com/apache/iceberg/pull/14728#discussion_r2581596634
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java:
##########
@@ -97,6 +101,11 @@ public Result struct(Types.StructType struct, Integer tableSchemaId, List<Result
}
if (struct.fields().size() != tableSchemaType.asStructType().fields().size()) {
+ if (dropUnusedColumns
+ && struct.fields().size() < tableSchemaType.asStructType().fields().size()) {
+ // We need to drop fields
+ return Result.SCHEMA_UPDATE_NEEDED;
Review Comment:
Why do we need the `SCHEMA_UPDATE_NEEDED` for `dropUnusedColumns`?
IIUC, the use case is that we previously had a schema containing all of the
columns, but then received a new schema without one of them. My understanding
was that in this case we add a new file with the old schema.
So as an example:
- Table has S1 schema
- R1 arrives with S1 schema - R1 is written to F1
- R2 arrives with S2 schema (which adds a new column) - R2 is written to F2
- R3 arrives with S1 schema - R3 is written to F1
- R4 arrives with S2 schema - R4 is written to F2
Why do we want to change the row?
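A minimal sketch of the per-schema routing from the example above, assuming writers can be keyed by the id of the schema a record arrived with. `SchemaRouting`, `IncomingRecord` and `route` are hypothetical names, not Iceberg or Flink APIs, and each list stands in for one open data file. Under this model no table schema change is needed when an older, narrower schema shows up again.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not an Iceberg API): route each incoming record to a
 * "file" keyed by the schema it arrived with, instead of evolving the table schema.
 */
public class SchemaRouting {

  /** Hypothetical record type: a record name tagged with its schema id. */
  record IncomingRecord(String name, int schemaId) {}

  /** Groups record names by schema id; each group stands in for one data file. */
  static Map<Integer, List<String>> route(List<IncomingRecord> records) {
    Map<Integer, List<String>> filesBySchema = new LinkedHashMap<>();
    for (IncomingRecord r : records) {
      // Reuse the open "file" for this schema, or open a new one the first time it is seen.
      filesBySchema.computeIfAbsent(r.schemaId(), id -> new ArrayList<>()).add(r.name());
    }
    return filesBySchema;
  }

  public static void main(String[] args) {
    // R1..R4 with schemas S1 (id 1) and S2 (id 2), as in the review example.
    List<IncomingRecord> input = List.of(
        new IncomingRecord("R1", 1),
        new IncomingRecord("R2", 2),
        new IncomingRecord("R3", 1),
        new IncomingRecord("R4", 2));
    // S1 records land together (F1), S2 records together (F2).
    System.out.println(route(input)); // {1=[R1, R3], 2=[R2, R4]}
  }
}
```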