Danny Chen created HUDI-8704:
--------------------------------

             Summary: Instant time to completion time migration for Flink 
streaming reader
                 Key: HUDI-8704
                 URL: https://issues.apache.org/jira/browse/HUDI-8704
             Project: Apache Hudi
          Issue Type: New Feature
          Components: flink-sql
            Reporter: Danny Chen
            Assignee: voon
             Fix For: 1.0.1


Since the 1.0 release, Hudi has introduced a new component named:
{code:java}
IncrementalQueryAnalyzer {code}
This is used for completion-time-based incremental queries: the user can specify 
the start/end completion time of the commits to subscribe to for an incremental 
run.

In IncrementalInputSplits#inputSplits, we use this new component to resolve the 
commits to consume and the corresponding filters.

In this ticket, we want to keep compatibility with legacy workloads. A legacy 
workload persisted the latest consumed instant time (the start time) in the 
Flink state backend of the operator StreamReadMonitoringFunction, and passed it 
on to IncrementalInputSplits#inputSplits for the current run. We need to 
translate this instant time into a completion time to avoid consuming 
duplicates.

In the 0.x StreamReadMonitoringFunction, Hudi stores one timestamp string: 
issuedInstant, the max consumed instant time.

In the 1.x StreamReadMonitoringFunction, Hudi stores two timestamp strings: 
issuedInstant and issuedOffset, where issuedInstant is the max instant time that 
has been consumed and issuedOffset is the max completion time.
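As a rough model of the two layouts (this is not the actual Hudi/Flink state code; the class and restore helper are hypothetical, assuming the 1.x layout simply appends issuedOffset after the 0.x issuedInstant entry):

{code:java}
import java.util.List;

// Hypothetical model of the timestamps the monitoring operator keeps in
// Flink state.
public class MonitorStateSketch {

  final String issuedInstant; // max consumed instant time (0.x and 1.x)
  final String issuedOffset;  // max consumed completion time (1.x only)

  MonitorStateSketch(String issuedInstant, String issuedOffset) {
    this.issuedInstant = issuedInstant;
    this.issuedOffset = issuedOffset;
  }

  // Restore from the list of strings read back from the state backend.
  // A checkpoint written by 0.x contains only issuedInstant, so issuedOffset
  // comes back null, which marks the state as legacy.
  static MonitorStateSketch restore(List<String> stateEntries) {
    String instant = stateEntries.size() > 0 ? stateEntries.get(0) : null;
    String offset = stateEntries.size() > 1 ? stateEntries.get(1) : null;
    return new MonitorStateSketch(instant, offset);
  }
} {code}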

So here is the migration logic: in StreamReadMonitoringFunction, if we detect 
that the issuedOffset in the state backend is null, we know the state comes 
from a legacy table, so we translate the issuedInstant into the completion time 
of that instant (by querying the active timeline), then pass it on to 
IncrementalInputSplits#inputSplits.
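The migration step above can be sketched as follows. This is a hedged, standalone illustration, not the actual StreamReadMonitoringFunction code: the active timeline is modeled here as a plain map from instant time to completion time, and the resolveStartOffset helper is a hypothetical name.

{code:java}
import java.util.Map;

// Hypothetical sketch of translating a legacy start instant into a
// completion-time offset.
public class OffsetMigrationSketch {

  static String resolveStartOffset(
      String issuedInstant, String issuedOffset,
      Map<String, String> completionTimeByInstant) {
    if (issuedOffset != null) {
      // 1.x state already stores the completion time; use it directly.
      return issuedOffset;
    }
    // Legacy (0.x) state: translate the consumed instant time into its
    // completion time so the next incremental run does not consume duplicates.
    return completionTimeByInstant.get(issuedInstant);
  }
} {code}
Because the translation only happens when issuedOffset is null, a job whose state was already written by 1.x never pays the timeline lookup.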



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
