Danny Chen created HUDI-8704:
--------------------------------
Summary: Instant time to completion time migration for Flink
streaming reader
Key: HUDI-8704
URL: https://issues.apache.org/jira/browse/HUDI-8704
Project: Apache Hudi
Issue Type: New Feature
Components: flink-sql
Reporter: Danny Chen
Assignee: voon
Fix For: 1.0.1
Since the 1.0 release, Hudi introduces a new component named:
{code:java}
IncrementalQueryAnalyzer
{code}
It is used for completion-time based incremental queries: users can specify the start/end completion time of the commits to subscribe to for an incremental run.
In IncrementalInputSplits#inputSplits, we use this new component to figure out the commits and the filters.
In this ticket, we want to keep compatibility for legacy workloads. A legacy workload persisted the latest consumed instant time (the start time) in the Flink state-backend of the operator StreamReadMonitoringFunction, and passed it around to IncrementalInputSplits#inputSplits for the current run. We need to translate this instant time into a completion time to avoid consuming duplicates.
In the 0.x StreamReadMonitoringFunction, Hudi stores one timestamp string: issuedInstant, which is the max consumed instant time.
In the 1.x StreamReadMonitoringFunction, Hudi stores two timestamp strings: issuedInstant and issuedOffset; the issuedInstant is the max instant time that has been consumed, and the issuedOffset is the max completion time.
So here is the migration logic: in StreamReadMonitoringFunction, if we detect that the issuedOffset in the state-backend is null, we know the state comes from a legacy table, and we translate the issuedInstant into the completion time of that instant (by querying the active timeline), then pass it around to IncrementalInputSplits#inputSplits.
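The migration check above could be sketched roughly as follows. This is an illustrative, self-contained sketch, not Hudi's actual code: the active timeline lookup is mocked with a plain Map, and the method/class names (resolveOffset, InstantTimeMigration) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the state-migration check done when
// StreamReadMonitoringFunction restores its state; the Map stands in
// for querying the active timeline (instant time -> completion time).
public class InstantTimeMigration {

  /**
   * Resolves the completion-time offset to feed into the current
   * incremental run.
   *
   * @param issuedInstant  max consumed instant time restored from state
   * @param issuedOffset   max completion time restored from state, or
   *                       null when the state was written by a 0.x job
   * @param activeTimeline mocked instant-time -> completion-time lookup
   * @return the completion time to use as the start offset
   */
  public static String resolveOffset(
      String issuedInstant,
      String issuedOffset,
      Map<String, String> activeTimeline) {
    if (issuedOffset != null) {
      // 1.x state already carries the completion time.
      return issuedOffset;
    }
    // Legacy (0.x) state: issuedOffset is null, so translate the
    // instant time into its completion time via the timeline.
    return activeTimeline.get(issuedInstant);
  }

  public static void main(String[] args) {
    Map<String, String> timeline = new HashMap<>();
    timeline.put("20240101000000", "20240101000005");

    // Legacy state: null offset triggers the timeline lookup.
    System.out.println(resolveOffset("20240101000000", null, timeline));
    // 1.x state: the stored offset is used directly.
    System.out.println(resolveOffset("20240101000000", "20240101000007", timeline));
  }
}
```

The key point is that the null check on issuedOffset is what distinguishes state written by a 0.x job from state written by a 1.x job, so no explicit version flag needs to be persisted.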
--
This message was sent by Atlassian Jira
(v8.20.10#820010)