samredai commented on a change in pull request #3432: URL: https://github.com/apache/iceberg/pull/3432#discussion_r741514786
##########
File path: site/docs/cow-and-mor.md
##########
@@ -0,0 +1,195 @@

<!--
 - Licensed to the Apache Software Foundation (ASF) under one or more
 - contributor license agreements. See the NOTICE file distributed with
 - this work for additional information regarding copyright ownership.
 - The ASF licenses this file to You under the Apache License, Version 2.0
 - (the "License"); you may not use this file except in compliance with
 - the License. You may obtain a copy of the License at
 -
 -   http://www.apache.org/licenses/LICENSE-2.0
 -
 - Unless required by applicable law or agreed to in writing, software
 - distributed under the License is distributed on an "AS IS" BASIS,
 - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 - See the License for the specific language governing permissions and
 - limitations under the License.
 -->

# Copy-on-Write and Merge-on-Read

This page explains the concepts of copy-on-write and merge-on-read in the context of Iceberg to give readers more clarity around Iceberg's table spec design.

## Introduction

In Iceberg, copy-on-write and merge-on-read are different ways to handle row-level update and delete operations. Here are their definitions:

- **copy-on-write (CoW)**: an update/delete directly rewrites the affected data files in their entirety.
- **merge-on-read (MoR)**: update/delete information is encoded in the form of delete files. The table reader applies all delete information at read time. A compaction process takes care of merging delete files into data files asynchronously.

Clearly, CoW is more efficient for reading data, while MoR is more efficient for writing data.
Users can choose to use **both** CoW and MoR against the same Iceberg table depending on the situation.
A common example: for a time-partitioned table, newer partitions with more frequent updates are maintained with the MoR approach through a CDC streaming pipeline,
while older partitions are maintained with the CoW approach through less frequent GDPR updates from batch ETL jobs.

## Copy-on-write

As the definition states, given a user's update/delete requirement, the CoW write process searches for all the affected data files and rewrites them.
Spark supports CoW `DELETE`, `UPDATE` and `MERGE` operations through Spark extensions. More details can be found on the [Spark Writes](../spark-writes) page.
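To make the rewrite behavior concrete, here is a minimal, illustrative Python sketch of a CoW delete. It is not Iceberg's implementation: data files are modeled as plain in-memory lists of row dicts and the predicate is an ordinary Python function, but it shows the key property that any file containing a matching row is rewritten in full while other files are left untouched.

```python
# Illustrative copy-on-write delete, not Iceberg code: "data files" are
# in-memory lists of row dicts keyed by a made-up file name.

def cow_delete(data_files, predicate):
    """Return a new table state with matching rows removed.

    Files with no matching row are carried over untouched; files with at
    least one match are rewritten in their entirety as new files.
    """
    new_files = {}
    for path, rows in data_files.items():
        if any(predicate(row) for row in rows):
            # Rewrite: a new file keeps only the rows that survive the delete.
            new_files[path + ".rewritten"] = [r for r in rows if not predicate(r)]
        else:
            # Untouched files are simply kept in the new table state.
            new_files[path] = rows
    return new_files


if __name__ == "__main__":
    files = {
        "data-00001.parquet": [{"id": 1}, {"id": 5}],
        "data-00002.parquet": [{"id": 7}],
    }
    # DELETE FROM table WHERE id = 5 under copy-on-write:
    print(cow_delete(files, lambda row: row["id"] == 5))
```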
## Merge-on-read

In the next few sections, we provide more details around the Iceberg MoR design.

### Row-Level Delete File Spec

As documented on the [Spec](../spec/#row-level-deletes) page, Iceberg supports two different types of row-level delete files: **position deletes** and **equality deletes**.
If you are unfamiliar with these concepts, please read the related sections in the spec for more information before proceeding.

Also note that because row-level delete files are valid Iceberg data files, each file must define the partition it belongs to.
If the file belongs to `Unpartitioned` (the partition spec has no partition field), then the delete file is called a **global delete**.
Otherwise, it is called a **partition delete**.

### MoR Update as Delete + Insert

In Iceberg, an update is modeled as a delete plus an insert within the same transaction, so there is no concept of an "update file".
During a MoR write transaction, new data files and delete files are committed with the same sequence number.
During a MoR read, delete files are applied to data files of strictly lower sequence numbers.
This ensures the latest updated rows are displayed to users during a MoR read.

### Delete File Writer

Given the abstraction provided by Iceberg, it is very rare that end users can directly request deletion of a specific row of a specific file.
A delete requirement almost always comes as a predicate such as `id = 5` or `date < '2000-01-01'`.
Given the predicate, a delete writer can write delete files in one or a combination of the following ways:

1. **partition position deletes**: perform a scan \[1\] to determine the data files and row positions affected by the predicate and then write partition \[2\] position deletes
2. **partition equality deletes**: convert the input predicate \[3\] to partition equality predicates and write partition equality deletes
3. **global equality deletes**: convert the input predicate to equality predicates and write global equality deletes

\[1\] a scan here can mean a table scan, or a scan of unflushed files (stored in memory, local RocksDB, etc.) for use cases like streaming.

\[2\] it is in theory possible to write global position deletes, but the writer already knows the exact partitions to write, so it is almost always preferred to write partition position deletes because it costs the same and improves the efficiency of the MoR read process.

\[3\] if the input inequality predicate cannot be converted to a finite number of equality predicates (e.g. `price > 2.33`), then it is only possible to use position deletes instead.

### Data File Reader with Deletes

At MoR read time, the Iceberg reader indexes all the delete files and determines the delete files associated with each data file. Typically,

- as described before, delete files are only applied to data files that have a strictly lower sequence number
- global deletes are associated with every data file, so it is desirable to have as few global deletes as possible
- partition deletes are pruned based on their statistics and secondary index information so that each data file is associated with the minimum number of delete files necessary

Because position deletes must be sorted by file path and row position, applying position deletes to data files can be done by streaming the rows in the position deletes.
Therefore, there is not much burden on the memory side, but the amount of IO grows with the number of position delete files, so it is desirable to keep the number of position deletes per data file low.

For equality deletes, all the equality predicates must be loaded into memory to check against every row in the data files.
Therefore, having too many equality deletes carries the risk of running out of memory.
It is possible to use external storage like RocksDB to store the equality predicates, but too many equality deletes would still have a huge impact on read performance.

Because of this difference, position deletes tend to be more efficient to read than equality deletes.
This has an impact on the delete compaction strategies described below.
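The sketch below (plain Python, not the Iceberg reader) ties the two previous sections together: position delete files are modeled as rows of `(data file path, row position)`, equality delete files as rows of values for an assumed equality field `id`, and both are applied to one data file using the simplified strictly-lower-sequence-number rule described above. For brevity it collects deleted positions into a set rather than streaming the sorted position deletes.

```python
# Illustrative MoR read-side merge, not the Iceberg reader.

def read_with_deletes(data_file, position_deletes, equality_deletes, eq_fields=("id",)):
    """Yield the live rows of `data_file` after applying MoR deletes."""
    seq = data_file["sequence_number"]

    # Row positions deleted in this file by applicable position deletes.
    deleted_positions = set()
    for pd in position_deletes:
        if pd["sequence_number"] > seq:  # only files with a strictly lower sequence number
            deleted_positions.update(
                pos for path, pos in pd["rows"] if path == data_file["path"]
            )

    # Equality keys removed by applicable equality deletes. In a real reader
    # these are what must be held in memory (or external storage like RocksDB).
    deleted_keys = set()
    for ed in equality_deletes:
        if ed["sequence_number"] > seq:
            deleted_keys.update(ed["rows"])

    for pos, row in enumerate(data_file["rows"]):
        if pos in deleted_positions:
            continue
        if tuple(row[f] for f in eq_fields) in deleted_keys:
            continue
        yield row


if __name__ == "__main__":
    data = {"path": "data-1.parquet", "sequence_number": 1,
            "rows": [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 3, "v": "c"}]}
    pos_del = [{"sequence_number": 2, "rows": [("data-1.parquet", 0)]}]  # delete row 0
    eq_del = [{"sequence_number": 2, "rows": {(3,)}}]                    # delete id = 3
    print(list(read_with_deletes(data, pos_del, eq_del)))                # [{'id': 2, 'v': 'b'}]
```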
### MoR delete compaction

Beyond reading and writing delete files, users need to run delete compactions to optimize read performance.
There are three common Iceberg delete compactions:

#### Rewrite position deletes (REWRITE)

As discussed in the [reader](#data-file-reader-with-deletes) section, having fewer position deletes per data file is computationally more efficient.
This compaction optimizes the position deletes layout, for example by combining all position deletes in a single partition into a single file.

#### Convert equality deletes (CONVERT)

As discussed in the [reader](#data-file-reader-with-deletes) section, equality deletes have the potential to introduce out-of-memory issues and should be removed as soon as possible.
This compaction converts equality deletes to position deletes by performing a table scan for the predicates in the equality deletes to determine the affected files and row positions.

#### Merge deletes (MERGE)

This compaction merges deletes into data files and removes the delete files.
This is likely the definition of delete compaction that most people think of first.

Note that Iceberg also offers [data compaction](../maintenance/#compact-data-files) as a separate concept to optimize data file layout.
As a part of reading data files, the Iceberg reader already merges deletes into the new data files it produces,
so data compaction also "merges" deletes, but it does not remove any delete file, because the delete file might still reference data files that were not rewritten.
MERGE, however, guarantees a delete file can be removed safely by rewriting all the data files affected by it.

Arguably, if a table is continuously compacted using data compaction, it is possible that all the data files a delete file references
have already been rewritten, and no data file of a lower sequence number remains.
Therefore, instead of MERGE, a user can also choose to run only data compaction together with an additional procedure that periodically expires delete files that no longer reference any data files.

However, there is one feature that MERGE provides and data compaction does not:
rewriting data files while preserving the sequence number.
In data compaction, data files are always rewritten with a new sequence number.
When many deletes arrive at the same time, this results in a lot of commit conflicts.

Consider the following case where two processes P1 and P2 both start from snapshot S0:

```
S0:  data.1  data.2  data.3  delete.3
        \    /          \    /
P1:     data.4          data.5                          (data compaction)

P2:  data.1  data.2  data.3  delete.3  +  delete.3_2    (update)
```

If P2 commits after P1:

- if `delete.3_2` is a position delete, the update has to be retried from the beginning because the old files it references have been removed
- if `delete.3_2` is an equality delete, the update can succeed after a retry because `delete.3_2` will also affect `data.5`

If P1 commits after P2:

- if `delete.3_2` is a position delete, the compaction has to be retried from the beginning because it removes `data.3`, which would invalidate `delete.3_2`.
- if `delete.3_2` is an equality delete, the compaction has to be retried from the beginning because it writes `data.5` with a higher sequence number than `delete.3_2`, which would also invalidate `delete.3_2`.

To solve this issue, MERGE is allowed to preserve the sequence number.
With this additional feature, the compaction does not need to be retried if P1 commits after P2 and `delete.3_2` is an equality delete,
because `data.5` keeps a lower sequence number than `delete.3_2`, so the deletion is still effective.
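The effect of preserving the sequence number can be shown with a toy Python check of the applicability rule, using made-up sequence numbers for the S0 example above; this is an illustration of the rule, not Iceberg code.

```python
# Why preserving the sequence number during MERGE keeps a concurrent
# equality delete effective (toy illustration, not Iceberg code).
# An equality delete applies to data files with a strictly lower sequence number.

def equality_delete_applies(delete_seq, data_seq):
    return data_seq < delete_seq

# Snapshot S0: data.3 was written with sequence number 1 (assumed).
# P2 commits the equality delete delete.3_2 with sequence number 2 (assumed).
delete_3_2_seq = 2

# P1 rewrites data.3 into data.5, starting from S0.
data_5_seq_new = 3    # data compaction: the rewrite gets a new sequence number
data_5_seq_kept = 1   # MERGE preserving the sequence number: keeps data.3's number

print(equality_delete_applies(delete_3_2_seq, data_5_seq_new))   # False: delete lost, retry needed
print(equality_delete_applies(delete_3_2_seq, data_5_seq_kept))  # True: delete still effective
```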
This is particularly useful in the CDC streaming case, which is discussed more in the next section.

### MoR Use Case Analysis

Now we analyze two common MoR use cases in Iceberg.

#### Use case 1: GDPR updates

From time to time, users request the removal of all of their historical data. This use case has the following characteristics:

1. there is a relatively long grace period allowed before hard data deletion, such as 1-3 months
2. because of that, requests could potentially be processed in batches
3. the data to delete spans multiple partitions in a table

CoW is a viable option for users with GDPR requirements, but MoR can also be used for this use case if there are specific SLA requirements around aspects such as logical delete latency.
At write time, the system can make one of the following choices:

1. write global equality deletes on `userId`.
2. if possible, figure out the related partition predicates and write partition equality deletes.
3. perform a scan and write position deletes; the time taken would be similar to a query using the same predicate.

Note that a faster write usually means a slower read and more compaction needed later.
Here are the possible compaction actions:

1. run CONVERT to remove equality deletes and avoid the out-of-memory risk
2. run REWRITE if the system would like to batch more deletes without doing a MERGE
3. run MERGE or data compaction to clean up deletes

The choice of the delete type to write and the compactions to perform depends on the actual SLA of the system.
Iceberg offers all these tools so that users can flexibly customize based on their own needs.
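As a rough illustration of the write-time choices above (not an Iceberg API), the sketch below plans a GDPR delete for a hypothetical `userId`, preferring the most read-friendly delete type that the available information allows; `known_partitions` and `scan_for_positions` are made-up stand-ins for metadata or scan capabilities the system may have.

```python
# Illustrative planner for the three GDPR write-time choices above.
# All names and structures here are hypothetical, not Iceberg APIs.

def plan_gdpr_delete(user_id, known_partitions=None, scan_for_positions=None):
    if scan_for_positions is not None:
        # Choice 3: pay the cost of a scan now and write position deletes,
        # which are the cheapest to read and compact later.
        return [{"type": "position-delete", "partition": part, "rows": rows}
                for part, rows in scan_for_positions(user_id).items()]
    if known_partitions:
        # Choice 2: scope the equality deletes to the affected partitions.
        return [{"type": "equality-delete", "partition": part, "userId": user_id}
                for part in known_partitions]
    # Choice 1: fastest write, but a global equality delete burdens every
    # read until CONVERT or MERGE runs.
    return [{"type": "equality-delete", "partition": None, "userId": user_id}]


# Example: the user's data is known to live in two date partitions.
print(plan_gdpr_delete("user-42", known_partitions=["2021-01", "2021-02"]))
```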
#### Use case 2: CDC Streaming

In CDC streaming, for each new event, the writer writes a new data row in memory (or temp storage like RocksDB) with:

- a new equality delete row to remove any duplicated rows from existing Iceberg data files in storage, the equality constraint is formulated based on the primary key (Iceberg identifier fields)

Review comment:
   Maybe hard stop after storage and start "The equality constraint is..." as a new sentence?

##########
File path: site/docs/cow-and-mor.md
##########
@@ -0,0 +1,195 @@

#### Use case 2: CDC Streaming

In CDC streaming, for each new event, the writer writes a new data row in memory (or temp storage like RocksDB) with:

- a new equality delete row to remove any duplicated rows from existing Iceberg data files in storage, the equality constraint is formulated based on the primary key (Iceberg identifier fields)
- a new position delete row if the unflushed data in memory also has a row with the same primary key

When flushing, each new data file is associated with 0 or 1 position delete file, and new equality deletes are added against existing data files in storage.
This CDC flush behavior ensures that each data file in storage is associated with at most one position delete file, and that any new deletes come in the form of equality deletes.
With these critical characteristics, MERGE with a preserved sequence number can safely remove delete files without commit conflicts, and any update commit can safely retry optimistically.
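The per-event logic above can be sketched in a few lines of Python. This is purely illustrative, not the actual CDC writer of any engine: the primary key is a single hypothetical `id` column, and a plain list stands in for the unflushed data file.

```python
# Illustrative CDC write buffer, not an Iceberg or engine API.

class CdcBuffer:
    def __init__(self):
        self.rows = []               # unflushed data rows, in insertion order
        self.pos_by_key = {}         # primary key -> position in the unflushed file
        self.equality_deletes = []   # deletes against data files already in storage
        self.position_deletes = []   # deletes against the unflushed data file

    def write(self, row, key_field="id"):
        key = row[key_field]
        # The writer cannot know whether this key already exists in storage,
        # so it always writes an equality delete keyed by the primary key.
        self.equality_deletes.append({key_field: key})
        # If the unflushed data already contains this key, delete it by position.
        if key in self.pos_by_key:
            self.position_deletes.append(("unflushed-data-file", self.pos_by_key[key]))
        self.pos_by_key[key] = len(self.rows)
        self.rows.append(row)

    def flush(self):
        # At flush time the buffered rows become one new data file, paired with
        # at most one position delete file; the equality deletes target files
        # already in storage.
        return {"data-file": self.rows,
                "position-delete-file": self.position_deletes or None,
                "equality-delete-file": self.equality_deletes or None}


buf = CdcBuffer()
buf.write({"id": 1, "v": "a"})
buf.write({"id": 1, "v": "b"})      # update of id=1 before any flush
print(buf.equality_deletes)          # [{'id': 1}, {'id': 1}]
print(buf.position_deletes)          # [('unflushed-data-file', 0)]
```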
If users would like to use CDC streaming and GDPR updates at the same time, it is recommended that for hot partitions with very frequent CDC updates,
equality deletes is used instead of position deletes to minimize MERGE compaction conflicts.

Review comment:
   s/is used/are used
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org