[ 
https://issues.apache.org/jira/browse/HUDI-6409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dennysong updated HUDI-6409:
----------------------------
    Description: 
The current incremental query for the time range *(t1, t2]* returns the latest 
record values at *t2*. However, there are situations where we also need the 
record values at *t1* to derive insert/update information. This is particularly 
useful for data rollback, where we need this delta information (insert/update) 
from Hudi to roll back external storage (such as HBase in downstream systems).

Hudi RFC-51 introduces CDC (Change Data Capture) support, but {*}it returns all 
commit change records between (t1, t2]{*}. This is inefficient and wastes 
resources when we only want to know the values at t1 and t2. 
Furthermore, the returned records in CDC mode are in {*}JSON format with a 
unified schema (a customized format for CDC scenarios){*}, which differs from 
the user's original schema. This makes CDC harder to use than 
snapshot/incremental reads.
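
To illustrate the overhead, here is a minimal sketch (not Hudi code) of what a consumer has to do today to reduce a full change stream to the two values it actually needs. The `(key, before, after)` tuple shape and the name `collapse_changes` are assumptions made for this example; they are a simplification and not the RFC-51 record format.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical, simplified change event: (record key, value before, value after).
Change = Tuple[str, Optional[Any], Optional[Any]]


def collapse_changes(changes: List[Change]) -> Dict[str, Tuple[Optional[Any], Optional[Any]]]:
    """Reduce every change in (t1, t2], given in commit order, to one
    (value at t1, value at t2) pair per record key."""
    net: Dict[str, Tuple[Optional[Any], Optional[Any]]] = {}
    for key, before, after in changes:
        if key in net:
            # Keep the earliest "before" and overwrite with the newest "after".
            first_before, _ = net[key]
            net[key] = (first_before, after)
        else:
            net[key] = (before, after)
    return net


changes = [
    ("k1", None, "v1"),   # insert
    ("k1", "v1", "v2"),   # update
    ("k2", "a", "b"),     # update
    ("k1", "v2", "v3"),   # update
]
print(collapse_changes(changes))
```

Every intermediate change still has to be read and scanned, even though only the first "before" and the last "after" per key survive.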

An alternative option to obtain both record values at t1 and t2 is to rely on 
the existing incremental query implementation.

We can include the record value at t1 (the before value) alongside the latest 
record value at t2 in an incremental read. For example:

When `hoodie.datasource.query.incremental.format=latest_state`, the returned 
record is:
{code:java}
column1: string, column2: array<string>{code}
When `hoodie.datasource.query.incremental.format=latest_state_with_before`, the 
returned record is:
{code:java}
column1: string, column1_before_: string, column2: array<string>, 
column2_before_: array<string>{code}
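The mapping between the two schemas could be derived mechanically. The following is only a sketch of that mapping, not part of any Hudi API: the helper name `with_before_columns` and the representation of a schema as `(name, type)` pairs are assumptions for illustration, and the `_before_` suffix is taken from the example above.

```python
from typing import List, Tuple

BEFORE_SUFFIX = "_before_"  # suffix used in the proposed format above


def with_before_columns(schema: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Follow each (name, type) column with a same-typed `<name>_before_` column."""
    out: List[Tuple[str, str]] = []
    for name, dtype in schema:
        out.append((name, dtype))
        out.append((name + BEFORE_SUFFIX, dtype))
    return out


original = [("column1", "string"), ("column2", "array<string>")]
for name, dtype in with_before_columns(original):
    print(f"{name}: {dtype}")
```
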
The implementation is simple. Sort both the base file and the log file in 
advance (e.g., compact in MOR), and then perform an ordered merge to combine 
them.
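
The ordered merge could look roughly like the sketch below. It is an illustration under stated assumptions, not the proposed Hudi implementation: both inputs are assumed to be sorted by record key with at most one entry per key, the base side is assumed to hold the values at t1, the log side the latest values at t2 for records changed in (t1, t2], and deletes are ignored. The name `merge_with_before` is hypothetical.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Tuple

Record = Dict[str, Any]
Keyed = Tuple[str, Record]  # (record key, record), sorted by key


def merge_with_before(
    base: Iterable[Keyed], log: Iterable[Keyed]
) -> Iterator[Tuple[str, Record, Optional[Record]]]:
    """Yield (key, after, before) for every changed record.

    `before` is None when the key is absent from the base side (an insert).
    Each input is scanned once, which is why both must be pre-sorted.
    """
    base_iter = iter(base)
    base_item = next(base_iter, None)
    for key, after in log:
        # Skip base records with smaller keys: they did not change in (t1, t2].
        while base_item is not None and base_item[0] < key:
            base_item = next(base_iter, None)
        if base_item is not None and base_item[0] == key:
            yield key, after, base_item[1]   # update: before value exists
        else:
            yield key, after, None           # insert: no before value


base = [("a", {"column1": "x"}), ("b", {"column1": "y"}), ("d", {"column1": "z"})]
log = [("b", {"column1": "y2"}), ("c", {"column1": "new"})]
for row in merge_with_before(base, log):
    print(row)
```

Here key `b` is reported as an update with its before value, key `c` as an insert with no before value, and the unchanged keys `a` and `d` are not returned.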

  was:
The current incremental query for the time range *(t1, t2]* returns the latest 
record values at *t2.* However, there are situations where we need the record 
values at *t1* to obtain insert/update information. This is particularly useful 
in cases of data rollback, as we need to retrieve this delta 
information(insert/update) from Hudi to roll back external storage (such as 
Hbase in downstream systems).

 

Hudi RFC-51 introduces CDC (Change Data Capture) support, but {*}it returns all 
commit change records between (t1, t2]{*}. This can be somewhat inefficient and 
wasteful of resources when we only want to know the values at t1 and t2. 
Furthermore, the returned records in CDC mode are in {*}JSON format with a 
unified schema(a customized format for CDC scenarios){*}, which differs from 
the user's original schema. This makes using CDC more difficult compared to 
snapshot/incremental reads.

 

An alternative option to obtain both record values at t1 and t2 is to rely on 
the existing incremental query implementation.

We can include the record value at t1 (before value) when returning the latest 
record value at t2 in incremental read. For example:

When `hoodie.datasource.query.incremental.format=latest_state`, the returned 
record is:

 
{code:java}
column1: string, column2: array<string>{code}
When `hoodie.datasource.query.incremental.format=latest_state_with_before`, the 
returned record is:

 

 
{code:java}
column1: string, column1_before_: string, column2: array<string>, 
column2_before_: array<string>,{code}
 

The implementation is simple. Sort both the base file and log file in 
advance(e.g., compact in MOR), and then perform an ordered merge to combine 
them.

 

 


> Spark Incremental read of MOR supports latest_state_with_before format
> ----------------------------------------------------------------------
>
>                 Key: HUDI-6409
>                 URL: https://issues.apache.org/jira/browse/HUDI-6409
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: spark
>            Reporter: dennysong
>            Priority: Major
>         Attachments: incremental_format.svg
>
>
> The current incremental query for the time range *(t1, t2]* returns the 
> latest record values at *t2*. However, there are situations where we also 
> need the record values at *t1* to derive insert/update information. This is 
> particularly useful for data rollback, where we need this delta information 
> (insert/update) from Hudi to roll back external storage (such as HBase in 
> downstream systems).
> Hudi RFC-51 introduces CDC (Change Data Capture) support, but {*}it returns 
> all commit change records between (t1, t2]{*}. This is inefficient and 
> wastes resources when we only want to know the values at 
> t1 and t2. Furthermore, the returned records in CDC mode are in {*}JSON 
> format with a unified schema (a customized format for CDC scenarios){*}, which 
> differs from the user's original schema. This makes CDC harder to use 
> than snapshot/incremental reads.
> An alternative option to obtain both record values at t1 and t2 is to rely on 
> the existing incremental query implementation.
> We can include the record value at t1 (the before value) alongside the 
> latest record value at t2 in an incremental read. For example:
> When `hoodie.datasource.query.incremental.format=latest_state`, the returned 
> record is:
> {code:java}
> column1: string, column2: array<string>{code}
> When `hoodie.datasource.query.incremental.format=latest_state_with_before`, 
> the returned record is:
> {code:java}
> column1: string, column1_before_: string, column2: array<string>, 
> column2_before_: array<string>{code}
> The implementation is simple. Sort both the base file and the log file in 
> advance (e.g., compact in MOR), and then perform an ordered merge to combine 
> them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)