[ 
https://issues.apache.org/jira/browse/HUDI-8471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-8471:
--------------------------------------
    Description: 
Row writer uses writeClient in an unconventional ways compared to other 
operations.

Typical write operation takes the following flow:

```
1. 
WriteClient.upsert

{ Instantiate HoodieTable result = table.upsert() postWrite() return 
HoodieData<WriteStatus> }

2. writeClient.commitStats(return value from (1) i.e 
HoodieData<WriteStatus>,.... ) which internally will commit the write and then 
call clean, archive, compaction, clustering etc.

1.a 
HoodieTable.upsert()

{ calls into SparkUpsertCommitActionExecutor.execute() }

1.a.i 
SparkUpsertCommitActionExecutor.execute()

{ return HoodieWriteHelper.newInstance().write(...) }

1.a.i.1
HoodieWriteHelper.newInstance().write()

{ dedup records tagRecords or index lookup return 
BaseCommitActionExecutor.execute() }

1.a.i.1.a
BaseSparkCommitActionExecutor.execute()

{ build workload profile getPartitioner HoodieData<WriteStatus> writeStatuses = 
mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner); // this is 
where the writes happen update index and return HoodieData<WriteStatus> }

```

While rowWriter looks like below

```
1. HoodieSparkSqlWriter.write
bulkInsertAsRow

{ writeclient.startCommit WriteResult = 
BaseDatasetBulkInsertCommitActionExecutor.execute() // by the time we return 
from here, data is committed fully along w/ any inline table services. }

1.a BaseDatasetBulkInsertCommitActionExecutor.execute

{ write to custom spark ds }

2. Custom Spark DS: 
We have implemented a series of interfaces which goes as follows 
DefaultSource -> HoodieDataSourceInternalTable

HoodieDataSourceInternalTable.newWriteBuilder will return 
HoodieDataSourceInternalBatchWriteBuilder

this builder has buildForBatch() which will return BatchWrite.

BatchWrite is core to our writes.

{

constructor: {

 instantiate dataSourceInternalWriterHelper (DataSourceInternalWriterHelper)

}   

DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {

   return new 
HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
writeConfig, instantTime, structType, populateMetaFields, 
arePartitionRecordsSorted);

}

 

public void onDataWriterCommit(WriterCommitMessage message) {
dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
}

 

@Override
public void commit(WriterCommitMessage[] messages) {
List<WriteStatus> writeStatuses = Arrays.stream(messages).map(m -> 
(HoodieWriterCommitMessage) m)
.flatMap(m -> m.getWriteStatuses().stream()).collect(Collectors.toList());
dataSourceInternalWriterHelper.commit(writeStatuses);
}

 

public void abort(WriterCommitMessage[] messages) {
dataSourceInternalWriterHelper.abort();
}

 

}

 

DataSourceInternalWriterHelper {

constructor {

 instantiates WriteClient

writeClient.initTable

writeClient.preWrite

} 

 

public void commit(List<WriteStatus> writeStatuses) {

     writeClient.commitStats(...)

}

 

}

 

So, lets work towards unifying the row writer path w/ non-row writer path. 

This might be required for us to get the re-designed DAG ready for row-writer 
as well. 

 

  was:
Row writer uses writeClient in an unconventional ways compared to other 
operations. 

Typical write operation takes the following flow:

```
1. 
WriteClient.upsert  { 
   Instantiate HoodieTable
   result = table.upsert() 
   postWrite()
   return HoodieData<WriteStatus>
 }
2. writeClient.commitStats(return value from (1) i.e 
HoodieData<WriteStatus>,.... ) which internally will commit the write and then 
call clean, archive, compaction, clustering etc. 

1.a 
HoodieTable.upsert() { 
    calls into SparkUpsertCommitActionExecutor.execute() 
}

1.a.i 
SparkUpsertCommitActionExecutor.execute() {

     return HoodieWriteHelper.newInstance().write(...)
}

1.a.i.1
HoodieWriteHelper.newInstance().write() {
   dedup records
   tagRecords or index lookup 
   return BaseCommitActionExecutor.execute() 
}

1.a.i.1.a
BaseSparkCommitActionExecutor.execute() {
   build workload profile
   getPartitioner
   HoodieData<WriteStatus> writeStatuses = 
mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner); // this is 
where the writes happen
    update index and return HoodieData<WriteStatus>
}

```

While rowWriter looks like below 

```
1. HoodieSparkSqlWriter.write
    bulkInsertAsRow {
      writeclient.startCommit 
      WriteResult = BaseDatasetBulkInsertCommitActionExecutor.execute() // by 
the time we return from here, data is committed fully along w/ any inline table 
services. 
   }

1.a BaseDatasetBulkInsertCommitActionExecutor.execute {
   write to custom spark ds
}

2. Custom Spark DS: 
We have implemented a series of interfaces which goes as follows 
DefaultSource -> HoodieDataSourceInternalTable

HoodieDataSourceInternalTable.newWriteBuilder will return 
HoodieDataSourceInternalBatchWriteBuilder 

this builder has buildForBatch() which will return BatchWrite. 

BatchWrite is core to our writes. 
 











> Unify row writer and non-row writer code paths
> ----------------------------------------------
>
>                 Key: HUDI-8471
>                 URL: https://issues.apache.org/jira/browse/HUDI-8471
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: writer-core
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Major
>
> Row writer uses writeClient in an unconventional ways compared to other 
> operations.
> Typical write operation takes the following flow:
> ```
> 1. 
> WriteClient.upsert
> { Instantiate HoodieTable result = table.upsert() postWrite() return 
> HoodieData<WriteStatus> }
> 2. writeClient.commitStats(return value from (1) i.e 
> HoodieData<WriteStatus>,.... ) which internally will commit the write and 
> then call clean, archive, compaction, clustering etc.
> 1.a 
> HoodieTable.upsert()
> { calls into SparkUpsertCommitActionExecutor.execute() }
> 1.a.i 
> SparkUpsertCommitActionExecutor.execute()
> { return HoodieWriteHelper.newInstance().write(...) }
> 1.a.i.1
> HoodieWriteHelper.newInstance().write()
> { dedup records tagRecords or index lookup return 
> BaseCommitActionExecutor.execute() }
> 1.a.i.1.a
> BaseSparkCommitActionExecutor.execute()
> { build workload profile getPartitioner HoodieData<WriteStatus> writeStatuses 
> = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner); // this 
> is where the writes happen update index and return HoodieData<WriteStatus> }
> ```
> While rowWriter looks like below
> ```
> 1. HoodieSparkSqlWriter.write
> bulkInsertAsRow
> { writeclient.startCommit WriteResult = 
> BaseDatasetBulkInsertCommitActionExecutor.execute() // by the time we return 
> from here, data is committed fully along w/ any inline table services. }
> 1.a BaseDatasetBulkInsertCommitActionExecutor.execute
> { write to custom spark ds }
> 2. Custom Spark DS: 
> We have implemented a series of interfaces which goes as follows 
> DefaultSource -> HoodieDataSourceInternalTable
> HoodieDataSourceInternalTable.newWriteBuilder will return 
> HoodieDataSourceInternalBatchWriteBuilder
> this builder has buildForBatch() which will return BatchWrite.
> BatchWrite is core to our writes.
> {
> constructor: {
>  instantiate dataSourceInternalWriterHelper (DataSourceInternalWriterHelper)
> }   
> DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
>    return new 
> HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
> writeConfig, instantTime, structType, populateMetaFields, 
> arePartitionRecordsSorted);
> }
>  
> public void onDataWriterCommit(WriterCommitMessage message) {
> dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
> }
>  
> @Override
> public void commit(WriterCommitMessage[] messages) {
> List<WriteStatus> writeStatuses = Arrays.stream(messages).map(m -> 
> (HoodieWriterCommitMessage) m)
> .flatMap(m -> m.getWriteStatuses().stream()).collect(Collectors.toList());
> dataSourceInternalWriterHelper.commit(writeStatuses);
> }
>  
> public void abort(WriterCommitMessage[] messages) {
> dataSourceInternalWriterHelper.abort();
> }
>  
> }
>  
> DataSourceInternalWriterHelper {
> constructor {
>  instantiates WriteClient
> writeClient.initTable
> writeClient.preWrite
> } 
>  
> public void commit(List<WriteStatus> writeStatuses) {
>      writeClient.commitStats(...)
> }
>  
> }
>  
> So, lets work towards unifying the row writer path w/ non-row writer path. 
> This might be required for us to get the re-designed DAG ready for row-writer 
> as well. 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to