[
https://issues.apache.org/jira/browse/HUDI-8471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-8471:
--------------------------------------
Description:
The row writer uses the writeClient in an unconventional way compared to other
operations.
A typical write operation takes the following flow:
```
1. WriteClient.upsert {
     Instantiate HoodieTable
     result = table.upsert()
     postWrite()
     return HoodieData<WriteStatus>
   }
2. writeClient.commitStats(return value from (1), i.e. HoodieData<WriteStatus>, ...)
   which internally will commit the write and then call clean, archive,
   compaction, clustering, etc.

1.a HoodieTable.upsert() {
      calls into SparkUpsertCommitActionExecutor.execute()
    }
1.a.i SparkUpsertCommitActionExecutor.execute() {
        return HoodieWriteHelper.newInstance().write(...)
      }
1.a.i.1 HoodieWriteHelper.newInstance().write() {
          dedup records
          tagRecords, i.e. index lookup
          return BaseCommitActionExecutor.execute()
        }
1.a.i.1.a BaseSparkCommitActionExecutor.execute() {
            build workload profile
            getPartitioner
            HoodieData<WriteStatus> writeStatuses =
                mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
            // this is where the writes happen
            update index and return HoodieData<WriteStatus>
          }
```
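To make the two-phase shape above concrete, here is a minimal, self-contained mock of the pattern: phase 1 (`upsert`) runs the write DAG and returns statuses to the caller without committing, and phase 2 (`commitStats`) commits the instant and triggers inline table services. All names here (`MockWriteClient`, the simplified `WriteStatus`) are illustrative stand-ins, not Hudi's actual API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TwoPhaseWriteSketch {

    // Stand-in for HoodieData<WriteStatus>: one entry per written file group.
    static class WriteStatus {
        final String path;
        final long numWrites;
        WriteStatus(String path, long numWrites) {
            this.path = path;
            this.numWrites = numWrites;
        }
    }

    static class MockWriteClient {
        boolean committed = false;
        final List<String> tableServicesRun = new ArrayList<>();

        // Phase 1: run the write DAG and hand statuses back to the caller.
        // Crucially, nothing is committed yet when this returns.
        List<WriteStatus> upsert(List<String> records, String instantTime) {
            List<WriteStatus> statuses = new ArrayList<>();
            statuses.add(new WriteStatus("part=0/file-" + instantTime, records.size()));
            return statuses;
        }

        // Phase 2: commit the instant, then run inline table services.
        boolean commitStats(String instantTime, List<WriteStatus> statuses) {
            committed = true;
            tableServicesRun.add("clean");
            tableServicesRun.add("archive");
            return true;
        }
    }

    public static void main(String[] args) {
        MockWriteClient client = new MockWriteClient();
        List<WriteStatus> statuses = client.upsert(Arrays.asList("r1", "r2"), "001");
        System.out.println("committed before commitStats: " + client.committed);
        client.commitStats("001", statuses);
        System.out.println("committed after commitStats: " + client.committed);
    }
}
```

The key property is that the caller holds the `WriteStatus` collection between the two phases, so the commit (and everything hanging off it) stays under the caller's control.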
The row writer, by contrast, looks like this:
```
1. HoodieSparkSqlWriter.write
   bulkInsertAsRow {
     writeClient.startCommit
     WriteResult = BaseDatasetBulkInsertCommitActionExecutor.execute()
     // by the time we return from here, the data is fully committed,
     // along with any inline table services
   }
1.a BaseDatasetBulkInsertCommitActionExecutor.execute {
      write to custom Spark datasource
    }
```
2. Custom Spark datasource:
We have implemented a series of interfaces, chained as follows:
DefaultSource -> HoodieDataSourceInternalTable.
HoodieDataSourceInternalTable.newWriteBuilder returns a
HoodieDataSourceInternalBatchWriteBuilder, and this builder's buildForBatch()
returns a BatchWrite. BatchWrite is core to our writes:
```
{
  constructor: {
    instantiate dataSourceInternalWriterHelper (DataSourceInternalWriterHelper)
  }

  DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
    return new HoodieBulkInsertDataInternalWriterFactory(
        dataSourceInternalWriterHelper.getHoodieTable(), writeConfig,
        instantTime, structType, populateMetaFields, arePartitionRecordsSorted);
  }

  public void onDataWriterCommit(WriterCommitMessage message) {
    dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    List<WriteStatus> writeStatuses = Arrays.stream(messages)
        .map(m -> (HoodieWriterCommitMessage) m)
        .flatMap(m -> m.getWriteStatuses().stream())
        .collect(Collectors.toList());
    dataSourceInternalWriterHelper.commit(writeStatuses);
  }

  public void abort(WriterCommitMessage[] messages) {
    dataSourceInternalWriterHelper.abort();
  }
}

DataSourceInternalWriterHelper {
  constructor {
    instantiates WriteClient
    writeClient.initTable
    writeClient.preWrite
  }

  public void commit(List<WriteStatus> writeStatuses) {
    writeClient.commitStats(...)
  }
}
```
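The driver-side commit step above can be sketched as a small, self-contained mock: each task ships its `WriteStatus` list back in a commit message, and `commit()` flattens all of them into one list before handing off to the helper (which, in the real code path, calls writeClient.commitStats). The class shapes here mirror the pseudocode above but are simplified stand-ins, not the actual Hudi or Spark types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class BatchCommitSketch {

    static class WriteStatus {
        final String filePath;
        WriteStatus(String filePath) { this.filePath = filePath; }
    }

    // Marker interface, analogous to Spark's WriterCommitMessage.
    interface WriterCommitMessage {}

    // Per-task message carrying that task's write statuses.
    static class HoodieWriterCommitMessage implements WriterCommitMessage {
        private final List<WriteStatus> writeStatuses;
        HoodieWriterCommitMessage(List<WriteStatus> writeStatuses) {
            this.writeStatuses = writeStatuses;
        }
        List<WriteStatus> getWriteStatuses() { return writeStatuses; }
    }

    // Stand-in for DataSourceInternalWriterHelper.
    static class InternalWriterHelper {
        boolean committed = false;
        int filesCommitted = 0;
        void commit(List<WriteStatus> statuses) {
            committed = true;                 // would call writeClient.commitStats(...)
            filesCommitted = statuses.size();
        }
    }

    final InternalWriterHelper helper = new InternalWriterHelper();

    // Mirrors BatchWrite.commit: flatten every task's statuses into one list.
    void commit(WriterCommitMessage[] messages) {
        List<WriteStatus> all = Arrays.stream(messages)
            .map(m -> (HoodieWriterCommitMessage) m)
            .flatMap(m -> m.getWriteStatuses().stream())
            .collect(Collectors.toList());
        helper.commit(all);
    }

    public static void main(String[] args) {
        BatchCommitSketch write = new BatchCommitSketch();
        WriterCommitMessage[] messages = {
            new HoodieWriterCommitMessage(
                new ArrayList<>(Arrays.asList(new WriteStatus("f1")))),
            new HoodieWriterCommitMessage(
                new ArrayList<>(Arrays.asList(new WriteStatus("f2"), new WriteStatus("f3"))))
        };
        write.commit(messages);
        System.out.println("files committed: " + write.helper.filesCommitted);
    }
}
```

This makes the contrast with the typical path visible: here the commit happens inside the datasource machinery, so the outer caller never sees the statuses.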
So, let's work toward unifying the row writer path with the non-row writer path.
This might be required for us to get the re-designed DAG ready for the row
writer as well.
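One possible shape for that unification, sketched below purely as an illustration (every name here is hypothetical, not a proposed API): both the RDD-based and row-based executors implement a common interface that *returns* statuses instead of committing internally, so a single caller-side step owns commitStats and table services for both paths.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class UnifiedPathSketch {

    static class WriteStatus {
        final long records;
        WriteStatus(long records) { this.records = records; }
    }

    // Common contract: execute the write DAG, return statuses, commit nothing.
    interface CommitActionExecutor {
        List<WriteStatus> execute();
    }

    // Stand-in for the existing RDD-based upsert path.
    static class RddUpsertExecutor implements CommitActionExecutor {
        public List<WriteStatus> execute() {
            return Collections.singletonList(new WriteStatus(10));
        }
    }

    // Stand-in for the row writer; the key change is that it now
    // returns statuses instead of committing inside the datasource.
    static class RowBulkInsertExecutor implements CommitActionExecutor {
        public List<WriteStatus> execute() {
            return Collections.singletonList(new WriteStatus(20));
        }
    }

    static class WriteClient {
        final List<String> log = new ArrayList<>();

        // Single commit path shared by both executors.
        List<WriteStatus> write(CommitActionExecutor executor) {
            List<WriteStatus> statuses = executor.execute();
            log.add("commitStats");
            log.add("tableServices");
            return statuses;
        }
    }

    public static void main(String[] args) {
        WriteClient client = new WriteClient();
        client.write(new RddUpsertExecutor());
        client.write(new RowBulkInsertExecutor());
        System.out.println("log: " + client.log);
    }
}
```

Whether the row writer can realistically hand statuses back across the datasource boundary is exactly the open question this ticket needs to answer; the sketch only shows the target calling convention.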
> Unify row writer and non-row writer code paths
> ----------------------------------------------
>
> Key: HUDI-8471
> URL: https://issues.apache.org/jira/browse/HUDI-8471
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: writer-core
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Major
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)