[
https://issues.apache.org/jira/browse/HUDI-8471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-8471:
--------------------------------------
Fix Version/s: 1.0.2
> Unify row writer and non-row writer code paths
> ----------------------------------------------
>
> Key: HUDI-8471
> URL: https://issues.apache.org/jira/browse/HUDI-8471
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: writer-core
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Critical
> Fix For: 1.0.1, 1.0.2
>
>
> The row writer uses the write client in an unconventional way compared to
> other operations.
> A typical write operation takes the following flow:
> ```
> 1. WriteClient.upsert {
>      instantiate HoodieTable
>      result = table.upsert()
>      postWrite()
>      return HoodieData<WriteStatus>
>    }
> 2. writeClient.commitStats(return value from (1), i.e. HoodieData<WriteStatus>, ...)
>    // internally commits the write and then calls clean, archive,
>    // compaction, clustering, etc.
>
> 1.a HoodieTable.upsert() {
>       calls into SparkUpsertCommitActionExecutor.execute()
>     }
> 1.a.i SparkUpsertCommitActionExecutor.execute() {
>         return HoodieWriteHelper.newInstance().write(...)
>       }
> 1.a.i.1 HoodieWriteHelper.newInstance().write() {
>           dedup records
>           tag records (index lookup)
>           return BaseCommitActionExecutor.execute()
>         }
> 1.a.i.1.a BaseSparkCommitActionExecutor.execute() {
>             build workload profile
>             getPartitioner
>             HoodieData<WriteStatus> writeStatuses =
>                 mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
>             // this is where the writes happen
>             update index
>             return HoodieData<WriteStatus>
>           }
> ```
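To make the two-phase shape of this flow concrete (the write returns statuses, and the caller commits them in a separate call), here is a minimal, self-contained Java sketch. The types below (`WriteStatus`, `upsert`, `commitStats`) are simplified stand-ins for illustration only, not the real Hudi classes:

```java
import java.util.Arrays;
import java.util.List;

// Minimal model of the conventional write flow: upsert() produces
// write statuses, and commitStats() is invoked separately by the caller.
// All names here are hypothetical stand-ins, not real Hudi APIs.
class ConventionalFlowSketch {

    // Stand-in for HoodieData<WriteStatus>: one status per record.
    record WriteStatus(String record, boolean success) {}

    // Stand-in for step 1.a.i.1.a: the executor where writes happen.
    static List<WriteStatus> executorExecute(List<String> records) {
        // workload profile, partitioner, and actual file writes elided
        return records.stream().map(r -> new WriteStatus(r, true)).toList();
    }

    // Stand-in for step 1: WriteClient.upsert -> table.upsert -> executor.
    static List<WriteStatus> upsert(List<String> records) {
        // dedup and index lookup (HoodieWriteHelper) would happen here
        return executorExecute(records);
    }

    // Stand-in for step 2: the caller commits the returned statuses,
    // which would then trigger clean/archive/compaction/clustering inline.
    static boolean commitStats(List<WriteStatus> statuses) {
        return statuses.stream().allMatch(WriteStatus::success);
    }

    public static void main(String[] args) {
        List<WriteStatus> statuses = upsert(Arrays.asList("r1", "r2"));
        System.out.println(commitStats(statuses)); // prints true
    }
}
```

The key property this models: the commit is a second, explicit call made by the caller after `upsert` returns, which is what the row writer path below does differently.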
> The row writer path, by contrast, looks like this:
> ```
> 1. HoodieSparkSqlWriter.write -> bulkInsertAsRow {
>      writeClient.startCommit
>      WriteResult = BaseDatasetBulkInsertCommitActionExecutor.execute()
>      // by the time we return from here, the data is fully committed,
>      // along with any inline table services
>    }
> 1.a BaseDatasetBulkInsertCommitActionExecutor.execute {
>       write to the custom Spark datasource
>     }
>
> 2. Custom Spark datasource:
>    We have implemented a series of interfaces that goes as follows:
>    DefaultSource -> HoodieDataSourceInternalTable.
>    HoodieDataSourceInternalTable.newWriteBuilder will return
>    HoodieDataSourceInternalBatchWriteBuilder; this builder has
>    buildForBatch(), which returns a BatchWrite.
>    BatchWrite is core to our writes:
>
>    BatchWrite {
>      constructor: {
>        instantiate dataSourceInternalWriterHelper (DataSourceInternalWriterHelper)
>      }
>
>      DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
>        return new HoodieBulkInsertDataInternalWriterFactory(
>            dataSourceInternalWriterHelper.getHoodieTable(),
>            writeConfig, instantTime, structType, populateMetaFields,
>            arePartitionRecordsSorted);
>      }
>
>      public void onDataWriterCommit(WriterCommitMessage message) {
>        dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
>      }
>
>      @Override
>      public void commit(WriterCommitMessage[] messages) {
>        List<WriteStatus> writeStatuses = Arrays.stream(messages)
>            .map(m -> (HoodieWriterCommitMessage) m)
>            .flatMap(m -> m.getWriteStatuses().stream())
>            .collect(Collectors.toList());
>        dataSourceInternalWriterHelper.commit(writeStatuses);
>      }
>
>      public void abort(WriterCommitMessage[] messages) {
>        dataSourceInternalWriterHelper.abort();
>      }
>    }
>
>    DataSourceInternalWriterHelper {
>      constructor {
>        instantiates WriteClient
>        writeClient.initTable
>        writeClient.preWrite
>      }
>
>      public void commit(List<WriteStatus> writeStatuses) {
>        writeClient.commitStats(...)
>      }
>    }
> ```
>
> So, let's work towards unifying the row writer path with the non-row writer
> path. This might also be required to get the re-designed DAG ready for the
> row writer.
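For contrast with the conventional flow, the single-call shape of the row-writer path above can be sketched the same way. `WriterHelper` and `batchCommit` below are hypothetical stand-ins, not the real Hudi/Spark classes; the point they model is that the commit happens *inside* the datasource write (via the helper owning the write client), rather than in a separate call by the caller:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Minimal model of the row-writer flow: per-task statuses are gathered
// by the batch commit, which commits internally via a helper that owns
// the write client. All names are simplified stand-ins for illustration.
class RowWriterFlowSketch {

    record WriteStatus(String record, boolean success) {}

    // Stand-in for DataSourceInternalWriterHelper: its constructor would
    // instantiate the write client and run initTable/preWrite; commit()
    // would call writeClient.commitStats(...).
    static class WriterHelper {
        boolean committed = false;

        void commit(List<WriteStatus> statuses) {
            committed = statuses.stream().allMatch(WriteStatus::success);
        }
    }

    // Stand-in for BatchWrite.commit(): flattens per-task commit messages
    // and delegates to the helper, so the write is already committed by
    // the time the datasource call returns.
    static boolean batchCommit(List<List<WriteStatus>> perTaskStatuses) {
        WriterHelper helper = new WriterHelper();
        List<WriteStatus> all = perTaskStatuses.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
        helper.commit(all);
        return helper.committed;
    }

    public static void main(String[] args) {
        List<List<WriteStatus>> messages = Arrays.asList(
                Arrays.asList(new WriteStatus("r1", true)),
                Arrays.asList(new WriteStatus("r2", true)));
        System.out.println(batchCommit(messages)); // prints true
    }
}
```

Comparing the two sketches makes the asymmetry explicit: the conventional path separates write and commit into two caller-visible calls, while the row-writer path buries the commit inside the datasource, which is the inversion this ticket proposes to unify.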
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)