rdblue commented on pull request #3323:
URL: https://github.com/apache/iceberg/pull/3323#issuecomment-962068331


   @jackye1995, thanks for taking the time to write up your thoughts. That doc 
is really helpful.
   
   I had previously been thinking about this mostly in terms of an append 
stream and v1 tables. Like @Reo-LEI notes, inline compaction for append streams 
is safe (for v1 tables) and is a good idea if you want frequent checkpoints but 
don't want a ton of small files. For v1 tables, we know that there won't be 
equality or position delete files, so we can easily compact. For v2 tables, 
append streams are pretty much like CDC streams because we may have concurrent 
writers adding equality or position delete files.
   
   That brings us back to Jack's points about adapting this idea to CDC 
streams. First, equality deletes create sequence number boundaries that make it 
difficult to compact. But I think there are still some use cases where this is 
valuable even if we only plan on compacting within a sequence number. Write 
tasks don't necessarily align with partitions, so compacting across tasks within a partition may still
be valuable. For example, CDC writes to an unpartitioned table from multiple 
tasks will create a data file per task (per checkpoint) that is probably not 
full size.
   
   While there may be multiple files per partition, I think that the larger use 
case is compacting across checkpoints and that will require addressing 
compaction across sequence numbers. Let's assume that we have a fairly regular 
CDC stream for a bucketed table so that the commit for each bucket has one 
equality delete and one data file per checkpoint, and optionally a position 
delete file for the data file. That's the worst case that Jack describes, where 
there is nothing to compact within each sequence number except position 
deletes. Here are the files committed by sequence number for just one bucket 
(bucket_id = 0):
   
   | Type | Seq 107 | Seq 106 | Seq 105 | Seq 104 | Seq 103 |
   | -- | -- | -- | -- | -- | -- |
   | data | data-107.parquet | data-106.parquet | data-105.parquet | data-104.parquet | data-103.parquet |
   | eq-deletes | eq-107.parquet | eq-106.parquet | eq-105.parquet | eq-104.parquet | eq-103.parquet |
   | pos-deletes |  | pos-106.parquet |  |  |  pos-103.parquet |
   
   I think that we actually can do some compaction in this situation. Say we 
want to compact `data-103.parquet` through `data-107.parquet`. Then we need to 
apply all newer equality deletes to each data file, along with any position deletes that reference it (see the sketch after the table):
   
   | File | Delete files to apply |
   | -- | -- |
   | data-103.parquet | eq-{104,105,106,107}.parquet, pos-103.parquet |
   | data-104.parquet | eq-{105,106,107}.parquet |
   | data-105.parquet | eq-{106,107}.parquet |
   | data-106.parquet | eq-107.parquet, pos-106.parquet |
   | data-107.parquet |  |
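   
   To make the selection rule concrete, here is a small standalone sketch (plain Java with hypothetical record types, not the Iceberg API) that reproduces the table above from the example sequence numbers. The only two rules it encodes are the ones that matter: an equality delete applies to data files with a strictly lower data sequence number, and a position delete applies at the same or a lower sequence number, but only to the data files it references.
   
   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;
   
   public class CompactionPlanSketch {
     // Hypothetical models of committed files: path, data sequence number, and for
     // position deletes the data files they reference.
     record DataFileRef(String path, long seq) {}
     record DeleteFileRef(String path, long seq, boolean equality, Set<String> referencedData) {}
   
     public static void main(String[] args) {
       List<DataFileRef> dataFiles = List.of(
           new DataFileRef("data-103.parquet", 103), new DataFileRef("data-104.parquet", 104),
           new DataFileRef("data-105.parquet", 105), new DataFileRef("data-106.parquet", 106),
           new DataFileRef("data-107.parquet", 107));
   
       List<DeleteFileRef> deletes = List.of(
           new DeleteFileRef("eq-103.parquet", 103, true, Set.of()),
           new DeleteFileRef("eq-104.parquet", 104, true, Set.of()),
           new DeleteFileRef("eq-105.parquet", 105, true, Set.of()),
           new DeleteFileRef("eq-106.parquet", 106, true, Set.of()),
           new DeleteFileRef("eq-107.parquet", 107, true, Set.of()),
           new DeleteFileRef("pos-103.parquet", 103, false, Set.of("data-103.parquet")),
           new DeleteFileRef("pos-106.parquet", 106, false, Set.of("data-106.parquet")));
   
       for (DataFileRef data : dataFiles) {
         List<String> toApply = deletes.stream()
             // equality deletes apply only to data files with a strictly lower sequence number;
             // position deletes apply to the data files they reference, at the same or a
             // lower sequence number
             .filter(d -> d.equality()
                 ? d.seq() > data.seq()
                 : d.seq() >= data.seq() && d.referencedData().contains(data.path()))
             .map(DeleteFileRef::path)
             .collect(Collectors.toList());
         System.out.println(data.path() + " -> " + toApply);
       }
     }
   }
   ```
   
   Running this prints the same mapping as the table above, which is a good sanity check that the plan only depends on sequence numbers and position-delete references.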
   
   The position delete files can be removed because they only reference 
data-103.parquet and data-106.parquet. The equality deletes must remain in the 
table in case they deleted data in other files. The new compacted data file 
should be data-107-compacted.parquet and should be committed at sequence number 
107 so that future equality deletes are still applied correctly.
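   
   Here is a rough sketch of what that commit could look like. Only `newRewrite()` and the basic rewrite-and-commit flow are existing Iceberg API; the overload that pins the sequence number of the replacement file is an assumption about the API surface, so treat this as a shape rather than a final implementation.
   
   ```java
   import java.util.Set;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.Table;
   
   public class CommitCompactionSketch {
     static void commitCompaction(Table table,
                                  Set<DataFile> compactedInputs,  // data-103 .. data-107
                                  DataFile compactedOutput,       // data-107-compacted
                                  long maxInputSequenceNumber) {  // 107 in the example
       table.newRewrite()
           // ASSUMPTION: a rewrite variant that commits the replacement file at the given
           // sequence number, so equality deletes with seq > 107 still apply to it
           .rewriteFiles(compactedInputs, Set.of(compactedOutput), maxInputSequenceNumber)
           .commit();
       // pos-103.parquet and pos-106.parquet only reference replaced data files, so they
       // could be dropped in the same operation if the rewrite also replaces delete files;
       // eq-103 .. eq-107 must stay because they may delete rows in other data files.
     }
   }
   ```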
   
   Another thing to keep in mind is that we may have equality and position 
deletes coming from concurrent writers. But in this case all we would need to 
do is update the task emitting committed data files to emit all delete files, 
even those from concurrent writers. For example, maybe sequence number 105 was 
written by a concurrent commit. Then the delete file for that commit, 
eq-105.parquet, should be used. But the data file should not be part of the 
compaction (it would be compacted by a parallel Flink job).
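   
   A minimal sketch of that filtering, again with a hypothetical record type (how the compaction operator learns which writer produced each file is an assumption here, not something spelled out above): every delete file in the sequence range is applied, but only data files written by this Flink job become compaction inputs.
   
   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   
   public class ConcurrentWriterSketch {
     // Hypothetical model of a file seen by the compaction operator
     record CommittedFile(String path, long seq, boolean isDelete, String writerJobId) {}
   
     // Only data files written by this job are rewritten; data files from concurrent
     // commits (e.g. sequence 105 above) are left for their own job to compact.
     static List<CommittedFile> compactionInputs(List<CommittedFile> committed, String thisJobId) {
       return committed.stream()
           .filter(f -> !f.isDelete() && thisJobId.equals(f.writerJobId()))
           .collect(Collectors.toList());
     }
   
     // Delete files are applied no matter which writer committed them, including
     // eq-105.parquet from the concurrent commit.
     static List<CommittedFile> deleteFilesToApply(List<CommittedFile> committed) {
       return committed.stream()
           .filter(CommittedFile::isDelete)
           .collect(Collectors.toList());
     }
   }
   ```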
   
   I think that this compaction strategy would address Jack's concern and is a 
safe way to compact across sequence numbers with equality deletes. The key idea 
is that we leave the equality delete files in the table so we don't have to 
worry about existing data files that aren't part of the stream we are 
compacting.
   
   I think that what I'm proposing is similar to @Reo-LEI's comment, except 
that it limits the scope of the compaction differently. Using time-based 
partitions would make us reasonably confident that the compaction is targeted, 
but the drawback is that we would still need to plan a scan, and it may still 
be impacted by concurrent deletes. But my strategy of keeping the compaction 
scoped to just files written by one Flink job and incrementally including 
delete files is a clean way to handle these issues and ensure that concurrent 
compactions won't conflict with one another.

