Hi Team,
There is ongoing work to bring Flink Table Maintenance to Iceberg [1]. We
already merged the main infrastructure and are currently working on
implementing the data file rewrite [2]. During the implementation we found
that part of the compaction planning implemented for the Spark compaction
could, and should, be reused in Flink as well. I created a PR [3] to bring
those changes to core Iceberg.
The main changes in the API:
- We need to separate the compaction planning from the rewrite execution
(see the sketch after this list)
   - The planning would collect the files to be compacted and organize
them into compaction tasks/groups. This could be reused (in the same way
as query planning)
   - The rewrite would actually execute the rewrite. This needs to
contain engine specific code, so we need a separate implementation for
each engine
- We need to decide on the new compaction planning API
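To make the separation concrete, here is a rough sketch of the split I
have in mind. The names (CompactionPlanner, RewritePlan, RewriteGroup,
RewriteGroupExecutor) are only illustrative, not a proposal for the final
API:

    import java.util.Set;

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;

    // Engine-agnostic: collects the files to be compacted and organizes
    // them into groups, in the same way as query planning
    interface CompactionPlanner {
      RewritePlan plan(Table table);
    }

    // Engine-specific: rewrites the files of a single group; implemented
    // separately for Spark, Flink, etc.
    interface RewriteGroupExecutor {
      Set<DataFile> rewrite(RewriteGroup group);
    }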
The planning currently generates data at two levels (see the sketch after
this list):
1. Plan level
- Statistics about the plan:
- Total group count
- Group count in a partition
- Target file size
   - Output specification ID - only relevant for data rewrite plans
2. Group level
- General group info
- Global index
- Partition index
- Partition value
- List of tasks to read the data
- Split size - reader input split size when rewriting (Spark
specific)
   - Number of expected output files - used to calculate the number of
shuffle partitions (Spark specific)
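In code, the two levels could look something like the following (class and
field names are illustrative only; FileScanTask and StructLike are the
existing Iceberg classes):

    import java.util.List;
    import java.util.Map;

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.StructLike;

    // Plan level - data that applies to the whole plan
    class RewritePlan {
      int totalGroupCount;                         // total group count
      Map<StructLike, Integer> groupsPerPartition; // group count in a partition
      long targetFileSizeBytes;                    // target file size
      int outputSpecId;                            // only for data rewrite plans
      List<RewriteGroup> groups;
    }

    // Group level - data describing a single compaction task/group
    class RewriteGroup {
      int globalIndex;            // global index
      int partitionIndex;         // index within the partition
      StructLike partitionValue;  // partition value
      List<FileScanTask> tasks;   // tasks to read the data
      long splitSize;             // Spark specific reader split size
      int expectedOutputFiles;    // Spark specific, drives the shuffle
                                  // partition count
    }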
I see the following decision points:
- Data organization:
   1. The plan is the 'result' - everything below it is organized based
on the multiplicity of the data. If a value applies to every group, then
it belongs to the 'global' plan variables; if a value differs for every
group, then it belongs to the group (this is the current code)
   2. The group should contain all the information required for a single
job. The job (executor) then receives only a single group, and everything
else stays global. The drawback is that some information is duplicated,
but it is cleaner on the executor side.
- Parameter handling:
   1. Use string maps, like we do with the FileRewriter.options - this
allows for a more generic API which will be more stable (see the snippet
after the decision points)
   2. Use typed, named parameters - when the API changes, users might
have breaking code, but they can easily spot the changes
- Engine specific parameter handling:
1. We generate a common set of parameters
2. Engines get the whole compaction configuration, and can have their
own parameter generators
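To illustrate the parameter handling options, here is how reading the
split size could look in the two cases (the 'split-size' key, the 128 MB
default and the splitSize() accessor on the RewriteGroup sketched above
are made up for this example):

    import java.util.Map;

    import org.apache.iceberg.util.PropertyUtil;

    class ParameterExamples {
      // Option 1: generic string map, similar to FileRewriter.options;
      // renamed or removed options fail silently at runtime
      static long splitSize(Map<String, String> options) {
        return PropertyUtil.propertyAsLong(options, "split-size", 128L * 1024 * 1024);
      }

      // Option 2: typed, named parameter; a breaking API change shows up
      // as a compile error
      static long splitSize(RewriteGroup group) {
        return group.splitSize();
      }
    }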
Currently I am leaning towards:
- Data organization - 2 - the group should contain all required information
- Parameter handling - 2 - specific types and named parameters
- Engine specific parameters - 1 - create a common set of parameters
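Put together, with these choices the executor side could look roughly like
this (again only a sketch; the accessors on RewriteGroup are the made-up
ones from above, and the engine specific logic is elided):

    import java.util.Collections;
    import java.util.List;
    import java.util.Set;

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.FileScanTask;

    class GroupExecutorSketch {
      // The executor receives a single, self-contained group
      // (data organization option 2)
      Set<DataFile> rewrite(RewriteGroup group) {
        // Typed, named parameters (parameter handling option 2); common
        // values like the target file size are duplicated into every group
        // from the common parameter set (engine specific parameters option 1)
        long targetFileSize = group.targetFileSizeBytes();
        long splitSize = group.splitSize();
        List<FileScanTask> tasks = group.tasks();
        // ... engine specific rewrite of 'tasks' into new data files goes
        // here, using 'targetFileSize' and 'splitSize' ...
        return Collections.emptySet(); // placeholder for the rewritten files
      }
    }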
Your thoughts?
Thanks,
Peter
[1] - https://github.com/apache/iceberg/issues/10264
[2] - https://github.com/apache/iceberg/pull/11497
[3] - https://github.com/apache/iceberg/pull/11513