Hi folks,

Imagine a data processing job that reads from a data source and writes to Iceberg tables. The catch is that the table name, schema / spec, branch, etc. are only available at runtime, in the data itself. Many users have worked around this by writing the data to a temporary destination and then spawning separate jobs with the correct Iceberg writer configuration, which results in considerable operational overhead.
Enter the Flink Dynamic Iceberg Sink [1][2].

The Flink Dynamic Iceberg Sink can write to any number of tables, create new tables, and evolve the table schema and partition spec. Schema and partition spec are constructed dynamically and provided as input to the sink.

Let's focus on schema evolution (partition spec evolution works similarly): We compare the input schema to the table schema to assess their compatibility [3]. Fields are compared using their fully-qualified names. This results in one of the following:

a. SAME: The schemas are semantically identical; the table schema can be used as-is for writing.

b. DATA_ADAPTION_NEEDED: The schemas are not semantically identical, but it is sufficient to adjust the data before writing it with the table schema, e.g. widen a type or insert a null value for an optional field [5].

c. SCHEMA_UPDATE_NEEDED: The table schema needs to be updated to reflect the changes in the input schema.

For (c), we derive a set of operations to evolve the table schema until it is compatible with the input schema, which are then executed via Iceberg's UpdateSchema API [4] (see the sketch after the references for an illustration). Schema updates are limited to Iceberg-compatible schema changes. The current implementation does not allow deletes or renames, but it supports adding new fields and widening the types of existing fields. The reason is that we want to prevent incompatible changes, e.g. dropping a column that might later be re-added, which would result in an entirely different column. Renames are not possible because fields are matched by name, but they could theoretically be supported by other means.

I wonder: would it make sense to make schema / partition spec evolution available to other execution engines as well?

Cheers,
Max

[1] https://docs.google.com/document/d/1R3NZmi65S4lwnmNjH4gLCuXZbgvZV5GNrQKJ5NYdO9s/edit
[2] https://github.com/apache/iceberg/pull/12424
[3] https://github.com/apache/iceberg/blob/372a9eac6e22d7de550e4b71ddcad4dc04de1e3f/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
[4] https://github.com/apache/iceberg/blob/372a9eac6e22d7de550e4b71ddcad4dc04de1e3f/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
[5] https://github.com/mxm/iceberg/blob/d7e65c833e19169388d4e9173d3cb4f1971b6519/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java#L43
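
To make the UpdateSchema part concrete, here is a minimal sketch of the kind of calls the derived operations translate to. The catalog, table, and column names are hypothetical, and this is plain Iceberg API usage rather than the actual EvolveSchemaVisitor code; in the sink, these operations are derived automatically from the input schema.

  import org.apache.iceberg.Table;
  import org.apache.iceberg.catalog.Catalog;
  import org.apache.iceberg.catalog.TableIdentifier;
  import org.apache.iceberg.types.Types;

  // Suppose the input schema gained a new optional "country" field and the
  // "amount" field widened from float to double. The evolution step then
  // boils down to an UpdateSchema transaction along these lines:
  void evolveSchema(Catalog catalog) {
    Table table = catalog.loadTable(TableIdentifier.of("db", "events"));
    table.updateSchema()
        .addColumn("country", Types.StringType.get())    // added columns are optional by default
        .updateColumn("amount", Types.DoubleType.get())  // float -> double is an allowed widening
        .commit();                                       // atomically commits the new schema
  }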