[
https://issues.apache.org/jira/browse/HUDI-6409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
dennysong updated HUDI-6409:
----------------------------
Description:
The current incremental query for the time range *(t1, t2]* returns the latest
record values at *t2*. However, there are situations where we also need the
record values at *t1* to obtain insert/update information. This is particularly
useful for data rollback, where we need to retrieve this delta information
(insert/update) from Hudi to roll back external storage (such as HBase in
downstream systems).
Hudi RFC-51 introduces CDC (Change Data Capture) support, but {*}it returns all
commit change records between (t1, t2]{*}. This is inefficient and wastes
resources when we only want to know the values at t1 and t2. Furthermore, the
records returned in CDC mode are in {*}JSON format with a unified schema (a
customized format for CDC scenarios){*}, which differs from the user's original
schema. This makes CDC harder to use than snapshot/incremental reads.
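To illustrate the extra work on the consumer side, here is a minimal sketch of
collapsing per-commit change events into one (value at t1, value at t2) pair
per key. The event tuple layout and the function name `collapse_changes` are
hypothetical and are not the RFC-51 record format; commit times are assumed to
be strings that sort in commit order.

```python
from typing import Any, Dict, Iterable, Optional, Tuple

# Hypothetical change event: (record_key, commit_time, before_value, after_value).
ChangeEvent = Tuple[str, str, Optional[Any], Optional[Any]]
Endpoints = Tuple[Optional[Any], Optional[Any]]


def collapse_changes(events: Iterable[ChangeEvent]) -> Dict[str, Endpoints]:
    """Reduce per-commit change events to one (before, after) pair per key."""
    endpoints: Dict[str, Endpoints] = {}
    # Process events in commit order so the first "before" and the last
    # "after" seen for each key are the values at t1 and t2 respectively.
    for key, _commit, before, after in sorted(events, key=lambda e: e[1]):
        if key in endpoints:
            first_before, _ = endpoints[key]
            endpoints[key] = (first_before, after)
        else:
            endpoints[key] = (before, after)
    return endpoints
```

Every intermediate event for a key is read and then discarded, which is the
overhead the proposal aims to avoid.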
An alternative way to obtain the record values at both t1 and t2 is to build on
the existing incremental query implementation.
We can include the record value at t1 (the before value) alongside the latest
record value at t2 in the incremental read. For example:
When `hoodie.datasource.query.incremental.format=latest_state`, the returned
record is:
{code:java}
column1: string, column2: array<string>{code}
When `hoodie.datasource.query.incremental.format=latest_state_with_before`, the
returned record is:
{code:java}
column1: string, column1_before_: string, column2: array<string>,
column2_before_: array<string>{code}
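The schema mapping above can be sketched as follows. This is only an
illustration of the proposed column layout, assuming the `_before_` suffix
from the example; the helper name `with_before_columns` is hypothetical and
not part of Hudi.

```python
from typing import List, Tuple

# Suffix taken from the example in this issue; the final name may differ.
BEFORE_SUFFIX = "_before_"


def with_before_columns(schema: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Follow each (name, type) column with a same-typed before-value column."""
    out: List[Tuple[str, str]] = []
    for name, dtype in schema:
        out.append((name, dtype))
        out.append((name + BEFORE_SUFFIX, dtype))
    return out
```

Each before column keeps the type of its source column, so the result stays in
the user's original schema rather than a separate CDC format.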
The implementation is simple: sort both the base file and the log file in
advance (e.g., by compacting in MOR), and then perform an ordered merge to
combine them.
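A minimal sketch of such an ordered merge, under assumptions not stated in this
issue: both inputs are lists of (record_key, value) pairs sorted by record key,
the base list holds the state at t1, the log list holds at most one (latest)
entry per key changed in (t1, t2], and deletes are ignored. The name
`merge_with_before` is hypothetical.

```python
from typing import Any, Iterator, List, Optional, Tuple

Record = Tuple[str, Any]  # (record_key, value)


def merge_with_before(
    base: List[Record], log: List[Record]
) -> Iterator[Tuple[str, Any, Optional[Any]]]:
    """Yield (key, value_at_t2, value_at_t1) for every key present in the log."""
    i = 0
    for key, after in log:
        # Advance the base cursor; both inputs are sorted, so it never rewinds.
        while i < len(base) and base[i][0] < key:
            i += 1
        # A matching base record means an update; no match means an insert.
        before = base[i][1] if i < len(base) and base[i][0] == key else None
        yield key, after, before
```

Because both inputs are consumed in a single forward pass, the merge needs no
hash table over the base file, which is why sorting in advance matters here.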
> Spark Incremental read of MOR supports latest_state_with_before format
> ----------------------------------------------------------------------
>
> Key: HUDI-6409
> URL: https://issues.apache.org/jira/browse/HUDI-6409
> Project: Apache Hudi
> Issue Type: Improvement
> Components: spark
> Reporter: dennysong
> Priority: Major
> Attachments: incremental_format.svg