[
https://issues.apache.org/jira/browse/HUDI-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Y Ethan Guo updated HUDI-8136:
------------------------------
Sprint: Hudi 1.0 Sprint 2024/08/26-9/1, Hudi 1.0 Sprint 2024/09/02-08, Hudi
1.0 Sprint 2024/09/09-15, Hudi 1.0.1 Sprint #1, Hudi 1.0.1 Sprint #2 (Jan),
Hudi 1.0.1 Sprint #2 (Jan) 2 (was: Hudi 1.0 Sprint 2024/08/26-9/1, Hudi 1.0
Sprint 2024/09/02-08, Hudi 1.0 Sprint 2024/09/09-15, Hudi 1.0.1 Sprint #1, Hudi
1.0.1 Sprint #2 (Jan))
> New instant time generation for Flink streaming pipeline
> --------------------------------------------------------
>
> Key: HUDI-8136
> URL: https://issues.apache.org/jira/browse/HUDI-8136
> Project: Apache Hudi
> Issue Type: Improvement
> Components: flink
> Reporter: Danny Chen
> Assignee: Danny Chen
> Priority: Blocker
> Fix For: 1.0.1
>
>
> h2. Design
> The new design no longer blocks instant time generation on checkpoint
> completion at the coordinator. Instead, writers cheaply obtain the instant
> time to use before every file write by sending a lightweight request to the
> coordinator. The coordinator maintains a mapping from checkpoint barriers
> (ids) to the instant times that writers use for files written during a
> checkpoint. When a writer sends a new instant time request, the coordinator
> returns either an existing pending instant or a new one, based on comparing
> the last finished checkpoint barrier reported by the writer with the
> barriers already on the coordinator. We allow at most 2 pending instants on
> the coordinator. When an instant is committed, its pending mapping entry
> should be removed from memory. Note that generating/switching to a new
> instant time on checkpoint start and serving instant times must happen in
> sequence in the coordinator (on the same thread or under an in-process
> lock).
> The design assumes the following invariants:
> * Once a writer task starts writing a file with time {{tx}}, it will not
> write any file with time {{ty}} such that {{ty < tx}}.
> * Checkpoint {{i}} will be started on the coordinator only after checkpoint
> {{j}} completes, for every {{j < i}}.
>
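The coordinator-side bookkeeping described above could be sketched roughly as follows. This is a minimal illustration under assumptions, not Hudi's actual implementation: the class and method names are hypothetical, barrier id 0 stands for "no checkpoint finished yet", instant times are simple counters rather than real timestamps, and exceeding the two-pending-instants limit simply throws (a real coordinator might wait instead). The `synchronized` methods play the role of the in-process lock.

```java
import java.util.TreeMap;

/** Illustrative only: class and method names are hypothetical, not Hudi APIs. */
final class InstantTimeCoordinatorSketch {
    static final int MAX_PENDING_INSTANTS = 2;

    // Barrier id -> instant time for files written after that barrier.
    // Barrier 0 stands for "no checkpoint finished yet" (startup).
    private final TreeMap<Long, String> pendingInstants = new TreeMap<>();
    private long lastCommittedBarrier = -1L;
    private long instantCounter = 0L; // stand-in for a real timestamp generator

    /** Called at startup and on checkpoint start; synchronized acts as the in-process lock. */
    synchronized String startInstantFor(long barrierId) {
        if (barrierId <= lastCommittedBarrier) {
            throw new IllegalStateException("barrier " + barrierId + " is already committed");
        }
        String existing = pendingInstants.get(barrierId);
        if (existing != null) {
            return existing; // reuse the pending instant for this barrier
        }
        if (pendingInstants.size() >= MAX_PENDING_INSTANTS) {
            throw new IllegalStateException(
                "at most " + MAX_PENDING_INSTANTS + " pending instants are allowed");
        }
        String instant = String.format("t%03d", ++instantCounter);
        pendingInstants.put(barrierId, instant);
        return instant;
    }

    /** Serves a writer request, keyed by the writer's last finished barrier. */
    synchronized String requestInstant(long lastFinishedBarrierId) {
        return startInstantFor(lastFinishedBarrierId);
    }

    /** Drops the mapping entry once the instant for this barrier is committed. */
    synchronized void markCommitted(long barrierId) {
        pendingInstants.remove(barrierId);
        lastCommittedBarrier = Math.max(lastCommittedBarrier, barrierId);
    }

    synchronized int pendingCount() {
        return pendingInstants.size();
    }

    public static void main(String[] args) {
        InstantTimeCoordinatorSketch coordinator = new InstantTimeCoordinatorSketch();
        coordinator.startInstantFor(0);                    // startup
        System.out.println(coordinator.requestInstant(0)); // writer before the first barrier
        coordinator.startInstantFor(1);                    // checkpoint 1 starts
        System.out.println(coordinator.requestInstant(0)); // slow writer still gets the old instant
        System.out.println(coordinator.requestInstant(1)); // writer past barrier 1 gets the new one
        coordinator.markCommitted(0);                      // checkpoint 1 completed and committed
    }
}
```

Because instant creation and instant serving share one lock, a writer request either observes the instant that existed before the checkpoint started or the one created for it, never a state in between.
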
> Overall flow is as follows:
> # During startup, the coordinator generates a new instant {{tx}} and
> requests it on the timeline.
> ## This happens within a process-local lock shared by instant time
> generation.
> ## Any request for instant times from writer tasks will be served {{tx}}.
> # Writer tasks fetch an instant time to use for any file written, by issuing
> a call to the coordinator.
> ## Writer tasks keep track of all files they write between checkpoints.
> # When starting a checkpoint, the coordinator takes the lock again,
> generates a new instant {{ty}}, and requests it on the timeline.
> ## This ensures all instant time fetches for {{tx}} are served before the
> checkpoint starts, i.e. the corresponding files will be reported back to
> the coordinator when the checkpoint completes.
> # When they receive the event to checkpoint, the writer tasks flush all open
> files and return the list of files written as part of the checkpoint.
> # Once the coordinator has received the file lists from all writer tasks,
> the combined list can include two types of files:
> ## Files belonging to {{tx}}, which can now be committed since we know no
> files with time {{tx}} could have been written between steps 3 & 5.
> ## Files belonging to {{ty}}, which cannot be committed yet (there could be
> files still being written for {{ty}}), but the coordinator needs to ensure
> these files are ultimately included when {{ty}} is committed.
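The last step of the flow, where the coordinator separates committable files from files that must be carried over, might look like the sketch below. All names here (`CheckpointCommitSketch`, `WrittenFile`, `addWriterReport`, `commit`, `buffered`) are hypothetical and exist only for illustration; the sketch assumes every reported file is tagged with the instant it was written under and that the caller invokes `commit` only after all writer tasks have reported.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: not Hudi's actual commit path. */
final class CheckpointCommitSketch {
    /** A file reported by a writer task, tagged with the instant it was written under. */
    static final class WrittenFile {
        final String instant;
        final String path;

        WrittenFile(String instant, String path) {
            this.instant = instant;
            this.path = path;
        }
    }

    // Instant time -> files reported so far but not yet committed.
    private final Map<String, List<String>> uncommitted = new HashMap<>();

    /** Buffers the file list one writer task returned for a checkpoint. */
    void addWriterReport(List<WrittenFile> report) {
        for (WrittenFile file : report) {
            uncommitted.computeIfAbsent(file.instant, k -> new ArrayList<>()).add(file.path);
        }
    }

    /** Returns the files to commit for the instant; files of newer instants stay buffered. */
    List<String> commit(String instant) {
        List<String> files = uncommitted.remove(instant);
        return files == null ? new ArrayList<String>() : files;
    }

    /** Files already reported for an instant that cannot be committed yet. */
    List<String> buffered(String instant) {
        List<String> files = uncommitted.get(instant);
        return files == null
            ? Collections.<String>emptyList()
            : Collections.unmodifiableList(files);
    }

    public static void main(String[] args) {
        CheckpointCommitSketch coordinator = new CheckpointCommitSketch();
        // Writer A already switched to ty for its second file; writer B only wrote under tx.
        coordinator.addWriterReport(Arrays.asList(
            new WrittenFile("tx", "a-1.parquet"), new WrittenFile("ty", "a-2.parquet")));
        coordinator.addWriterReport(Arrays.asList(new WrittenFile("tx", "b-1.parquet")));
        System.out.println("commit tx: " + coordinator.commit("tx"));
        System.out.println("carried over for ty: " + coordinator.buffered("ty"));
    }
}
```

Keeping the {{ty}} files in the same buffer means they are picked up automatically when the next checkpoint completes and {{ty}} becomes committable.
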
> h4. The rollback of failed instants
> The clean policy should always be configured as lazy. When a
> {{ck_n, instant_m}} pair has been committed to the Hudi timeline, all
> instants earlier than {{instant_m}} should explicitly invoke close to shut
> down their heartbeat threads, so that the async cleaner eventually rolls
> them back.
> When a full task failover is triggered, all pending instants should be
> rolled back, but we hand this work over to the async cleaner.
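The heartbeat handling described above can be modelled with a small sketch. It is only an illustration under assumptions: `HeartbeatCleanupSketch` and its methods are hypothetical names, a set entry stands in for a running heartbeat thread, and instant times are assumed to be fixed-width strings that sort chronologically. The returned instants are the ones left for the lazy cleaner to roll back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Illustrative only: a set entry stands in for a running heartbeat thread. */
final class HeartbeatCleanupSketch {
    // Pending instants whose heartbeat is still running, in chronological order.
    private final TreeSet<String> activeHeartbeats = new TreeSet<>();

    /** Registers a heartbeat for a newly requested instant. */
    void startInstant(String instant) {
        activeHeartbeats.add(instant);
    }

    /**
     * After an instant is committed, stops the heartbeats of all earlier pending
     * instants and returns them; they are left for the lazy cleaner to roll back.
     */
    List<String> onCommitted(String committedInstant) {
        activeHeartbeats.remove(committedInstant);
        // headSet is a view of the strictly earlier instants; copy it before mutating the set.
        List<String> abandoned = new ArrayList<>(activeHeartbeats.headSet(committedInstant));
        activeHeartbeats.removeAll(abandoned);
        return abandoned;
    }

    /** Instants that still have a running heartbeat. */
    List<String> active() {
        return new ArrayList<>(activeHeartbeats);
    }

    public static void main(String[] args) {
        HeartbeatCleanupSketch heartbeats = new HeartbeatCleanupSketch();
        heartbeats.startInstant("001");
        heartbeats.startInstant("002");
        heartbeats.startInstant("003");
        System.out.println("left to cleaner: " + heartbeats.onCommitted("002"));
        System.out.println("still active: " + heartbeats.active());
    }
}
```
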
--
This message was sent by Atlassian Jira
(v8.20.10#820010)