[ 
https://issues.apache.org/jira/browse/HUDI-9564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated HUDI-9564:
-----------------------------
    Description: 
Currently, we have 5 ramifications for record merging:

1. partial merging;
2. COMMIT_TIME_ORDERING merge mode;
3. EVENT_TIME_ORDERING merge mode;
4. CUSTOM merge mode with payload;
5. CUSTOM merge mode with merger.

These ramifications are checked per record within 3 core merge methods:
{code:java}
// for log/log merging
FileGroupRecordBuffer.doProcessNextDataRecord

// for log/delete merging
FileGroupRecordBuffer.doProcessNextDeleteRecord

// for base/log merging
FileGroupRecordBuffer.merge
{code}
This is hard to extend/maintain especially recently we want to generalize the 
partial merging.

So here I suggest we make the merging a separate component for easier extension.
{code:java}
public interface BufferedRecordMerger {
  // akka to #doProcessNextDataRecord
  Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, 
BufferedRecord<T> existingRecord)
  // akka to #doProcessNextDeletedRecord
  Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, BufferedRecord<T> 
existingRecord);
  // akka to #merge
  Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord<T> 
newerRecord)
}
{code}

We will have 5 impls for this merger akka to the 5 ramifications, there could 
be more if partial update is generalized. The merger should be initialzied 
together with the FileGroupRecordBuffer because it is deterministic per-query.

> Inroduce BufferRecordMerger API in FileGroupRecordBuffer
> --------------------------------------------------------
>
>                 Key: HUDI-9564
>                 URL: https://issues.apache.org/jira/browse/HUDI-9564
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: reader-core
>            Reporter: Danny Chen
>            Priority: Major
>             Fix For: 1.1.0
>
>
> Currently, we have 5 ramifications for record merging:
> 1. partial merging;
> 2. COMMIT_TIME_ORDERING merge mode;
> 3. EVENT_TIME_ORDERING merge mode;
> 4. CUSTOM merge mode with payload;
> 5. CUSTOM merge mode with merger.
> These ramifications are checked per record within 3 core merge methods:
> {code:java}
> // for log/log merging
> FileGroupRecordBuffer.doProcessNextDataRecord
> // for log/delete merging
> FileGroupRecordBuffer.doProcessNextDeleteRecord
> // for base/log merging
> FileGroupRecordBuffer.merge
> {code}
> This is hard to extend/maintain especially recently we want to generalize the 
> partial merging.
> So here I suggest we make the merging a separate component for easier 
> extension.
> {code:java}
> public interface BufferedRecordMerger {
>   // akka to #doProcessNextDataRecord
>   Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, 
> BufferedRecord<T> existingRecord)
>   // akka to #doProcessNextDeletedRecord
>   Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, 
> BufferedRecord<T> existingRecord);
>   // akka to #merge
>   Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, 
> BufferedRecord<T> newerRecord)
> }
> {code}
> We will have 5 impls for this merger akka to the 5 ramifications, there could 
> be more if partial update is generalized. The merger should be initialzied 
> together with the FileGroupRecordBuffer because it is deterministic per-query.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to