GitHub user suryaprasanna created a discussion: Support Concurrent Clustering of Small MOR File Groups with Upserts Using pendingReplacedFileIdMap
I would like to discuss a design for supporting concurrent clustering and upserts for MOR tables, using a new structure called pendingReplacedFileIdMap in the FileSystemView.

## Problem

Assume there are file groups F1 and F2 with the following file structure:

- F1: 1 base file + 2 log files
- F2: 1 base file + 3 log files

Assume clustering selects file groups F1 and F2, plans to replace them, and finally creates F3.

- The issue is that once clustering has decided to replace F1 and F2 with F3, new writes should ideally stop going to F1/F2 and start going to F3. Another motivation is write amplification.
- Even though Spark MOR upsert can handle small base files by routing inserts into an existing small file group and rewriting a new base file, doing this from the ingestion writer side can still cause significant write amplification. The ingestion path ends up repeatedly rewriting small file groups while regular writes are flowing, instead of letting clustering handle file-group consolidation in a more controlled way.
- The same concern applies to Flink. Even if small-file handling is available on the writer side, relying on ingestion-time correction can still introduce unnecessary write amplification, especially under continuous writes.
- Because of this, it is desirable for clustering to publish its replacement target early and let future writes move to the pending replacement file group, instead of continuing to correct small files through the ingestion writer path.

## Proposal

Store a pendingReplacedFileIdMap in the FileSystemView interface. It would look like F1 -> F3 and F2 -> F3. The requested clustering instant would then contain the following information (this requires a change in the clustering plan version):
- input file groups being replaced
- replacement file group ids
- pendingReplacedFileIdMap

## Expected behavior

### Writer

If clustering plans to replace F1 and F2 with F3, then new writes should be redirected to F3 instead of continuing to write to F1 and F2. This may not be achievable in a straightforward manner, so let us consider all the scenarios and what happens in each of them.

#### Spark (Batch flow)

- **Scenario 1:** If the clustering plan is created before the write job is routed, the job sees pendingReplacedFileIdMap and writes directly to F3.
- **Scenario 2:** If the clustering plan is created after the write job has already been routed, that job still writes to F1 and F2. In that case clustering should fail conflict resolution and retry. The next write job should see pendingReplacedFileIdMap and route to F3.
- **Scenario 3:** A partially routed case is not really a Spark batch scenario, because one Spark batch job uses one routing snapshot.

#### Flink (Streaming flow)

I do not have complete knowledge of Flink internals, so I will leave the choice of which operator is best suited for this change to Flink experts, but overall the following scenarios can happen:

- **Scenario 1:** If pendingReplacedFileIdMap is visible before routing, writes that would have gone to F1/F2 can be redirected to F3.
- **Scenario 2:** If some records were already routed before the alias map became visible, those records may still go to F1/F2, and clustering may need conflict handling for that case.
- **Scenario 3:** I think a partially routed case may be possible for Flink: some records may still go to an existing file group, say F1, while others go to the new file group F3. Even then ingestion does not fail; clustering fails on its first attempt, and the handling is similar to scenario 2. I am not sure whether we want to persist pendingReplacedFileIdMap to the operator for seamless translation of F1 -> F3 and F2 -> F3.
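To make the writer-side redirection concrete, here is a minimal sketch of how a routing step could consult the proposed pendingReplacedFileIdMap. All class and method names here (`PendingReplacementRouter`, `resolveTargetFileGroup`) are hypothetical illustrations, not existing Hudi APIs; the only idea taken from the proposal is the alias map itself (F1 -> F3, F2 -> F3) surfaced through the file system view.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed alias lookup on the write path.
public class PendingReplacementRouter {

    // e.g. {"F1" -> "F3", "F2" -> "F3"}, published by the requested
    // clustering instant and exposed via the FileSystemView.
    private final Map<String, String> pendingReplacedFileIdMap;

    public PendingReplacementRouter(Map<String, String> pendingReplacedFileIdMap) {
        this.pendingReplacedFileIdMap = pendingReplacedFileIdMap;
    }

    /**
     * Resolve the file group a new write should target. If the chosen
     * file group is pending replacement, redirect to its replacement;
     * otherwise keep the original choice.
     */
    public String resolveTargetFileGroup(String chosenFileGroupId) {
        return pendingReplacedFileIdMap.getOrDefault(chosenFileGroupId, chosenFileGroupId);
    }

    public static void main(String[] args) {
        Map<String, String> aliases = new HashMap<>();
        aliases.put("F1", "F3");
        aliases.put("F2", "F3");
        PendingReplacementRouter router = new PendingReplacementRouter(aliases);

        System.out.println(router.resolveTargetFileGroup("F1")); // F3
        System.out.println(router.resolveTargetFileGroup("F2")); // F3
        System.out.println(router.resolveTargetFileGroup("F9")); // F9, not pending
    }
}
```

In scenario 1 the router sees a populated map and redirects; in scenario 2 the map was empty when routing happened, which is exactly the case clustering must detect during conflict resolution.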
### Reader

#### Read Optimized

Read optimized is easy to handle, as it returns the base files directly.

#### Snapshot reads

Snapshot reads might need some changes: while F1 and F2 are being replaced by F3, the F3 base file may not yet be created, but updates to records in F1 and F2 may already be present in F3's log files.

GitHub link: https://github.com/apache/hudi/discussions/18433
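The snapshot-read implication above can be sketched as follows: when a file group is pending replacement, the reader may need to merge not only that group's own log files but also the log files of its replacement target, since redirected updates land there before clustering completes. The class and method names (`SnapshotLogScopeResolver`, `logFilesToMerge`) are hypothetical, not existing Hudi APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: compute the set of log files a snapshot read of one
// file group must merge, given the proposed pendingReplacedFileIdMap.
public class SnapshotLogScopeResolver {

    /**
     * Returns the log files to merge for fileGroupId: its own log files,
     * plus the log files of its pending replacement target (if any),
     * because updates may already have been redirected there.
     */
    public static List<String> logFilesToMerge(
            String fileGroupId,
            Map<String, String> pendingReplacedFileIdMap,
            Map<String, List<String>> logFilesByFileGroup) {

        List<String> scope = new ArrayList<>(
                logFilesByFileGroup.getOrDefault(fileGroupId, List.of()));
        String replacement = pendingReplacedFileIdMap.get(fileGroupId);
        if (replacement != null) {
            scope.addAll(logFilesByFileGroup.getOrDefault(replacement, List.of()));
        }
        return scope;
    }
}
```

For example, reading F1 while F1 -> F3 is pending would merge F1's base file with F1's log files and F3's log files, so updates redirected to F3 are not lost even though F3's base file does not exist yet.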
