dennysong created HUDI-6409:
-------------------------------
Summary: Spark incremental read of MOR supports the
latest_state_with_before format
Key: HUDI-6409
URL: https://issues.apache.org/jira/browse/HUDI-6409
Project: Apache Hudi
Issue Type: Improvement
Components: spark
Reporter: dennysong
Attachments: incremental_format.svg
The current incremental query for the time range *(t1, t2]* returns the latest
record values at *t2*. However, in some situations we also need the record
values at *t1* to obtain insert/update information. This is particularly useful
for data rollback, where we need to retrieve this delta information
(insert/update) from Hudi to roll back external storage (such as HBase in
downstream systems).
Hudi RFC-51 introduces CDC (Change Data Capture) support, but {*}it returns all
commit change records between (t1, t2]{*}. This is inefficient and wastes
resources when we only want to know the values at t1 and t2.
Furthermore, the returned records in CDC mode are in {*}JSON format with a
unified schema (a customized format for CDC scenarios){*}, which differs from
the user's original schema. This makes CDC harder to use than
snapshot/incremental reads.
An alternative way to obtain the record values at both t1 and t2 is to build on
the existing incremental query implementation.
We can include the record value at t1 (before value) when returning the latest
record value at t2 in incremental read. For example:
When `hoodie.datasource.query.incremental.format=latest_state`, the returned
record is:
{code:java}
column1: string, column2: array<string>{code}
When `hoodie.datasource.query.incremental.format=latest_state_with_before`, the
returned record is:
{code:java}
column1: string, column1_before_: string, column2: array<string>,
column2_before_: array<string>{code}
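To make the two formats above concrete, here is a minimal Python sketch of how the widened schema could be derived and how a reader might request it. The helper names `with_before_schema` and `incremental_read_options` are hypothetical and exist only for illustration. The `latest_state_with_before` value is the one proposed in this issue and is not an existing Hudi option value; the other option keys are the usual Hudi incremental read options.

```python
from typing import Dict, List, Tuple

# Suffix taken from the example above (column1 -> column1_before_).
BEFORE_SUFFIX = "_before_"


def with_before_schema(fields: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Follow each (name, type) column with a same-typed '<name>_before_' column."""
    widened = []
    for name, dtype in fields:
        widened.append((name, dtype))
        widened.append((name + BEFORE_SUFFIX, dtype))
    return widened


def incremental_read_options(begin_instant: str, end_instant: str) -> Dict[str, str]:
    """Build read options for the range (begin_instant, end_instant]."""
    return {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_instant,
        "hoodie.datasource.read.end.instanttime": end_instant,
        # Proposed in this issue; assumed here, not available in Hudi today.
        "hoodie.datasource.query.incremental.format": "latest_state_with_before",
    }


if __name__ == "__main__":
    for name, dtype in with_before_schema(
        [("column1", "string"), ("column2", "array<string>")]
    ):
        print(f"{name}: {dtype}")
```

The options dictionary would be passed to a Spark reader in the usual way, for example via `spark.read.format("hudi").options(**opts)`, assuming the proposed format were implemented.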
The implementation is simple: sort both the base file and the log file in
advance (e.g., by compacting in MOR), and then perform an ordered merge to
combine them.
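The ordered merge could look roughly like the following sketch. It is not Hudi code: records are modeled as plain (key, value) tuples, the function name `merge_with_before` is hypothetical, and it assumes both inputs are sorted by record key with at most one record per key (the base side holding the state at t1, the log side holding the latest state at t2). Deletes are not handled.

```python
from typing import Any, Iterable, Iterator, Optional, Tuple

Record = Tuple[str, Any]  # (record key, value)


def merge_with_before(
    base: Iterable[Record], log: Iterable[Record]
) -> Iterator[Tuple[str, Any, Optional[Any]]]:
    """Yield (key, value_at_t2, value_at_t1) for every key present in `log`.

    Both inputs must be sorted by key. value_at_t1 is None when the key is
    absent from `base`, i.e. the record was inserted within (t1, t2].
    """
    base_iter = iter(base)
    base_rec = next(base_iter, None)
    for key, after in log:
        # Skip base records whose keys sort before the current log key;
        # they did not change in the range and are not returned.
        while base_rec is not None and base_rec[0] < key:
            base_rec = next(base_iter, None)
        if base_rec is not None and base_rec[0] == key:
            yield key, after, base_rec[1]  # update: before value found
        else:
            yield key, after, None  # insert: no before value


if __name__ == "__main__":
    base_file = [("a", 1), ("b", 2), ("d", 4)]
    log_file = [("b", 20), ("c", 30), ("d", 40), ("e", 50)]
    for row in merge_with_before(base_file, log_file):
        print(row)
```

Because each input is consumed once and in order, the merge is a single pass over both files and does not need to hold either side in memory.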