Hi folks,

Imagine a data processing job that reads from a data source and
outputs to Iceberg tables. The catch is that the table name, schema /
spec, branch, etc. are only available at runtime, in the data itself.
Many users have worked around this by writing the data to a temporary
destination and then spawning jobs with the correct Iceberg writer
configuration. This adds considerable operational overhead.

Enter the Flink Dynamic Iceberg Sink [1] [2]

The Flink Dynamic Iceberg Sink can write to any number of tables,
create new tables, and evolve the table schema and partition spec.
Schema and partition spec are constructed dynamically and provided as
input to the sink.
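To make that concrete, here is a rough sketch of what a single input
element might carry. The names and shape are my own illustration, not
the actual record API from [2]:

```java
import java.util.List;

public class DynamicRecordSketch {
    // Hypothetical stand-in for the sink's per-record input: the routing
    // metadata (table, branch) and the schema travel with the data itself,
    // instead of being fixed in the sink configuration.
    record DynamicRecord(
            String tableName,     // target table, only known at runtime
            String branch,        // target branch, e.g. "main"
            List<String> schema,  // simplified stand-in for an Iceberg Schema
            Object payload) {}    // the row to write

    public static void main(String[] args) {
        DynamicRecord record = new DynamicRecord(
                "db.events", "main", List.of("id:long", "name:string"), "row-1");
        System.out.println("Routing to " + record.tableName() + "@" + record.branch());
    }
}
```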

Let's focus on schema evolution (partition spec evolution works
similarly): we compare the input schema to the table schema to assess
their compatibility [3]. Fields are compared by their fully-qualified
names. This yields one of the following results:

a. SAME: The schemas are semantically identical, the table schema can
be used as-is for writing.
b. DATA_ADAPTION_NEEDED: The schemas are not semantically identical,
but it is sufficient to adjust the data before writing it with the
table schema (e.g. widen a type or add a null value for an optional
field) [5].
c. SCHEMA_UPDATE_NEEDED: The table schema needs to be updated to
reflect the changes from the input schema.
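The decision can be sketched in a few lines of self-contained Java.
This is deliberately simplified to flat name-to-type maps with a single
modeled type promotion (int to long); the real comparison in [3] walks
nested structures:

```java
import java.util.Map;

public class SchemaCompareSketch {
    enum Result { SAME, DATA_ADAPTION_NEEDED, SCHEMA_UPDATE_NEEDED }

    // Simplified model: a schema is a flat map of fully-qualified
    // field name -> type name. "int" -> "long" is the only widening
    // modeled here.
    static Result compare(Map<String, String> input, Map<String, String> table) {
        Result result = Result.SAME;
        for (Map.Entry<String, String> field : input.entrySet()) {
            String tableType = table.get(field.getKey());
            if (tableType == null) {
                // Input has a field the table lacks -> table schema must evolve.
                return Result.SCHEMA_UPDATE_NEEDED;
            }
            if (!tableType.equals(field.getValue())) {
                if (field.getValue().equals("int") && tableType.equals("long")) {
                    // Input type is narrower than the table's:
                    // widen the data, keep the table schema.
                    result = Result.DATA_ADAPTION_NEEDED;
                } else {
                    return Result.SCHEMA_UPDATE_NEEDED;
                }
            }
        }
        if (input.size() < table.size()) {
            // Fields missing from the input can be filled with nulls
            // before writing, assuming they are optional.
            result = Result.DATA_ADAPTION_NEEDED;
        }
        return result;
    }
}
```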

For (c), we derive a set of operations to evolve the table schema
until it is compatible with the input schema; these are then executed
via Iceberg's UpdateSchema API [4]. Schema updates are limited to
Iceberg-compatible schema changes. The current implementation does not
allow deletes or renames, but supports adding new fields and widening
the types of existing fields. The reason is that we want to prevent
incompatible changes, e.g. dropping a column that might be re-added
later, which would result in an entirely different column. Renames are
not possible because fields are matched by name, but they could
theoretically be supported by other means.
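As a self-contained sketch of that derivation, using the same
simplified flat-map schemas as above: only additive operations are
produced (adds and the single modeled int-to-long widening), and
fields present only in the table are left untouched. In the real
implementation [4], these would correspond to calls on Iceberg's
UpdateSchema, such as addColumn and updateColumn:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class EvolveSchemaSketch {
    // Derive the operations needed to make the table schema compatible
    // with the input schema. No deletes, no renames -- fields match by
    // name, so a field missing from the input is simply skipped.
    static List<String> deriveOperations(Map<String, String> input, Map<String, String> table) {
        List<String> ops = new ArrayList<>();
        for (Map.Entry<String, String> field : input.entrySet()) {
            String tableType = table.get(field.getKey());
            if (tableType == null) {
                // New field in the input -> add it to the table schema.
                ops.add("addColumn(" + field.getKey() + ", " + field.getValue() + ")");
            } else if (field.getValue().equals("long") && tableType.equals("int")) {
                // Input type is wider than the table's -> widen the column.
                // (Iceberg also allows other promotions, e.g. float -> double.)
                ops.add("updateColumn(" + field.getKey() + ", long)");
            }
        }
        return ops;
    }
}
```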

I wonder: would it make sense to make schema / partition spec
evolution available to other execution engines as well?

Cheers,
Max

[1] 
https://docs.google.com/document/d/1R3NZmi65S4lwnmNjH4gLCuXZbgvZV5GNrQKJ5NYdO9s/edit
[2] https://github.com/apache/iceberg/pull/12424
[3] 
https://github.com/apache/iceberg/blob/372a9eac6e22d7de550e4b71ddcad4dc04de1e3f/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
[4] 
https://github.com/apache/iceberg/blob/372a9eac6e22d7de550e4b71ddcad4dc04de1e3f/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
[5] 
https://github.com/mxm/iceberg/blob/d7e65c833e19169388d4e9173d3cb4f1971b6519/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/RowDataEvolver.java#L43
