aokolnychyi edited a comment on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1081273709


   @flyrain @RussellSpitzer and I chatted a little bit about the design and 
what a potential implementation can look like. There is a promising idea of 
using the `_deleted` metadata column for building delete records.
   
   I'll summarize my current way of thinking.
   
   **Algorithm (per snapshot)**
   
   * Build insert records.
       * Read all data files added in this snapshot and apply any matching 
position delete files added in the same snapshot. Equality deletes added in the 
same snapshot cannot apply (according to the spec).
   * Build delete records.
       * Read all data files marked as deleted in this snapshot. All such 
records are considered V1 deletes.
       * Build a predicate to use when looking for data records removed by 
position deletes.
           * Scan through metadata for added position delete files and identify 
affected partitions (no need to open delete files).
           * Build a predicate using min/max values on file path (no need to 
open delete files).
           * [future] If the total number of position delete records is 
reasonably small, read all of the locations and form an IN predicate on file 
path. 
The read can be either distributed or local.
       * Build a predicate to use when looking for data records removed by 
equality deletes.
           * Scan through metadata for added equality delete files and identify 
affected partitions (no need to open delete files).
           * Build a predicate using min/max values on identity columns or sort 
keys if such are present in equality delete files.
           * [future] If the total number of equality deletes is small, read 
all deleted values and form IN predicates on their values. To make this 
efficient, we may need to add support for IN predicates on multiple columns at 
the same time. That will require effort given the current expression API in 
Iceberg.
       * Combine the two predicates using OR. This will be the predicate to 
look for V2 deletes.
       * Read data files that have potential V2 deletes, project the `_deleted` 
column and keep only deleted records.
       * Union V1 and V2 delete records. This gives us all deletes that 
happened in this snapshot.
   * Append metadata such as `_commit_snapshot_id` or `_record_type` to both 
insert and delete records.
   * Union insert and delete records into a single DataFrame that represents a 
CDC log for this snapshot.
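
   A rough sketch of the per-snapshot steps above, in plain Python over 
in-memory rows rather than Spark. The `Snapshot` class and its fields are made 
up for illustration and are not Iceberg APIs. Predicate-based pruning is left 
out, so every existing file is scanned for V2 deletes, and the `_deleted` 
projection is approximated by a boolean computed per row.

```python
from dataclasses import dataclass, field


@dataclass
class Snapshot:
    """Hypothetical in-memory stand-in for one snapshot (not an Iceberg class)."""
    snapshot_id: int
    added_files: dict      # file path -> rows of data files added in this snapshot
    removed_files: dict    # file path -> rows of data files marked as deleted
    existing_files: dict   # file path -> rows of older data files still live
    position_deletes: set = field(default_factory=set)    # {(file_path, position)}
    equality_deletes: list = field(default_factory=list)  # e.g. [{"id": 3}]


def matches(row, eq_delete):
    # An equality delete matches when all of its columns equal the row's values.
    return all(row.get(col) == val for col, val in eq_delete.items())


def changelog_for_snapshot(snap):
    inserts, deletes = [], []

    # Insert records: added data files minus same-snapshot position deletes.
    # Equality deletes from the same snapshot do not apply to these files.
    for path, rows in snap.added_files.items():
        for pos, row in enumerate(rows):
            if (path, pos) not in snap.position_deletes:
                inserts.append(row)

    # V1 deletes: every row of a data file marked as deleted.
    for rows in snap.removed_files.values():
        deletes.extend(rows)

    # V2 deletes: rows of existing files hit by a position or equality delete.
    for path, rows in snap.existing_files.items():
        for pos, row in enumerate(rows):
            is_deleted = (path, pos) in snap.position_deletes or any(
                matches(row, d) for d in snap.equality_deletes
            )
            if is_deleted:
                deletes.append(row)

    def tag(rows, record_type):
        return [
            {**r, "_record_type": record_type, "_commit_snapshot_id": snap.snapshot_id}
            for r in rows
        ]

    # Union inserts and deletes into a single log for this snapshot.
    return tag(inserts, "insert") + tag(deletes, "delete")
```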
   
   The algorithm is simple; the only downside is that it may be expensive 
to resolve equality deletes as they may not be scoped to any partitions. If 
there is a global equality delete, resolving it and finding all records that 
were deleted will require a full table scan. In the future, we may consider 
exposing an option to include equality deletes as-is, without finding the 
records that were actually deleted. In that mode, a delete record will only 
contain whatever values are in the delete file. Outputting equality deletes 
as-is may be enough in many cases.
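
   To make the trade-off concrete, here is a small sketch of the two modes. The 
function name and the `resolve` flag are hypothetical, and rows are plain 
dicts.

```python
def equality_delete_records(eq_deletes, table_rows, resolve=True):
    """Return delete records for a set of equality deletes.

    resolve=False: emit the delete file contents as-is (no table scan).
    resolve=True:  scan the rows and emit the full records that were deleted.
    """
    if not resolve:
        return [dict(d) for d in eq_deletes]
    return [
        row
        for row in table_rows
        if any(all(row.get(c) == v for c, v in d.items()) for d in eq_deletes)
    ]
```

   With `resolve=True` and a delete that is not scoped to a partition, 
`table_rows` has to cover the whole table, which is the expensive case 
described above.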
   
   **How to apply changes to Iceberg tables?**
   We should be able to apply a CDC log from one Iceberg table to another by 
simply converting the log to data and equality delete files without doing a 
MERGE operation.
   
   **How to apply changes to non-Iceberg tables?**
   Use a MERGE operation. Note: whenever a CDC log is fetched, consumers may 
need to collapse changes that happened across snapshots to not violate the 
cardinality check in MERGE.
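
   One possible way to collapse changes is to keep only the last change per 
key, so that the MERGE source has at most one row per key. This is a sketch 
under assumptions: `_snapshot_ordinal` is a made-up column giving the commit 
order of snapshots (snapshot IDs themselves are not ordered), and deletes are 
assumed to sort before inserts within a snapshot.

```python
def collapse_changes(log, key_cols):
    """Keep only the final change per key across all fetched snapshots."""
    type_rank = {"delete": 0, "insert": 1}
    ordered = sorted(
        log, key=lambda r: (r["_snapshot_ordinal"], type_rank[r["_record_type"]])
    )
    last_per_key = {}
    for rec in ordered:
        key = tuple(rec[c] for c in key_cols)
        last_per_key[key] = rec  # later changes overwrite earlier ones
    return list(last_per_key.values())
```

   A final `insert` would then be applied as an upsert and a final `delete` as 
a delete in the MERGE.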
   
   **Do we have to output _identity_columns if equality deletes are resolved?** 
   Probably not. That’s why we may omit the requirement for having identity 
columns defined (at least, for now). 
   
   **What should happen if a row is added and removed in the same snapshot?**
   Skip such records by default. If configured to output such records, we may 
do so by having different `_commit_order` within the same snapshot.
   
   **Can we support pre/post images?**
   I think pre/post images can be computed if equality delete resolution is 
enabled and identity columns are defined and never changed. Then we can 
distribute and order records in a way that will co-locate deletes and inserts 
for the same key next to each other. This should be sufficient to produce pre 
and post images.
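
   A sketch of that co-location step for a single snapshot: sort by key with 
deletes first, then relabel a delete/insert pair for the same key. The labels 
`update_preimage` and `update_postimage` are placeholder names, and the sketch 
assumes at most one delete and one insert per key.

```python
from itertools import groupby


def with_images(snapshot_log, key_cols):
    """Relabel delete+insert pairs for the same key as pre/post images."""
    def key_of(rec):
        return tuple(rec[c] for c in key_cols)

    rank = {"delete": 0, "insert": 1}
    # Co-locate records for the same key, with the delete ahead of the insert.
    ordered = sorted(snapshot_log, key=lambda r: (key_of(r), rank[r["_record_type"]]))

    out = []
    for _, group in groupby(ordered, key=key_of):
        recs = list(group)
        is_update = {r["_record_type"] for r in recs} == {"delete", "insert"}
        for r in recs:
            label = r["_record_type"]
            if is_update:
                label = "update_preimage" if label == "delete" else "update_postimage"
            out.append({**r, "_record_type": label})
    return out
```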
   
   **Shall we output unchanged rows that were copied over during 
copy-on-write?**
   I'd output copied over rows by default as it is technically correct: we 
remove and add back such records from the table format perspective. We may 
expose an option in the future to skip such records but it will require extra 
work. Similarly to pre/post images, if we can co-locate deletes and inserts 
for the same key next to each other, we may skip delete and insert if no value 
has changed. 
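
   A sketch of what skipping copied-over rows could look like for one 
snapshot. It assumes metadata columns are the ones whose names start with an 
underscore, and that a key has at most one delete and one insert; the function 
name is made up.

```python
def drop_carried_over_rows(snapshot_log, key_cols):
    """Drop delete+insert pairs for the same key whose data values are identical."""
    def data(rec):
        # Compare data columns only; metadata columns are ignored.
        return {c: v for c, v in rec.items() if not c.startswith("_")}

    by_key = {}
    for rec in snapshot_log:
        by_key.setdefault(tuple(rec[c] for c in key_cols), []).append(rec)

    out = []
    for recs in by_key.values():
        deletes = [r for r in recs if r["_record_type"] == "delete"]
        inserts = [r for r in recs if r["_record_type"] == "insert"]
        unchanged = (
            len(deletes) == 1
            and len(inserts) == 1
            and data(deletes[0]) == data(inserts[0])
        )
        if not unchanged:
            out.extend(recs)
    return out
```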


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


