nsivabalan opened a new pull request, #12236: URL: https://github.com/apache/hudi/pull/12236
### Change Logs

Rewriting the write DAG to write to both the data table (DT) and the metadata table (MDT) using the same stage boundaries. This avoids the out-of-sync issues that currently need special handling. The intention behind this DAG rewrite is to ensure we write to both the DT and the MDT using a single DAG w/o any breaks in between. This is a WIP patch which might get split into multiple patches depending on feedback.

Before we go into the new DAG, let's revisit what the current DAG looks like. Here is a pictorial representation of the write DAG as of today.

[Figure: current write DAG]

Given the current DAG, we feel we could do a streaming write to the MDT directly, relying on the writes to the data table w/o triggering the actions. So, in case of task retries or stage retries, our marker reconciliation will automatically take care of reconciling any spurious data files. With that in mind, here is what the proposal looks like for the new DAG.

[Figure: proposed write DAG]

I am opening up this patch to get feedback on the design while we iterate and get to a full-blown implementation. Let's go over one piece at a time.

1. We are enabling NBCC (Non-Blocking Concurrency Control) and multi-writer for the MDT so that multiple writers can write concurrently to the metadata table. This is a prerequisite since the data table could have multiple writers and each of their DAGs could be running concurrently. The previous DAG did not need this, but the redesigned DAG requires NBCC with the MDT. Draft patch: https://github.com/apache/hudi/pull/12209

2. In general we have two flows wrt write commits, namely the auto commit enabled flow and the auto commit disabled flow. We are unifying these and plan to support only the auto commit disabled flow. All user-facing writers (batch writers and streaming writers) already use the auto commit disabled flow, so this should not have any impact on end users. However, a lot of tests are written using the auto commit enabled flow, and those need to be fixed to use the auto commit disabled flow.
Draft patch: https://github.com/apache/hudi/pull/12204

Auto commit enabled flow
```
writeClient.startcommit(instantTime)
returnVal = writeClient.upsert(RDD<records>, instantTime)
// By this time, the dag is triggered and the commit is expected to be complete if there are no errors.
```
Auto commit disabled flow
```
writeClient.startcommit(instantTime)
returnVal = writeClient.upsert(RDD<records>, instantTime)
writeClient.commitStats(returnVal,....)
// So, unless the user calls writeClient.commitStats, the dag may not be triggered and the write may not be completed.
```

3. MDT writer instances:
- With the new DAG design, we need one instance of the MDT writer and MDT write client per ingestion commit/table service commit in the data table. So, we are introducing a Map of instantTime -> HoodieTableMetadataWriter in BaseHoodieWriteClient. The expectation is that a given commit in the data table will instantiate a new HoodieTableMetadataWriter and use it throughout the lifecycle of the commit. In the end, the HoodieTableMetadataWriter is closed while wrapping up the commit in the data table.
- In case of non-ingestion commits like clean, rollback and restore, we use the old way: we instantiate a new HoodieTableMetadataWriter, apply the changes and close it right away. No changes for these actions.

4. Notes on the ingestion write DAG. Let's go over the upsert operation in the data table.
- a. BaseHoodieWriteClient.startCommitWithTime() -> starts a new commit in the data table. It also instantiates a new HoodieTableMetadataWriter and starts a new commit in it. We are introducing new APIs in HoodieTableMetadataWriter to support these directly from the data table write client.
```
void startCommit(String instantTime);
```
- b. User calls writeClient.upsert(). Let's unpack what happens within this method.
```
.
.
HoodieWriteMetadata writeMetadata = HoodieTable.upsert(records, commitTime, ...)
// writeMetadata holds a reference to RDD<WriteStatus>
if (metadata table is enabled) {
  RDD<WriteStatus> mdtWriteStatuses = getMetadataWriter(commitTime).get().prepareAndWriteToMDT(writeMetadata.getDataTableWriteStatuses(), commitTime);
  writeMetadata.setAllWriteStatus(writeMetadata.getDataTableWriteStatuses().union(mdtWriteStatuses))
}
return writeMetadata.clone(allWriteStats)
```
After the write to the data table, we have an RDD<WriteStatus>. We have introduced a new API in HoodieTableMetadataWriter (prepareAndWriteToMDT) to write to the MDT directly based on the RDD<WriteStatus> from the data table.
```
HoodieData<WriteStatus> prepareAndWriteToMDT(HoodieData<WriteStatus> writeStatus, String instantTime);
```
We wanted to keep the FILES partition out of this write so that we can write it at the very end, after reconciling the commit metadata for the data table. So, every other partition or index in the metadata table gets written using this API. This method (prepareAndWriteToMDT) returns the metadata table's RDD<WriteStatus>. We stitch both sets of write statuses together and send them back. So, WriteClient.upsert() returns an RDD<WriteStatus> which has a mix of data table write statuses and metadata table write statuses.

Btw, do remember that the DAG is not triggered until the next API (c) is called. In other words, just by calling writeClient.upsert(), not even the data files of the data table will be written.
- c. User calls writeClient.commit(commitTime, return value from (b) above). Let's unpack what happens within this call.
- i. We finally trigger the action on the RDD<WriteStatus> (i.e. the return value from (b) above). This results in all data files being written to the data table and all files being written to the metadata table for all partitions except the FILES partition.
- ii. Prepare the HoodieCommitMetadata for the data table.
- iii. Perform marker reconciliation for the data table.
- iv. In step c.i above, we would have collected the List<HoodieWriteStat> of the metadata table as well.
We re-use this and call into HoodieTableMetadataWriter.writeToFilesPartitionAndCommit(instantTime, mdtHoodieWriteStats, HoodieCommitMetadata of interest).
```
void writeToFilesPartitionAndCommit(String instantTime, HoodieEngineContext context, List<HoodieWriteStat> partialMdtWriteStats, HoodieCommitMetadata commitMetadata);
```
What this API does is:
- Using the HoodieCommitMetadata from the data table, we prepare and write to the FILES partition in the MDT.
- Stitch the List<HoodieWriteStat> from c.i (i.e. partialMdtWriteStats) with the List<HoodieWriteStat> from the write to the FILES partition above, and complete the commit to the metadata table. This means that marker reconciliation for the metadata table is also performed as part of this step, i.e. any spurious files in the MDT are deleted.
- v. Wrap up the commit in the data table.

Please check out the changes in SparkRDDWriteClient, HoodieTableMetadataWriter, HoodieBackedTableMetadataWriter and SparkHoodieBackedTableMetadataWriter. In this patch, we have fixed the upsert() operation to test this DAG and it works as expected, i.e. writes to both the data table and the metadata table happen within a single DAG w/o any breaks. Writes to the FILES partition in the MDT happen at the end, and finally we wrap up the commit in both the metadata table and the data table.

5. Metadata partitioner: One tricky part in achieving the above is designing the metadata table partitioner. If we use the out-of-the-box UpsertPartitioner, the workload profile building stage will trigger the action. So, here is what we have done to circumvent that DAG trigger. While initializing the HoodieTableMetadataWriter itself, we know which partitions in the MDT are enabled and the file group count for each. So, we use that info to build SparkMetadataUpsertPartitioner. All records are expected to be tagged w/ the fileID location by the time we reach the metadata table upsertPrepped call. So, we leverage that to decide the Spark partition for each record.
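
To make the partitioner idea concrete, here is a minimal Java sketch. It is not the actual SparkMetadataUpsertPartitioner and has no Spark or Hudi dependency. The class name `FileGroupPartitionerSketch`, its methods, and the assumption that a record's file group can be reduced to an integer index are all hypothetical, chosen only for illustration. The point it shows: when the file group count per MDT partition is known up front, the target partition can be computed from the record's tagged location alone, w/o scanning the records to build a workload profile.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not Hudi's SparkMetadataUpsertPartitioner.
final class FileGroupPartitionerSketch {
  // First Spark partition index assigned to each MDT partition.
  private final Map<String, Integer> offsets = new HashMap<>();
  // Known file group count of each MDT partition.
  private final Map<String, Integer> counts = new HashMap<>();
  private final int numPartitions;

  // fileGroupCounts: MDT partition name -> number of file groups, known at writer init time.
  FileGroupPartitionerSketch(Map<String, Integer> fileGroupCounts) {
    int next = 0;
    for (Map.Entry<String, Integer> entry : fileGroupCounts.entrySet()) {
      offsets.put(entry.getKey(), next);
      counts.put(entry.getKey(), entry.getValue());
      next += entry.getValue();
    }
    numPartitions = next;
  }

  // Total number of Spark partitions: one per MDT file group.
  int numPartitions() {
    return numPartitions;
  }

  // Maps an already-tagged record location to its Spark partition, with no pass over the data.
  int getPartition(String mdtPartition, int fileGroupIndex) {
    Integer offset = offsets.get(mdtPartition);
    if (offset == null) {
      throw new IllegalArgumentException("Unknown MDT partition: " + mdtPartition);
    }
    int count = counts.get(mdtPartition);
    if (fileGroupIndex < 0 || fileGroupIndex >= count) {
      throw new IllegalArgumentException("File group index out of range: " + fileGroupIndex);
    }
    return offset + fileGroupIndex;
  }
}
```

Because every lookup is a pure function of the precomputed counts, constructing and using such a partitioner does not require running a job over the incoming records, which is the property the design relies on.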
With this approach, we completely avoid triggering the DAG and keep it streaming from the data table writes all the way to the metadata table writes.

6. UpsertPreppedPartial: With the DAG redesign, we write to the metadata table twice using the same delta commit time, and none of our existing APIs in writeClient are designed to work that way. So, we are introducing upsertPreppedPartial to assist in this case. We have validations in place to ensure it is used only for metadata table writes. So, it's feasible to call writeClient.startCommit(t10.dc), writeClient.upsertPreppedPartial(batch1), writeClient.upsertPreppedPartial(batch2) and finally writeClient.commit(t10.dc..)

7. We have introduced SparkMetadataTableUpsertCommitActionExecutor to assist w/ writing to the metadata table. It receives RDD<records>, creates an empty inflight file (empty workload profile), uses SparkMetadataUpsertPartitioner to repartition the records, and writes them.

8. Zooming in on the prepareAndWriteToMDT impl. The high-level steps in this method are given below:
- Input: RDD<WriteStatus>.
- We do a flatMap over the RDD<WriteStatus> and prepare metadata table records (col stats, RLI, bloom etc).
- Some partitions in the MDT need one Spark task per physical Hudi partition, e.g. the partition stats index. So, we also repartition based on the Hudi partition and process all writeStatuses to prepare records for the partition stats index.
- Union the above sets of records.
- Tag location.
- Return RDD<HoodieRecord>.

We have introduced MetadataIndexGenerator and SparkMetadataIndexGenerator to assist with preparing MDT records.

Note: We will be fixing all other write operations (bulk insert, insert, delete, insert overwrite, etc) in a similar fashion. In this patch, upsert() is implemented as a reference.

9. Metadata table rollbacks: Prior to this design, the cleaning policy for failed writes (rollbacks) was eager in the MDT. So, whenever we started a new commit in the MDT, any pending commits were rolled back automatically.
But w/ NBCC, the cleaning policy for failed writes is lazy. This means that a heartbeat is emitted by the MDT writeClient when the delta commit starts. If the process crashes, then later, when the cleaner in the MDT executes, it checks for failed writes (expired heartbeats) and triggers a rollback. With the DAG redesign, we can't let this happen. So, we are disabling this rollback by the cleaner for the metadata table. Any failed write in the metadata table will have a failed write in the data table as well. So, the data table will have to trigger a rollback at some point (based on whether it's single writer or multi writer), and the same will trigger a rollback in the metadata table if the commit of interest exists there. So, it's safe to disable the auto rollbacks in the MDT.

10. WriteStatus changes: Since we have a single RDD<WriteStatus> at the end of writeClient.upsert(), we have introduced a boolean flag in WriteStatus to denote whether it is for the data table or the metadata table. We use that to bucketize and prepare the HoodieCommitMetadata for the data table.

11. If you have gotten a grasp of the ingestion commit DAG, let's go to compaction. I have not fixed data table clustering yet in this patch, but the changes to compaction should pretty much give you an idea. Let's take a peek at what the compaction control flow looks like. We are not going to touch the scheduling of compaction w/ this DAG rewrite exercise. Only during compaction execution do we touch the MDT.
- a. WriteClient.compact(compactionTime)
- b. (a) will call into TableServiceClient.compact(compactionTime, shouldComplete).
- All of our HoodieTableMetadataWriter instances and the map are maintained by the data table BaseHoodieWriteClient. So, while instantiating TableServiceClient, we pass in a function which can assist in fetching the metadata writer instance.
```
Functions.Function2<String, HoodieTableMetaClient, Option<HoodieTableMetadataWriter>>
```
- c. Let's expand on the compact() impl in TableServiceClient.
```
.
.
// Apply the metadataWriterFunc to get an instance of HoodieTableMetadataWriter.
HoodieTableMetadataWriter.startCommit(compactionCommit)
HoodieWriteMetadata compactionWriteMetadata = HoodieTable.compact(...) // this compactionWriteMetadata contains RDD<WriteStatus> which is not yet triggered.
// Write to Metadata table (partial writes except FILES partition) and get hold of RDD<WriteStatus> for metadata table.
// Stitch both writeStatuses and set it as part of compactionWriteMetadata.
return compactionWriteMetadata
```
So, tableServiceWriteClient and writeClient.compact() will return `HoodieWriteMetadata compactionWriteMetadata`, which contains the RDD<WriteStatus>. The DAG is not yet triggered.
- d. The caller is expected to call writeClient.commitCompaction(compactionTime, compactionWriteMetadata (from the compact() call), ..). Let's zoom in a bit here.
- We call collect on the RDD<WriteStatus> in compactionWriteMetadata. This is when the DAG is triggered (i.e. writes to both the data table and the MDT partitions (except FILES) happen).
- Bucketize the data table HoodieWriteStats and the metadata table HoodieWriteStats.
- Prepare the HoodieCommitMetadata for the data table.
- Perform marker reconciliation for the data table.
- Write to the MDT FILES partition using writeToFilesPartitionAndCommit.
- Complete the dc in the MDT for `compactionTime`.
- Complete the compaction commit in the data table.

Changes for ingestion writes were mostly straightforward w/ the revamped DAG. But for compaction, we had to make some changes.
- Before this patch, the HoodieTable.compact() call itself triggers the compaction DAG in the data table. We prepare the HoodieCommitMetadata and return it as part of HoodieTable.compact(). So, the HoodieWriteMetadata returned from HoodieTable.compact() contains a List<HoodieWriteStat> and an RDD<WriteStatus> too. But this is against our goal of making the DAG fully streaming from the data table all the way to the metadata table. So, we had to change HoodieTable.compact() so that it no longer triggers the DAG.
Hence, the HoodieCommitMetadata cannot be prepared until writeClient.commitCompaction() is invoked, which is when the DAG is triggered.

Note: As you can see, we are changing the compaction DAG here. E.g., even if someone disabled the metadata table completely, compaction execution uses a different flow w/ this revamp. So, if you are reviewing, do pay close attention to these code blocks. If you can poke around and let me know of any gaps or anything to watch out for, that will be very helpful.

12. MDT stats generation: As per the design, our intention is to generate all required stats for MDT record generation in the data table write handle itself and pass them along w/ the WriteStatus. FILES and RLI are already taken care of. For col stats, only append handles send the stats; we are yet to fix it for create and merge handles. But the idea here is to embed all required stats (col stats, bloom filter, functional index, secondary index) in the WriteStatus returned from the write handles in the data table.

Things to flesh out/Pending items:
- Non-core write operations might need some special handling for some MDT partitions. In general, our design relies on the fact that everything will be embedded in the write status by the write handles. In case of insert_overwrite for the RLI partition, we might have to poll the FSV to fetch the latest base files and then the record keys in them. Polling the FSV from an executor is not generally recommended, and here we would need to do it for a lot of files, so we can't just delegate the work to one write handle. So, this needs to be figured out.
- One more pending piece in the overall picture is the row writer flow for bulk insert. We also need to align the row writer code path w/ the new DAG. We do not want to have any divergence w/ the DAG rewrite.

Tests:
- None of the tests are fixed yet. We picked a couple of sanity tests for COW and MOR tables (compaction) and ensured the end-to-end design is viable and works as expected.
While we work on the pending items and get feedback on the design, we will work towards fixing the tests.
- Since we are deprecating the auto commit enabled flow, a lot of tests are failing and we need to fix all of them. https://github.com/apache/hudi/pull/12204

Note: We will introduce a flag to enable the new optimized DAG, and it will be turned off by default. The optimized DAG will be fully implemented for Spark. Flink and Java will be taken up later.

Feel free to comment on the overall design or any code blocks in particular. Since this has a very large blast radius, we want to ensure we don't have any gaps in the design or impl. In the next day, I will try to add "Notes to reviewer" as well to help w/ reviewing.

### Impact

_Describe any public API or user-facing feature change or any performance impact._

### Risk level (write none, low medium or high below)

_If medium or high, explain what verification was done to mitigate the risks._

### Documentation Update

_Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none"._

- _The config description must be updated if new configs are added or the default value of the configs are changed_
- _Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the [instruction](https://hudi.apache.org/contribute/developer-setup#website) to make changes to the website._

### Contributor's checklist

- [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [ ] Change Logs and Impact were stated clearly
- [ ] Adequate tests were added if applicable
- [ ] CI passed
