nsivabalan opened a new pull request, #12236:
URL: https://github.com/apache/hudi/pull/12236

   ### Change Logs
   
   Rewriting the write DAG to write to both the data table (DT) and the metadata table (MDT) within the same stage boundaries. This avoids the out-of-sync issues that can crop up today and currently need special handling. The intention behind this DAG rewrite is to ensure we write to both DT and MDT using a single DAG without any breaks in between.
   
   This is a WIP patch which might get split into multiple patches depending on 
feedback. 
   
   Before we go into the new DAG, let's revisit the current one. Here is a pictorial representation of the write DAG as of today.
   
   ![Screenshot 2024-11-11 at 6 14 29 
PM](https://github.com/user-attachments/assets/098f5cf7-1c9b-4738-b3ea-646acb91add3)
   
   ![Screenshot 2024-11-11 at 6 14 38 
PM](https://github.com/user-attachments/assets/1c9a12be-2232-4bc4-840a-0e95bcdff7bc)
   
   
   Given the current DAG, we feel we could do a streaming write to the MDT directly, relying on the writes to the data table without triggering the actions. In case of task retries or stage retries, our marker reconciliation will automatically take care of reconciling any spurious data files.
   
   With that in mind, here is what the proposal looks like for the new DAG.
   
   ![Screenshot 2024-11-11 at 6 19 49 
PM](https://github.com/user-attachments/assets/0326489c-89d4-467c-b411-b26850ccd272)
   ![Screenshot 2024-11-11 at 6 20 15 
PM](https://github.com/user-attachments/assets/ae897663-ed0f-4f12-b30f-810c76951dff)
   
   
   I am opening up this patch to get feedback on the design while we iterate towards the full-blown implementation.
   
   Let's go over it one piece at a time.
   
   1. We are enabling NBCC (Non-Blocking Concurrency Control) and multi-writer support for the MDT, so that multiple writers can write concurrently to the metadata table. This is a prerequisite, since the data table could have multiple writers and each of their DAGs could be running concurrently. The previous DAG did not need this, but the redesigned DAG requires NBCC for the MDT. Draft patch: https://github.com/apache/hudi/pull/12209
   2. In general, we have two flows w.r.t. write commits: the auto-commit-enabled flow and the auto-commit-disabled flow. We are unifying these and plan to support only the auto-commit-disabled flow. All user-facing writers (batch writers and streaming writers) already use the auto-commit-disabled flow, so this should not have any impact on end users.
   
   However, a lot of tests are written using the auto-commit-enabled flow and need to be fixed to use the auto-commit-disabled flow. Draft patch: https://github.com/apache/hudi/pull/12204
   
   Auto-commit-enabled flow
   ```
   writeClient.startCommit(instantTime)
   returnVal = writeClient.upsert(RDD<records>, instantTime)
   
   // By this point the DAG has been triggered, and the commit is expected to
   // be complete if there are no errors.
   ```
   
   Auto-commit-disabled flow
   ```
   writeClient.startCommit(instantTime)
   returnVal = writeClient.upsert(RDD<records>, instantTime)
   writeClient.commitStats(returnVal, ...)
   
   // Unless the user calls writeClient.commitStats, the DAG is not triggered
   // and the write is not completed.
   ```
   
   3. MDT writer instances:
   - With the new DAG design, we need one instance of the MDT writer and MDT write client per ingestion commit/table service commit in the data table. So we are introducing a Map of instantTime -> HoodieTableMetadataWriter in BaseHoodieWriteClient. The expectation is that a given commit in the data table instantiates a new HoodieTableMetadataWriter and uses it throughout the lifecycle of the commit. In the end, the HoodieTableMetadataWriter is closed while wrapping up the commit in the data table.
   - In case of non-ingestion commits like clean, rollback and restore, we use the old way: instantiate a new HoodieTableMetadataWriter, apply the changes, and close it right away. No changes for these actions.
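The per-instant writer lifecycle described above can be sketched roughly as follows. This is a simplified, self-contained illustration; `MetadataWriterRegistry` and the `MetadataWriter` stub are stand-ins for the map inside BaseHoodieWriteClient and for HoodieTableMetadataWriter, not the actual classes:

```java
import java.util.HashMap;
import java.util.Map;

public class MetadataWriterRegistry {
  // Stand-in stub for HoodieTableMetadataWriter.
  static class MetadataWriter implements AutoCloseable {
    final String instantTime;
    boolean closed = false;
    MetadataWriter(String instantTime) { this.instantTime = instantTime; }
    @Override public void close() { closed = true; }
  }

  // instantTime -> metadata writer: one writer per ingestion commit.
  private final Map<String, MetadataWriter> writers = new HashMap<>();

  // Reuse the same writer instance for the whole lifecycle of a commit.
  public MetadataWriter getMetadataWriter(String instantTime) {
    return writers.computeIfAbsent(instantTime, MetadataWriter::new);
  }

  // Close and discard the writer while wrapping up the commit in the data table.
  public void closeMetadataWriter(String instantTime) {
    MetadataWriter writer = writers.remove(instantTime);
    if (writer != null) {
      writer.close();
    }
  }
}
```

The key property is that every lookup for the same instant time returns the same instance until the commit is wrapped up.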
   
   4. Notes on the ingestion write DAG. Let's go over the upsert operation in the data table.
   - a. BaseHoodieWriteClient.startCommitWithTime() starts a new commit in the data table. It also instantiates a new HoodieTableMetadataWriter and starts a new commit in the MDT. We are introducing new APIs in HoodieTableMetadataWriter to support this directly from the data table write client.
   ```
   void startCommit(String instantTime);
   ```
   - b. User calls writeClient.upsert().
   Let's unpack what happens within this method.
   ```
   .
   .
   HoodieWriteMetadata writeMetadata = HoodieTable.upsert(records, commitTime, ...)
   // writeMetadata holds a reference to RDD<WriteStatus>
   if (metadata table is enabled) {
     RDD<WriteStatus> mdtWriteStatuses = getMetadataWriter(commitTime).get()
         .prepareAndWriteToMDT(writeMetadata.getDataTableWriteStatuses(), commitTime);
     writeMetadata.setAllWriteStatus(
         writeMetadata.getDataTableWriteStatuses().union(mdtWriteStatuses));
   }
   return writeMetadata.clone(allWriteStats)
   ```
   
   After we write to the data table, we have an RDD<WriteStatus>. We have introduced a new API in HoodieTableMetadataWriter (prepareAndWriteToMDT) to write to the MDT directly from the data table's RDD<WriteStatus>.
   
   ```
   HoodieData<WriteStatus> prepareAndWriteToMDT(HoodieData<WriteStatus> writeStatus, String instantTime);
   ```
   
   We wanted to keep the FILES partition out of this write so that we can write it at the very end, after reconciling the commit metadata for the data table. Every other partition or index in the metadata table gets written using this API.
   
   This method (prepareAndWriteToMDT) returns the metadata table's RDD<WriteStatus>. We stitch both sets of write statuses together and send them back, so writeClient.upsert() returns an RDD<WriteStatus> which mixes data table write statuses and metadata table write statuses.
   
   Btw, do remember that the DAG is not triggered until the next API (c) is called. In other words, just by calling writeClient.upsert(), not even the data files of the data table are written.
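The laziness at play here is standard Spark RDD behavior: transformations only build up the DAG, and nothing executes until an action runs. A minimal Java-streams analogy of that contract (java.util.stream pipelines are likewise lazy until a terminal operation; the names below are illustrative):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyPipeline {
  // Counts how many elements were actually processed by the map step.
  public static final AtomicInteger PROCESSED = new AtomicInteger();

  public static Stream<String> buildPipeline() {
    // Like writeClient.upsert(): this only *defines* the work.
    return Stream.of("rec1", "rec2", "rec3")
        .map(r -> { PROCESSED.incrementAndGet(); return r.toUpperCase(); });
  }

  public static List<String> trigger(Stream<String> pipeline) {
    // Like writeClient.commit(): the terminal operation runs the pipeline.
    return pipeline.collect(Collectors.toList());
  }
}
```

After buildPipeline() returns, PROCESSED is still 0; only trigger() causes the map step to run.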
   
   - c. User calls writeClient.commit(commitTime, return value from (b) above).
   Let's unpack what happens within this call.
   - i. We finally trigger the action on the RDD<WriteStatus> (i.e. the return value from (b) above). This results in all data files being written to the data table and all files being written to the metadata table for all partitions except the FILES partition.
   - ii. Prepare the HoodieCommitMetadata for the data table.
   - iii. Perform marker reconciliation for the data table.
   - iv. In step c.i above, we would have collected the List<HoodieWriteStat> of the metadata table as well. We re-use this and call into HoodieTableMetadataWriter.writeToFilesPartitionAndCommit(instantTime, mdtHoodieWriteStats, HoodieCommitMetadata of interest).
   
   ```
   void writeToFilesPartitionAndCommit(String instantTime, HoodieEngineContext context,
       List<HoodieWriteStat> partialMdtWriteStats, HoodieCommitMetadata commitMetadata);
   ```
   
   What this API does:
   - Using the HoodieCommitMetadata from the data table, prepare and write records to the FILES partition in the MDT.
   - Stitch the List<HoodieWriteStat> from c.i (i.e. partialMdtWriteStats) with the List<HoodieWriteStat> from the FILES partition write above, and complete the commit to the metadata table. As part of this step we also perform marker reconciliation for the metadata table, i.e. delete any spurious files in the MDT.
   
   - v. Wrap up the commit in the data table.
   
   Please check out the changes in SparkRDDWriteClient, HoodieTableMetadataWriter, HoodieBackedTableMetadataWriter and SparkHoodieBackedTableMetadataWriter.
   
   In this patch, we have fixed the upsert() operation to test this DAG, and it works as expected: writes to both the data table and the metadata table happen within a single DAG without any breaks. Writes to the FILES partition in the MDT happen at the end, and finally we wrap up the commit in both the metadata table and the data table.
   
   5. Metadata partitioner:
   One tricky part of achieving the above is designing the metadata table partitioner. If we use the out-of-the-box UpsertPartitioner, the workload-profile-building stage triggers the action. Here is what we have done to circumvent that DAG trigger: while initializing the HoodieTableMetadataWriter itself, we already know which partitions in the MDT are enabled and the file group count for each, so we use that info to build the SparkMetadataUpsertPartitioner. All records are expected to be tagged with their fileId location by the time we reach the metadata table upsertPrepped call, so we leverage that to decide the Spark partition. With this approach, we completely avoid triggering the DAG and keep it streaming from the data table writes all the way to the metadata table writes.
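To make the idea concrete, here is a rough, self-contained sketch of such a static partitioner. The names and structure are illustrative, not the actual SparkMetadataUpsertPartitioner: since the enabled MDT partitions and their file groups are known up front, every fileId can be assigned a fixed partition index at construction time, so no Spark job has to run to build a workload profile.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StaticFileIdPartitioner {
  // fileId -> partition index, computed once at construction.
  private final Map<String, Integer> fileIdToPartition = new HashMap<>();

  // The enabled MDT partitions and their file group ids are known when the
  // metadata writer is initialized, so the whole mapping is precomputed here.
  public StaticFileIdPartitioner(Map<String, List<String>> mdtPartitionToFileIds) {
    int next = 0;
    for (List<String> fileIds : mdtPartitionToFileIds.values()) {
      for (String fileId : fileIds) {
        fileIdToPartition.put(fileId, next++);
      }
    }
  }

  public int numPartitions() {
    return fileIdToPartition.size();
  }

  // Records are already tagged with their fileId, so this is a pure lookup:
  // no workload-profile job, hence no premature DAG trigger.
  public int getPartition(String fileId) {
    Integer p = fileIdToPartition.get(fileId);
    if (p == null) {
      throw new IllegalArgumentException("Untagged fileId: " + fileId);
    }
    return p;
  }
}
```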
   
   6. UpsertPreppedPartial:
   With the DAG redesign, we write to the metadata table twice using the same delta commit time, and none of our writeClient APIs are designed to work that way. So we are introducing upsertPreppedPartial to assist in this case, with validations in place to ensure it is used only for metadata table writes. This makes it feasible to call writeClient.startCommit(t10.dc), writeClient.upsertPreppedPartial(batch1), writeClient.upsertPreppedPartial(batch2), and finally writeClient.commit(t10.dc, ...).
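The multi-batch call sequence above, as pseudocode (the mapping of batch1/batch2 to the non-FILES and FILES writes is my reading of the design, not spelled out in the API):

```
writeClient.startCommit(t10.dc)            // one delta commit in the MDT
writeClient.upsertPreppedPartial(batch1)   // first partial write, e.g. all partitions except FILES
writeClient.upsertPreppedPartial(batch2)   // second partial write to the same delta commit, e.g. FILES
writeClient.commit(t10.dc, ...)            // complete the delta commit once
```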
   
   7. We have introduced SparkMetadataTableUpsertCommitActionExecutor to assist with writing to the metadata table. It receives the RDD<records>, creates an empty inflight file (empty workload profile), uses SparkMetadataUpsertPartitioner to repartition the records, and writes them.
   
   8. Zooming in on the prepareAndWriteToMDT impl:
   The high-level steps in this method are:
   - Input: RDD<WriteStatus>.
   - We flatMap over the RDD<WriteStatus> and prepare metadata table records (col stats, RLI, bloom, etc.).
   - Some partitions in the MDT need one Spark task per physical Hudi partition, e.g. the partition stats index. So we also repartition based on the Hudi partition and process all writeStatuses to prepare records for the partition stats index.
   - Union the above sets of records.
   - Tag location.
   - Return RDD<HoodieRecord>.
   
   We have introduced MetadataIndexGenerator and SparkMetadataIndexGenerator to 
assist with preparing MDT records. 
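The steps above can be sketched as pseudocode (the helper names are illustrative, not the actual MetadataIndexGenerator APIs):

```
prepareAndWriteToMDT(RDD<WriteStatus> dtWriteStatuses, instantTime):
  // per-file indexes: one pass over the data table write statuses
  perFileRecords = dtWriteStatuses.flatMap(status -> prepareRecords(status))   // col stats, RLI, bloom, ...
  // per-hudi-partition indexes need one task per physical partition
  partitionStatRecords = dtWriteStatuses.repartition(by hudi partition)
                                        .map(prepare partition stats records)
  mdtRecords = perFileRecords.union(partitionStatRecords)
  tag location for mdtRecords
  // write all MDT partitions except FILES; returns the MDT RDD<WriteStatus>
  return writeToMDT(mdtRecords, instantTime)
```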
   
   Note: We will fix all other write operations (bulk insert, insert, delete, insert overwrite, etc.) in a similar fashion. In this patch, upsert() is implemented as a reference.
   
   
   9. Metadata table rollbacks:
   Prior to this design, the failed-writes cleaning policy was eager in the MDT: whenever we start a new commit in the MDT, if there are any pending commits, we auto-rollback them. But with NBCC, the failed-writes cleaning policy is lazy, which means a heartbeat is emitted by the MDT writeClient when the delta commit starts. If the process crashes, then later, when the cleaner in the MDT executes, it checks for failed writes (elapsed heartbeats) and triggers rollbacks. With the DAG redesign, we can't let this happen, so we are disabling this rollback by the cleaner for the metadata table. Any failed write in the metadata table will have a corresponding failed write in the data table. The data table will have to trigger a rollback at some point (based on whether it is single-writer or multi-writer), and that same rollback will trigger a rollback in the metadata table if the commit of interest exists there. So it is safe to disable the auto rollbacks in the MDT.
   
   10. WriteStatus changes:
   Since we have a single RDD<WriteStatus> at the end of writeClient.upsert(), we have introduced a boolean flag in WriteStatus to denote whether it is for the data table or the metadata table. We use that flag to bucketize the write statuses and prepare the HoodieCommitMetadata for the data table.
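The bucketizing step is a simple split on that flag. A self-contained sketch (the `WriteStatus` stub and the `isMetadataTable` flag name are illustrative stand-ins for the real class):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WriteStatusBucketizer {
  // Minimal stand-in for WriteStatus with the new boolean flag.
  static class WriteStatus {
    final String fileId;
    final boolean isMetadataTable;
    WriteStatus(String fileId, boolean isMetadataTable) {
      this.fileId = fileId;
      this.isMetadataTable = isMetadataTable;
    }
  }

  // Split the single mixed collection of write statuses into DT (false)
  // and MDT (true) buckets.
  public static Map<Boolean, List<WriteStatus>> bucketize(List<WriteStatus> all) {
    return all.stream().collect(Collectors.partitioningBy(ws -> ws.isMetadataTable));
  }
}
```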
   
   11. If you have gotten a grasp of the ingestion commit DAG, let's move on to compaction. I have not fixed data table clustering yet in this patch, but the changes to compaction should give you a good idea.
   
   Let's take a peek at the compaction control flow. We are not going to touch the scheduling of compaction with this DAG rewrite; only during compaction execution do we touch the MDT.
   - a. WriteClient.compact(compactionTime)
   - b. (a) calls into TableServiceClient.compact(compactionTime, shouldComplete). All of our HoodieTableMetadataWriter instances and the map holding them are maintained by the data table's BaseHoodieWriteClient. So, while instantiating the TableServiceClient, we pass in a function which can fetch the metadata writer instance:
   ```
   Functions.Function2<String, HoodieTableMetaClient, Option<HoodieTableMetadataWriter>>
   ```
   - c. Let's expand on the compact() impl in TableServiceClient.
   ```
   .
   .
   // Apply the metadataWriterFunc to get an instance of HoodieTableMetadataWriter.
   HoodieTableMetadataWriter.startCommit(compactionCommit)
   HoodieWriteMetadata compactionWriteMetadata = HoodieTable.compact(...)
   // compactionWriteMetadata contains an RDD<WriteStatus> whose DAG is not yet triggered.
   // Write to the metadata table (partial writes, except the FILES partition)
   // and get hold of the RDD<WriteStatus> for the metadata table.
   // Stitch both writeStatuses and set them as part of compactionWriteMetadata.
   return compactionWriteMetadata
   ```
   
   So, the table service client and writeClient.compact() will return a `HoodieWriteMetadata compactionWriteMetadata` which contains the RDD<WriteStatus>. The DAG is not yet triggered.
   
   - d. The caller is expected to call writeClient.commitCompaction(compactionTime, compactionWriteMetadata (from the compact() call), ...).
   Let's zoom in a bit here:
   - We call collect on the RDD<WriteStatus> in compactionWriteMetadata. This is when the DAG is triggered (i.e. writes to both the data table and the MDT partitions (except FILES) happen).
   - Bucketize the data table HoodieWriteStats and the metadata table HoodieWriteStats.
   - Prepare the HoodieCommitMetadata for the data table.
   - Perform marker reconciliation for the data table.
   - Write to the MDT FILES partition using writeToFilesPartitionAndCommit.
   - Complete the delta commit in the MDT for `compactionTime`.
   - Complete the compaction commit in the data table.
   
   Changes for ingestion writes were mostly straightforward with the revamped DAG, but for compaction we had to make some changes.
   - Before this patch, the HoodieTable.compact() call itself triggered the compaction DAG in the data table. We prepared the HoodieCommitMetadata and returned it as part of HoodieTable.compact(), so the HoodieWriteMetadata returned from that call contained a List<HoodieWriteStat> as well as the RDD<WriteStatus>.
   But this is against our goal of making the DAG fully streaming from the data table all the way to the metadata table. So we had to change HoodieTable.compact() to not trigger the DAG. Hence, the HoodieCommitMetadata cannot be prepared until writeClient.commitCompaction() is invoked, which is when the DAG is triggered.
   
   Note: As you can see, we are changing the compaction DAG here. For example, even if someone disables the metadata table completely, compaction execution uses a different flow with this revamp. So, if you are reviewing, do pay close attention to these code blocks. If you can poke around and let me know of any gaps or anything to watch out for, that will be very helpful.
   
   12. MDT stats generation:
   As per the design, our intention is to generate all required stats for MDT record generation in the data table write handle itself and pass them along with the WriteStatus. FILES and RLI are already taken care of. For col stats, only the append handles send the stats; we are yet to fix the create and merge handles. The idea is to embed all required stats (col stats, bloom filter, functional index, secondary index) in the WriteStatus returned from the write handles in the data table.
   
   Things to flesh out / pending items:
   - Non-core write operations might need special handling for some MDT partitions. In general, our design relies on everything being embedded in the write status by the write handles. In the case of insert_overwrite, for the RLI partition we might have to poll the file system view (FSV) to fetch the latest base files and then the record keys in them. Polling the FSV from executors is generally not recommended, and here we would need to do it for a lot of files, so we can't just delegate the work to one write handle. This needs to be figured out.
   - One more pending piece in the overall picture is the row-writer flow for bulk insert. We also need to align the row-writer code path with the new DAG; we do not want any divergence with the DAG rewrite.
   
   
   Tests:
   - None of the tests are fixed yet. We picked a couple of sanity tests for COW and MOR tables (compaction) and ensured the end-to-end design is viable and works as expected. While we work on the pending items and as we get feedback on the design, we will work towards fixing the tests.
   - Since we are deprecating the auto-commit-enabled flow, a lot of tests are failing and we need to fix all of them: https://github.com/apache/hudi/pull/12204
   
   Note: We will introduce a flag to enable the new optimized DAG, which will be turned off by default. The optimized DAG will be fully implemented for Spark; Flink and Java will be taken up later.
   
   Feel free to comment on the overall design or on any code blocks in particular. Since this has a very large blast radius, we want to ensure we don't have any gaps in the design or impl.
   
   In the next day or so, I will also add "Notes to reviewer" to help with reviewing.
   
   ### Impact
   
   _Describe any public API or user-facing feature change or any performance 
impact._
   
   ### Risk level (write none, low medium or high below)
   
   _If medium or high, explain what verification was done to mitigate the 
risks._
   
   ### Documentation Update
   
   _Describe any necessary documentation update if there is any new feature, 
config, or user-facing change. If not, put "none"._
   
   - _The config description must be updated if new configs are added or the 
default value of the configs are changed_
   - _Any new feature or user-facing change requires updating the Hudi website. 
Please create a Jira ticket, attach the
     ticket number here and follow the 
[instruction](https://hudi.apache.org/contribute/developer-setup#website) to 
make
     changes to the website._
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   

