rdblue commented on pull request #3323:
URL: https://github.com/apache/iceberg/pull/3323#issuecomment-962068331
@jackye1995, thanks for taking the time to write up your thoughts. That doc
is really helpful.
I had previously been thinking about this mostly in terms of an append
stream and v1 tables. Like @Reo-LEI notes, inline compaction for append streams
is safe (for v1 tables) and is a good idea if you want frequent checkpoints but
don't want a ton of small files. For v1 tables, we know that there won't be
equality or position delete files, so we can easily compact. For v2 tables,
append streams are pretty much like CDC streams because we may have concurrent
writers adding equality or position delete files.
That brings us back to Jack's points about adapting this idea to CDC
streams. First, equality deletes create sequence number boundaries that make it
difficult to compact. But I think there are still some use cases where this is
valuable even if we only plan on compacting within a sequence number. Write
tasks don't necessarily align with partitions, so compacting across tasks may
still be valuable. For example, CDC writes to an unpartitioned table from
multiple
tasks will create a data file per task (per checkpoint) that is probably not
full size.
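To make the within-sequence-number case concrete, here is a minimal sketch in
Java; the `CommittedDataFile` record and its fields are hypothetical, not
Iceberg classes. It just shows files from multiple write tasks being grouped so
that each compaction group stays inside one partition and one sequence number:
```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WithinSequenceGrouping {
  // Hypothetical summary of a committed data file; not an Iceberg class.
  record CommittedDataFile(String path, String partition, long sequenceNumber, long sizeBytes) {}

  // Group files so that each compaction group stays inside one partition and
  // one sequence number; files from different write tasks can still be merged.
  static Map<String, List<CommittedDataFile>> compactionGroups(List<CommittedDataFile> files) {
    return files.stream()
        .collect(Collectors.groupingBy(f -> f.partition() + "@seq=" + f.sequenceNumber()));
  }

  public static void main(String[] args) {
    List<CommittedDataFile> files = List.of(
        new CommittedDataFile("data-task-0.parquet", "bucket_id=0", 103, 8 << 20),
        new CommittedDataFile("data-task-1.parquet", "bucket_id=0", 103, 6 << 20),
        new CommittedDataFile("data-task-2.parquet", "bucket_id=0", 104, 7 << 20));
    // Only the two small files at sequence number 103 end up in the same group.
    compactionGroups(files).forEach((group, members) ->
        System.out.println(group + " -> " + members.size() + " file(s)"));
  }
}
```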
While there may be multiple files per partition, I think the larger use case
is compacting across checkpoints, which requires addressing compaction across
sequence numbers. Let's assume that we have a fairly regular
CDC stream for a bucketed table so that the commit for each bucket has one
equality delete and one data file per checkpoint, and optionally a position
delete file for the data file. That's the worst case that Jack describes, where
there is nothing to compact within each sequence number except position
deletes. Here are the files committed by sequence number for just one bucket
(bucket_id = 0):
| Type | Seq 107 | Seq 106 | Seq 105 | Seq 104 | Seq 103 |
| -- | -- | -- | -- | -- | -- |
| data | data-107.parquet | data-106.parquet | data-105.parquet | data-104.parquet | data-103.parquet |
| eq-deletes | eq-107.parquet | eq-106.parquet | eq-105.parquet | eq-104.parquet | eq-103.parquet |
| pos-deletes | | pos-106.parquet | | | pos-103.parquet |
I think that we actually can do some compaction in this situation. Say we
want to compact `data-103.parquet` through `data-107.parquet`. Then we need to
apply all newer equality deletes to each data file:
| File | Delete files to apply |
| -- | -- |
| data-103.parquet | eq-{104,105,106,107}.parquet, pos-103.parquet |
| data-104.parquet | eq-{105,106,107}.parquet |
| data-105.parquet | eq-{106,107}.parquet |
| data-106.parquet | eq-107.parquet, pos-106.parquet |
| data-107.parquet | |
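To spell out the rule behind this table, here is a minimal sketch; the
`DataFile`, `EqDeleteFile`, and `PosDeleteFile` records are hypothetical
stand-ins, not Iceberg types. An equality delete applies to a data file with a
strictly lower sequence number, and a position delete applies to a data file it
references with the same or a lower sequence number:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DeleteSelection {
  // Hypothetical stand-ins for file metadata; not Iceberg classes.
  record DataFile(String path, long seq) {}
  record EqDeleteFile(String path, long seq) {}
  record PosDeleteFile(String path, long seq, List<String> referencedDataFiles) {}

  // For each data file in the compaction set, collect the delete files that
  // must be applied while rewriting it.
  static Map<String, List<String>> deletesToApply(
      List<DataFile> dataFiles, List<EqDeleteFile> eqDeletes, List<PosDeleteFile> posDeletes) {
    Map<String, List<String>> result = new TreeMap<>();
    for (DataFile data : dataFiles) {
      List<String> applied = new ArrayList<>();
      for (EqDeleteFile eq : eqDeletes) {
        // Equality deletes only remove rows written at strictly lower sequence numbers.
        if (eq.seq() > data.seq()) {
          applied.add(eq.path());
        }
      }
      for (PosDeleteFile pos : posDeletes) {
        // Position deletes name the data file they delete from.
        if (pos.seq() >= data.seq() && pos.referencedDataFiles().contains(data.path())) {
          applied.add(pos.path());
        }
      }
      result.put(data.path(), applied);
    }
    return result;
  }

  public static void main(String[] args) {
    List<DataFile> dataFiles = new ArrayList<>();
    List<EqDeleteFile> eqDeletes = new ArrayList<>();
    for (long seq = 103; seq <= 107; seq++) {
      dataFiles.add(new DataFile("data-" + seq + ".parquet", seq));
      eqDeletes.add(new EqDeleteFile("eq-" + seq + ".parquet", seq));
    }
    List<PosDeleteFile> posDeletes = List.of(
        new PosDeleteFile("pos-103.parquet", 103, List.of("data-103.parquet")),
        new PosDeleteFile("pos-106.parquet", 106, List.of("data-106.parquet")));
    // Reproduces the table above, e.g. data-103.parquet <- eq-{104..107} + pos-103.
    deletesToApply(dataFiles, eqDeletes, posDeletes).forEach(
        (file, deletes) -> System.out.println(file + " <- " + deletes));
  }
}
```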
The position delete files can be removed because they only reference
`data-103.parquet` and `data-106.parquet`, both of which are rewritten by the
compaction. The equality deletes must remain in the table in case they deleted
data in other files. The new compacted data file should be
`data-107-compacted.parquet` and should be committed at sequence number 107 so
that future equality deletes are still applied correctly.
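As a sketch of the resulting commit plan (again with hypothetical stand-in
classes rather than Iceberg's actual rewrite API), the compaction would remove
the rewritten data files plus any position deletes that only reference them,
leave every equality delete in place, and commit the compacted file at the
highest sequence number in the group:
```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class CompactionCommitPlan {
  // Hypothetical stand-ins; not Iceberg classes.
  record DataFile(String path, long seq) {}
  record PosDeleteFile(String path, long seq, Set<String> referencedDataFiles) {}

  record Plan(Set<String> dataFilesToRemove, Set<String> posDeletesToRemove, long compactedFileSeq) {}

  static Plan plan(List<DataFile> compacted, List<PosDeleteFile> posDeletes) {
    Set<String> removedData =
        compacted.stream().map(DataFile::path).collect(Collectors.toSet());
    // A position delete can be dropped only if every file it references is being rewritten.
    Set<String> removedPosDeletes = posDeletes.stream()
        .filter(pos -> removedData.containsAll(pos.referencedDataFiles()))
        .map(PosDeleteFile::path)
        .collect(Collectors.toSet());
    // Equality deletes are never removed here: they may delete rows in files
    // outside the compaction group, so they stay in the table untouched.
    long seq = compacted.stream().mapToLong(DataFile::seq).max().orElseThrow();
    return new Plan(removedData, removedPosDeletes, seq);
  }

  public static void main(String[] args) {
    List<DataFile> compacted = List.of(
        new DataFile("data-103.parquet", 103),
        new DataFile("data-104.parquet", 104),
        new DataFile("data-105.parquet", 105),
        new DataFile("data-106.parquet", 106),
        new DataFile("data-107.parquet", 107));
    List<PosDeleteFile> posDeletes = List.of(
        new PosDeleteFile("pos-103.parquet", 103, Set.of("data-103.parquet")),
        new PosDeleteFile("pos-106.parquet", 106, Set.of("data-106.parquet")));
    Plan plan = plan(compacted, posDeletes);
    // data-107-compacted.parquet would be committed at sequence number 107.
    System.out.println("commit compacted file at sequence " + plan.compactedFileSeq());
    System.out.println("remove: " + plan.dataFilesToRemove() + " " + plan.posDeletesToRemove());
  }
}
```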
Another thing to keep in mind is that we may have equality and position
deletes coming from concurrent writers. But in this case all we would need to
do is update the task emitting committed data files to emit all delete files,
even those from concurrent writers. For example, suppose sequence number 105
was written by a concurrent commit. Then the delete file from that commit,
eq-105.parquet, should still be applied, but its data file should not be part
of the compaction (it would be compacted by a parallel Flink job).
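Here is a small sketch of that filtering; the `writerId` field is just an
assumption for illustration (a real implementation would track which snapshots
this Flink job committed). Data files are taken only from this job's own
commits, while delete files are taken from every commit in the range,
including concurrent ones:
```java
import java.util.List;
import java.util.stream.Collectors;

public class ConcurrentWriterFiltering {
  // Hypothetical stand-in; writerId is an assumption used only for this sketch.
  record CommittedFile(String path, long seq, String writerId, boolean isDelete) {}

  record CompactionInput(List<String> dataFiles, List<String> deleteFiles) {}

  static CompactionInput select(List<CommittedFile> files, String thisJobId, long minSeq, long maxSeq) {
    List<String> dataFiles = files.stream()
        // Only rewrite data files this job wrote; a concurrent job compacts its own.
        .filter(f -> !f.isDelete() && f.writerId().equals(thisJobId))
        .filter(f -> f.seq() >= minSeq && f.seq() <= maxSeq)
        .map(CommittedFile::path)
        .collect(Collectors.toList());
    List<String> deleteFiles = files.stream()
        // Apply every delete file in the range, no matter who committed it.
        .filter(f -> f.isDelete() && f.seq() >= minSeq && f.seq() <= maxSeq)
        .map(CommittedFile::path)
        .collect(Collectors.toList());
    return new CompactionInput(dataFiles, deleteFiles);
  }

  public static void main(String[] args) {
    List<CommittedFile> files = List.of(
        new CommittedFile("data-104.parquet", 104, "this-job", false),
        new CommittedFile("eq-104.parquet", 104, "this-job", true),
        new CommittedFile("data-105.parquet", 105, "other-job", false), // concurrent commit
        new CommittedFile("eq-105.parquet", 105, "other-job", true),    // still applied
        new CommittedFile("data-106.parquet", 106, "this-job", false));
    CompactionInput input = select(files, "this-job", 104, 106);
    System.out.println("compact: " + input.dataFiles());   // excludes data-105.parquet
    System.out.println("apply:   " + input.deleteFiles()); // includes eq-105.parquet
  }
}
```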
I think that this compaction strategy would address Jack's concern and is a
safe way to compact across sequence numbers with equality deletes. The key idea
is that we leave the equality delete files in the table so we don't have to
worry about existing data files that aren't part of the stream we are
compacting.
I think that what I'm proposing is similar to @Reo-LEI's comment, except
that it limits the scope of the compaction differently. Using time-based
partitions would make us reasonably confident that the compaction is targeted,
but the drawback is that we would still need to plan a scan, and it may still
be impacted by concurrent deletes. In contrast, keeping the compaction scoped
to just the files written by one Flink job and incrementally including delete
files is a clean way to handle those issues and ensure that compactions won't
conflict with one another.