aokolnychyi commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1061153554


   Based on Yufei’s design doc and what we discussed during the sync, I’d like 
to share my thoughts on what can be efficiently supported right now.
   
   ### Assumptions
   
   - Changes are consumed per snapshot; rows that are added and removed in the same snapshot can be skipped
   
   Iceberg does not track the order of records within a snapshot. Users may deduce the order manually from a data column, but nothing in the table metadata supports this at the moment. I believe a summary of what changed per snapshot is sufficient for most use cases (except the audit use case). Specifically, if a record is added and immediately deleted in the same snapshot (as can happen with Flink CDC), it is okay to skip that record and not report it in the CDC log.
   
   - Output only delete and insert record types
   
   From what I see, knowing which records to remove and which to add should be enough to support most use cases. The example below shows this works even when the target system is not Iceberg. In the future, we can add pre/post images, but computing them from the current table metadata would require a join and would be expensive.
   
   - The table must have identity columns defined
   
   Whenever a delta log is applied to a target table, most CDC use cases rely on a primary key or a set of identity columns. I think it is reasonable to require Iceberg tables to have identity columns defined in order to generate CDC records.
   
   - Delete records may include values only for identity columns
   
   It is not necessary to output the entire deleted record when the other columns are not stored in equality delete files. If their values are not persisted, reconstructing the entire row would require an expensive join.
   
   ### Unsupported
   
   - Audit use cases, as they require a sequence number per row
   - Pre/post update image (can be added in the future)
   
   The CDC record can include:
   ```
   _record_type, _commit_snapshot_id, _commit_timestamp, _commit_order, _identity_columns, col1, col2, ...
   ```
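A minimal Python sketch of this record layout, under stated assumptions: the class, field, and method names are hypothetical, `_commit_timestamp` is taken to be epoch milliseconds, and `_identity_columns` is read as the list of identity column names while the values live in the row itself. A delete record may carry only the identity columns.

```python
from dataclasses import dataclass
from typing import Any, Dict, Tuple

RECORD_TYPES = ("INSERT", "DELETE")


@dataclass
class CdcRecord:
    # Hypothetical container; fields mirror the proposed CDC columns.
    record_type: str                   # _record_type: "INSERT" or "DELETE"
    commit_snapshot_id: int            # _commit_snapshot_id
    commit_timestamp_ms: int           # _commit_timestamp (assumed epoch millis)
    commit_order: int                  # _commit_order
    identity_columns: Tuple[str, ...]  # _identity_columns (assumed: column names)
    row: Dict[str, Any]                # col1, col2, ...; deletes may hold only identity columns

    def __post_init__(self) -> None:
        if self.record_type not in RECORD_TYPES:
            raise ValueError(f"unexpected record type: {self.record_type}")
        missing = [c for c in self.identity_columns if c not in self.row]
        if missing:
            raise ValueError(f"missing identity columns: {missing}")

    def key(self) -> Tuple[Any, ...]:
        """Identity-column values, e.g. for matching against a target table."""
        return tuple(self.row[c] for c in self.identity_columns)


# A DELETE that only knows the identity value, and the INSERT that replaces it.
delete = CdcRecord("DELETE", 2, 1_000, 1, ("id",), {"id": 1})
insert = CdcRecord("INSERT", 2, 1_000, 1, ("id",), {"id": 1, "v": "new"})
```
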
   ### Algorithm (per snapshot)
   
   - Build insert records
        - Read all data files written in this snapshot, applying any matching position deletes produced in the same snapshot. Equality deletes produced in this snapshot cannot apply, because equality deletes only affect data files with a lower sequence number.
   - Build delete records
        - Read all data files marked as deleted in this snapshot (they always contain values for identity columns).
        - Read all equality deletes produced in this snapshot (they always contain values for identity columns).
        - Read position deletes, find the affected data files from previous snapshots, read those data files, join them with the deletes on `_file` and `_pos`, and project the identity columns.
   - Append metadata such as `_commit_snapshot_id` or `_record_type`
   - Union insert and delete records 
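The per-snapshot steps above can be sketched in plain Python over an in-memory model. This is not Iceberg's API: `Snapshot`, `changes_for_snapshot`, and the dict of file contents are hypothetical stand-ins, a row's index in its file plays the role of `_pos`, and equality deletes are assumed to be keyed on the identity columns. For simplicity, the sketch does not exclude rows of a removed data file that earlier deletes had already removed.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


@dataclass
class Snapshot:
    # Hypothetical in-memory model of one snapshot, not Iceberg's API.
    snapshot_id: int
    timestamp_ms: int
    added_files: List[str] = field(default_factory=list)
    removed_files: List[str] = field(default_factory=list)
    position_deletes: List[Tuple[str, int]] = field(default_factory=list)  # (_file, _pos)
    equality_deletes: List[Row] = field(default_factory=list)  # identity-column values


def project(row: Row, identity_cols: List[str]) -> Row:
    return {c: row[c] for c in identity_cols}


def changes_for_snapshot(snap: Snapshot, files: Dict[str, List[Row]],
                         identity_cols: List[str]) -> List[Row]:
    added = set(snap.added_files)
    # Position deletes that target files written in this same snapshot.
    same_snapshot = {(f, p) for f, p in snap.position_deletes if f in added}

    # Insert records: new rows minus those deleted within the snapshot.
    inserts = [row
               for path in snap.added_files
               for pos, row in enumerate(files[path])
               if (path, pos) not in same_snapshot]

    # Delete records from data files marked as deleted in this snapshot.
    deletes = [project(row, identity_cols)
               for path in snap.removed_files
               for row in files[path]]
    # Equality deletes already hold the identity-column values.
    deletes += [project(d, identity_cols) for d in snap.equality_deletes]

    # Position deletes against older files: read only the referenced files
    # and look up (_file, _pos) to recover the identity-column values.
    positions_by_file: Dict[str, List[int]] = {}
    for f, p in snap.position_deletes:
        if f not in added:
            positions_by_file.setdefault(f, []).append(p)
    for path, positions in positions_by_file.items():
        rows = files[path]
        deletes += [project(rows[p], identity_cols) for p in positions]

    # Append commit metadata and union both record types.
    meta = {"_commit_snapshot_id": snap.snapshot_id,
            "_commit_timestamp": snap.timestamp_ms}
    return ([{"_record_type": "DELETE", **meta, **d} for d in deletes]
            + [{"_record_type": "INSERT", **meta, **r} for r in inserts])


# f1 was written by an earlier snapshot; f2 is written by snapshot 2.
files = {
    "f1": [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}],
    "f2": [{"id": 3, "v": "c"}, {"id": 4, "v": "d"}],
}
snap = Snapshot(2, 1_000, added_files=["f2"],
                position_deletes=[("f2", 1), ("f1", 0)],
                equality_deletes=[{"id": 2}])
out = changes_for_snapshot(snap, files, ["id"])
```

Here the row with `id = 4` is added and position-deleted within snapshot 2, so it produces no record, while `id = 2` (equality delete) and `id = 1` (position delete against `f1`) become DELETE records carrying only the identity column.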
   
   The algorithm requires only one join, to find identity-column values for position deletes. That join should be fairly efficient: position deletes are scoped to partitions and name the exact data files to read, so we don't have to read any other files or do a full table scan.
   
   I think it should be sufficient to have only delete/insert record types in 
most cases. A generic MERGE statement can be used to apply changes for a single 
snapshot to both Iceberg and non-Iceberg tables.
   
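   ```sql
   -- source: the CDC records of a single snapshot
   MERGE INTO target t USING source s
   ON t.id_col = s.id_col AND s._record_type = 'DELETE'
   WHEN MATCHED THEN
     DELETE
   -- guard so that a DELETE record with no matching row is not inserted
   WHEN NOT MATCHED AND s._record_type = 'INSERT' THEN
     INSERT …
   ```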
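To double-check the intended semantics, here is a small Python emulation that applies one snapshot's records to a dict keyed by identity values. The function name is hypothetical, and metadata columns are assumed to be the ones prefixed with `_`. Deletes are applied before inserts and unmatched DELETE records are ignored. Because rows added and removed in the same snapshot were already skipped, a DELETE and an INSERT for the same key in one snapshot represent an update, so the old row is replaced.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def apply_changes(target: Dict[Tuple[Any, ...], Row], changes: List[Row],
                  identity_cols: List[str]) -> None:
    """Apply one snapshot's DELETE/INSERT records to a target keyed by identity values."""
    def key(r: Row) -> Tuple[Any, ...]:
        return tuple(r[c] for c in identity_cols)

    # WHEN MATCHED THEN DELETE: only DELETE records match; unmatched ones are ignored.
    for r in changes:
        if r["_record_type"] == "DELETE":
            target.pop(key(r), None)
    # WHEN NOT MATCHED (INSERT records): add the row without "_" metadata columns.
    for r in changes:
        if r["_record_type"] == "INSERT":
            target[key(r)] = {k: v for k, v in r.items() if not k.startswith("_")}


# One snapshot: id 1 deleted, id 2 updated (DELETE + INSERT), id 3 inserted.
target = {(1,): {"id": 1, "v": "a"}, (2,): {"id": 2, "v": "b"}}
changes = [
    {"_record_type": "DELETE", "id": 1},
    {"_record_type": "DELETE", "id": 2},
    {"_record_type": "INSERT", "id": 2, "v": "b2"},
    {"_record_type": "INSERT", "id": 3, "v": "c"},
]
apply_changes(target, changes, ["id"])
```
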
   
   If the destination table is another Iceberg table, we may skip the MERGE 
statement and write equality deletes and data files directly without querying 
the destination table.
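A sketch of that shortcut, under the same hypothetical naming and `_`-prefix assumption: DELETE records become equality-delete rows keyed on the identity columns and INSERT records become data rows, with no read of the destination table. Writing the actual files is out of scope here. Committing both in one snapshot works because, in Iceberg format v2, an equality delete only applies to data files with a strictly lower sequence number, so the new rows are not removed by the deletes committed alongside them; `equality_delete_applies` is a hypothetical helper that models just that rule.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def split_for_iceberg_target(changes: List[Row], identity_cols: List[str]
                             ) -> Tuple[List[Row], List[Row]]:
    """Turn one snapshot's CDC records into equality-delete rows and data rows."""
    eq_deletes = [{c: r[c] for c in identity_cols}
                  for r in changes if r["_record_type"] == "DELETE"]
    # Assumption: metadata columns are the ones prefixed with "_".
    data_rows = [{k: v for k, v in r.items() if not k.startswith("_")}
                 for r in changes if r["_record_type"] == "INSERT"]
    return eq_deletes, data_rows


def equality_delete_applies(data_seq: int, delete_seq: int) -> bool:
    # An equality delete affects only data files with a strictly lower
    # sequence number, so rows written in the same commit stay visible.
    return data_seq < delete_seq


changes = [
    {"_record_type": "DELETE", "_commit_snapshot_id": 2, "id": 2},
    {"_record_type": "INSERT", "_commit_snapshot_id": 2, "id": 2, "v": "b2"},
]
eq_deletes, data_rows = split_for_iceberg_target(changes, ["id"])
```
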
   
   Here are a few examples (inspired by what Yufei put together) that I went through to check whether this approach works; please double-check them.
   
   
![image](https://user-images.githubusercontent.com/6235869/157117118-446f7ecd-77ca-48ae-b250-3254c1dbf529.png)
   

