[ 
https://issues.apache.org/jira/browse/HUDI-8704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-8704:
--------------------------------------
    Remaining Estimate: 12h
     Original Estimate: 12h

> Instant time to completion time migration for Flink streaming reader
> --------------------------------------------------------------------
>
>                 Key: HUDI-8704
>                 URL: https://issues.apache.org/jira/browse/HUDI-8704
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: flink-sql
>            Reporter: Danny Chen
>            Assignee: voon
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.0.1
>
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> Since 1.0 release, Hudi introduces a new component named:
> {code:java}
> IncrementalQueryAnalyzer {code}
> this is used from completion time based incremental queries, user can 
> specifiy the start/end completion time for the commits to subscribe for a 
> incremental run.
> In IncrementalInputSplits#inputSplits, we use this new component to figure 
> out the commits and filters.
> In this ticket, we want to keep compatibility for legacy workloads, the 
> legacy workload persisted the latest consumed instant time(start time) in 
> Flink state-backend in operator 
> StreamReadMonitoringFunction, and pass it around to the 
> IncrementalInputSplits#inputSplits for current run. We need to translate this 
> instant time into a completion time to avoid consuming duplicates.
> In 0.x StreamReadMonitoringFunction, Hudi stores one timestamp string: 
> issuedInstant, which is the max consumed instant time.
> In 1.x StreamReadMonitoringFunction, Hudi stores two timestamp string: 
> issuedInstant and issuedOffset, the issuedInstant is the max instant time 
> been consumed, the issuedOffset is the max completion time.
> So here is the migration logic:
> in StreamReadMonitoringFunction, if we detect that the issuedOffset in the 
> state-backend is null, we know this is from the legacy table, and we 
> translate it into the completion time of the instant(by quering the active 
> timeline), then pass it around to the IncrementalInputSplits#inputSplits.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to