[
https://issues.apache.org/jira/browse/HUDI-6409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
dennysong updated HUDI-6409:
----------------------------
Summary: Incremental read supports latest_state_with_before format (was:
Spark Incremental read of MOR supports latest_state_with_before format)
> Incremental read supports latest_state_with_before format
> ---------------------------------------------------------
>
> Key: HUDI-6409
> URL: https://issues.apache.org/jira/browse/HUDI-6409
> Project: Apache Hudi
> Issue Type: Improvement
> Components: spark
> Reporter: dennysong
> Priority: Major
> Attachments: incremental_format.svg
>
>
> The current incremental query for the time range *(t1, t2]* returns only the
> latest record values at *t2*. However, there are situations where we also
> need the record values at *t1* to obtain insert/update information. This is
> particularly useful for data rollback, where we need to retrieve this
> delta information (insert/update) from Hudi to roll back external storage
> (such as HBase in downstream systems).
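
To make the rollback use case concrete, here is a minimal sketch. It assumes the external store behaves like a key-value map and that each changed record arrives as a pair of its key and its value at t1, with `None` standing for a record that did not exist at t1. The `roll_back` helper and the sample keys are hypothetical and not part of Hudi or HBase.

```python
from typing import Dict, Iterable, Optional, Tuple


def roll_back(store: Dict[str, str],
              changes: Iterable[Tuple[str, Optional[str]]]) -> None:
    """Restore `store` to its state at t1 (hypothetical helper).

    Each change is (record key, value at t1). A value of None means the
    record was inserted in (t1, t2], so it is removed from the store.
    """
    for key, before in changes:
        if before is None:
            store.pop(key, None)   # inserted after t1: delete it
        else:
            store[key] = before    # updated after t1: restore the old value


# A dict stands in for the downstream store in this sketch.
store = {"k1": "a", "k2": "b2", "k3": "c"}
roll_back(store, [("k2", "b"), ("k3", None)])
print(store)  # {'k1': 'a', 'k2': 'b'}
```

Without the value at t1, the downstream system could not distinguish the update to `k2` from the insert of `k3`.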
> Hudi RFC-51 introduces CDC (Change Data Capture) support, but {*}it returns
> all commit change records between (t1, t2]{*}. This is inefficient and
> wastes resources when we only want to know the values at
> t1 and t2. Furthermore, the records returned in CDC mode are in {*}JSON
> format with a unified schema (a customized format for CDC scenarios){*}, which
> differs from the user's original schema. This makes CDC more difficult to
> use than snapshot/incremental reads.
> An alternative option to obtain both record values at t1 and t2 is to rely on
> the existing incremental query implementation.
> We can include the record value at t1 (the before value) when returning the
> latest record value at t2 in an incremental read. For example:
> When `hoodie.datasource.query.incremental.format=latest_state`, the returned
> record is:
> {code:java}
> column1: string, column2: array<string>{code}
> When `hoodie.datasource.query.incremental.format=latest_state_with_before`,
> the returned record is:
> {code:java}
> column1: string, column1_before_: string, column2: array<string>,
> column2_before_: array<string>{code}
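
The schema change in the example above can be sketched as a small transformation. This is only an illustration, assuming that every column gets a twin with the `_before_` suffix and the same type, placed directly after it, as in the example. The `with_before_columns` helper is hypothetical and not an existing Hudi API.

```python
from typing import List, Tuple

# Suffix taken from the example in the description.
BEFORE_SUFFIX = "_before_"

Field = Tuple[str, str]  # (column name, type name)


def with_before_columns(schema: List[Field]) -> List[Field]:
    """Return a schema in which each column is followed by its
    '<name>_before_' twin of the same type (hypothetical helper)."""
    out: List[Field] = []
    for name, type_name in schema:
        out.append((name, type_name))
        out.append((name + BEFORE_SUFFIX, type_name))
    return out


latest_state = [("column1", "string"), ("column2", "array<string>")]
for name, type_name in with_before_columns(latest_state):
    print(f"{name}: {type_name}")
```

Because the before columns reuse the original column types, a reader of the result keeps working with the user's own schema rather than a separate CDC format.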
> The implementation is simple. Sort both the base file and the log file in
> advance (e.g., during compaction in MOR), and then perform an ordered merge
> to combine them.
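
The ordered merge described above could look roughly like the following sketch. It rests on assumptions not stated in the issue: both inputs are sorted by record key with unique keys, the base file holds the values at t1, and the log file holds the latest values written in (t1, t2]. The `merge_with_before` function and the record layout are hypothetical, not the actual Hudi implementation.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

Record = Dict[str, object]
Keyed = Tuple[str, Record]  # (record key, column values)


def merge_with_before(
    base: Iterable[Keyed], log: Iterable[Keyed]
) -> Iterator[Tuple[str, Record, Optional[Record]]]:
    """Two-pointer merge over inputs sorted by record key.

    Yields (key, latest, before) for every key in `log`. `before` is the
    matching base record, or None if the key is new (an insert).
    """
    base_it = iter(base)
    base_entry = next(base_it, None)
    for key, latest in log:
        # Skip base records with smaller keys: they did not change.
        while base_entry is not None and base_entry[0] < key:
            base_entry = next(base_it, None)
        if base_entry is not None and base_entry[0] == key:
            yield key, latest, base_entry[1]   # update
        else:
            yield key, latest, None            # insert


base = [("k1", {"column1": "a"}), ("k2", {"column1": "b"}), ("k4", {"column1": "d"})]
log = [("k2", {"column1": "b2"}), ("k3", {"column1": "c"})]
for row in merge_with_before(base, log):
    print(row)
```

Since both inputs are consumed in a single pass, the merge needs no lookup structure for the base file. Unchanged records such as `k1` and `k4` are skipped and never emitted.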
--
This message was sent by Atlassian Jira
(v8.20.10#820010)