aokolnychyi commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1061153554
Based on Yufei’s design doc and what we discussed during the sync, I’d like
to share my thoughts on what can be efficiently supported right now.
### Assumptions
- Changes are consumed per snapshot; rows that are added and removed in the
  same snapshot can be skipped.

  Iceberg does not track the order of records within a snapshot. Users may be
  able to deduce it manually from a data column, but nothing in the metadata
  supports this at the moment. I believe a summary of what changed per
  snapshot is sufficient for most use cases (except the audit use case).
  Specifically, if we add a record and immediately delete it in the same
  snapshot (Flink CDC), it is okay to skip this record and not report it in
  the CDC log.
- Output only delete and insert record types.

  From what I see, knowing which records to remove and which to add should be
  enough to support most use cases. I have an example below that works even if
  the target system is not Iceberg. In the future, we can add pre/post update
  images, but computing them with the current table metadata would require a
  join and would be expensive.
- Tables must have identity columns defined.

  Whenever we apply a delta log to a target table, most CDC use cases rely on
  a primary key or a set of identity columns. I think it is reasonable to
  require Iceberg tables to have identity columns defined in order to generate
  CDC records.
- Delete records may only include values for identity columns.

  It is not required to output the entire deleted record if the other columns
  are not stored in equality delete files. If values for the other columns are
  not persisted, reconstructing the entire row would require an expensive join.
### Unsupported
- Audit use cases, as they require a sequence number per row
- Pre/post update images (can be added in the future)
### CDC record
A CDC record can include the following columns:
```
_record_type, _commit_snapshot_id, _commit_timestamp, _commit_order,
_identity_columns, col1, col2, ...
```
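To make this layout concrete, here is a hypothetical Python representation of one record. It is a sketch, not an Iceberg API: treating `_commit_timestamp` as epoch milliseconds, `_commit_order` as the snapshot's position in the change stream, and `_identity_columns` as the identity column values themselves are all assumptions.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict


class RecordType(Enum):
    # Only insert and delete records are proposed; no pre/post update images.
    INSERT = "INSERT"
    DELETE = "DELETE"


@dataclass
class CdcRecord:
    record_type: RecordType
    commit_snapshot_id: int
    commit_timestamp: int  # assumption: epoch milliseconds
    commit_order: int  # assumption: position of the snapshot in the change stream
    identity: Dict[str, Any]  # identity column values, always present
    data: Dict[str, Any] = field(default_factory=dict)  # other columns, may be empty for deletes

    def to_row(self) -> Dict[str, Any]:
        """Flatten into the column layout listed above."""
        row: Dict[str, Any] = {
            "_record_type": self.record_type.value,
            "_commit_snapshot_id": self.commit_snapshot_id,
            "_commit_timestamp": self.commit_timestamp,
            "_commit_order": self.commit_order,
        }
        row.update(self.identity)
        row.update(self.data)
        return row


# A delete record that carries only the identity column.
rec = CdcRecord(RecordType.DELETE, 42, 1646700000000, 3, {"id": 7})
print(rec.to_row())
```

Keeping `data` optional reflects the assumption that delete records may carry only identity columns.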
### Algorithm (per snapshot)
- Build insert records
  - Read all data files written in this snapshot, applying any matching
    position delete files produced in the same snapshot. Equality deletes
    produced in this snapshot cannot apply, because equality deletes only
    apply to data files with a strictly lower sequence number.
- Build delete records
  - Read all data files marked as deleted in this snapshot (they always
    contain values for identity columns).
  - Read all equality deletes (they always contain values for identity
    columns).
  - Read position deletes, find the list of affected data files from
    previous snapshots, read those data files, join them with the deletes on
    `_file` and `_pos`, and project the identity columns.
- Append metadata such as `_commit_snapshot_id` or `_record_type`
- Union insert and delete records
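The per-snapshot steps above can be sketched in plain Python over in-memory data. This is a minimal sketch under several assumptions: each data file is a list of row dicts, a position delete is a `(file path, row position)` pair, equality deletes carry only identity columns, and `read_file` and `changes_for_snapshot` are hypothetical names, not Iceberg APIs.

```python
from typing import Any, Callable, Dict, List, Set, Tuple

Row = Dict[str, Any]


def changes_for_snapshot(
    snapshot_id: int,
    added_files: List[str],
    removed_files: List[str],
    position_deletes: List[Tuple[str, int]],
    equality_deletes: List[Row],
    identity_cols: List[str],
    read_file: Callable[[str], List[Row]],
) -> List[Row]:
    """Build the INSERT and DELETE records produced by one snapshot."""

    def project(row: Row) -> Row:
        # Delete records only carry identity column values.
        return {c: row[c] for c in identity_cols}

    added = set(added_files)
    # Position deletes against files added in this snapshot cancel those rows
    # (added and removed in the same snapshot, so they are skipped entirely).
    same_snapshot: Set[Tuple[str, int]] = {d for d in position_deletes if d[0] in added}
    # Remaining position deletes point at data files from previous snapshots.
    older: Dict[str, Set[int]] = {}
    for path, pos in position_deletes:
        if path not in added:
            older.setdefault(path, set()).add(pos)

    # Insert records: new rows minus same-snapshot position deletes.
    # Equality deletes from this snapshot are deliberately not applied here.
    inserts = [
        dict(row)
        for path in added_files
        for pos, row in enumerate(read_file(path))
        if (path, pos) not in same_snapshot
    ]

    # Delete records: removed data files, equality deletes, and position deletes.
    deletes = [project(row) for path in removed_files for row in read_file(path)]
    deletes += [project(d) for d in equality_deletes]
    # The single join: read only the referenced files and match on (_file, _pos).
    for path, positions in older.items():
        deletes += [project(row) for pos, row in enumerate(read_file(path)) if pos in positions]

    # Append metadata columns and union both record types.
    meta = {"_commit_snapshot_id": snapshot_id}
    return [{"_record_type": "INSERT", **meta, **r} for r in inserts] + [
        {"_record_type": "DELETE", **meta, **r} for r in deletes
    ]


# Hypothetical table state: f1 is from an earlier snapshot, f2 is added now.
files = {
    "f1.parquet": [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}],
    "f2.parquet": [{"id": 3, "v": "c"}, {"id": 4, "v": "d"}],
}
records = changes_for_snapshot(
    snapshot_id=2,
    added_files=["f2.parquet"],
    removed_files=[],
    position_deletes=[("f1.parquet", 0), ("f2.parquet", 1)],
    equality_deletes=[{"id": 9}],
    identity_cols=["id"],
    read_file=files.__getitem__,
)
for r in records:
    print(r)
```

In this example, `id=4` is added and position-deleted in the same snapshot, so it produces no record; `id=3` becomes an INSERT, while `id=9` (equality delete) and `id=1` (position delete on an older file) become identity-only DELETE records.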
The algorithm would require only one join, to find the identity columns for
position deletes. That join should be fairly efficient, as position deletes
are scoped to partitions and we know the exact file names to read. We don't
have to read any other files or do a full table scan.
I think it should be sufficient to have only delete/insert record types in
most cases. A generic MERGE statement can be used to apply changes for a single
snapshot to both Iceberg and non-Iceberg tables.
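```
MERGE INTO target t USING source s
ON t.id_col = s.id_col AND s._record_type = 'DELETE'
WHEN MATCHED THEN
  DELETE
-- delete records with no matching target row must not be inserted
WHEN NOT MATCHED AND s._record_type = 'INSERT' THEN
  INSERT …
```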
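To check the MERGE semantics for a single snapshot, the following Python sketch simulates them against an in-memory target keyed by identity columns. It assumes identity values are unique in the target and that the CDC metadata columns are the only ones starting with `_`; `apply_snapshot` is a hypothetical helper, not an engine API.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def apply_snapshot(target: Dict[Tuple[Any, ...], Row], changes: List[Row], id_cols: List[str]) -> None:
    """Apply one snapshot of INSERT/DELETE records to a target keyed by identity columns."""

    def key(row: Row) -> Tuple[Any, ...]:
        return tuple(row[c] for c in id_cols)

    # WHEN MATCHED (a DELETE record whose key exists in the target) THEN DELETE.
    # DELETE records with no match are ignored rather than inserted.
    for row in changes:
        if row["_record_type"] == "DELETE":
            target.pop(key(row), None)
    # WHEN NOT MATCHED AND _record_type = 'INSERT' THEN INSERT.
    # Assumption: metadata columns are the only ones prefixed with "_".
    for row in changes:
        if row["_record_type"] == "INSERT":
            target[key(row)] = {k: v for k, v in row.items() if not k.startswith("_")}


target = {(1,): {"id": 1, "v": "old"}, (2,): {"id": 2, "v": "b"}}
changes = [
    {"_record_type": "DELETE", "id": 1},
    {"_record_type": "INSERT", "id": 1, "v": "new"},  # an update is a delete plus an insert
    {"_record_type": "DELETE", "id": 9},  # no match: ignored, not inserted
]
apply_snapshot(target, changes, ["id"])
print(target)
```

Processing all deletes before all inserts mirrors how MERGE matches source rows against the target as it was before the statement, so an update expressed as a delete plus an insert for the same key ends up as the new row.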
If the destination table is another Iceberg table, we may skip the MERGE
statement and write equality deletes and data files directly without querying
the destination table.
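For an Iceberg destination, a snapshot's CDC records can be split into equality-delete rows and data rows and committed together. This works without reading the destination because equality deletes only apply to data files with a strictly lower sequence number, so the new rows written in the same commit are not affected. The sketch below only does the split; `split_for_iceberg_commit` is a hypothetical helper, and writing the files and committing them (for example, through a row-delta operation) is left out.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def split_for_iceberg_commit(changes: List[Row], id_cols: List[str]) -> Tuple[List[Row], List[Row]]:
    """Split one snapshot's CDC records into equality-delete rows and data rows."""
    eq_deletes: List[Row] = []
    data_rows: List[Row] = []
    for row in changes:
        if row["_record_type"] == "DELETE":
            # Equality delete files only need the identity (equality field) columns.
            eq_deletes.append({c: row[c] for c in id_cols})
        else:
            # Drop CDC metadata columns (assumed to be the only ones prefixed with "_").
            data_rows.append({k: v for k, v in row.items() if not k.startswith("_")})
    return eq_deletes, data_rows


deletes, inserts = split_for_iceberg_commit(
    [{"_record_type": "DELETE", "id": 1}, {"_record_type": "INSERT", "id": 1, "v": "new"}],
    ["id"],
)
print(deletes, inserts)
```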
Here are a few examples (inspired by what Yufei put together) that I worked
through to check whether this approach works (please double-check them).
