Hi Biao,
Thanks for the follow-up and for pointing me back to the Google Doc sections.
I've re-read the highlighted parts on writer-side native state persistence, and
you're absolutely right: that design elegantly resolves both (1) and (2):
- No synchronous HDFS write on the barrier path: the FileInfoEvent becomes
  fire-and-forget after snapshotState(), since committables are already
  durably captured in Flink's native operator state.
- Single state system: the coordinator only tracks lightweight metadata
  (commitUser, watermark, listeners) through
  checkpointCoordinator/resetToCheckpoint, eliminating the dual-state
  consistency concern entirely.
- Resolved Flink state persistence timing: by moving committable ownership to
  the writer side's ListState, the coordinator no longer needs to reason about
  pre-barrier durability. Flink's native checkpoint mechanism handles this
  transparently, and the timing mismatch between coordinator state snapshot
  and committable persistence simply doesn't arise.
I've also prototyped a local revision aligning with this approach — moving
committable persistence to WriteOperator's ListState<CommitMessage> and
simplifying PWC to pure coordination logic. The barrier latency improvement is
significant.
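A minimal sketch of the writer-side flow described above, in plain Java with
no Flink dependency. ListStateSketch and WriterSketch are illustrative names,
not Flink or Paimon classes; strings stand in for CommitMessage, and cleanup
after a successful commit is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for Flink's ListState; not the real Flink API. */
final class ListStateSketch<T> {
    private final List<T> values = new ArrayList<>();

    void update(List<T> newValues) {
        values.clear();
        values.addAll(newValues);
    }

    List<T> get() {
        return new ArrayList<>(values);
    }
}

/** Hypothetical writer that owns its committables; strings stand in for CommitMessage. */
final class WriterSketch {
    private final List<String> pending = new ArrayList<>();
    private final ListStateSketch<String> state;
    private final Consumer<List<String>> eventSender;

    WriterSketch(ListStateSketch<String> state, Consumer<List<String>> eventSender) {
        this.state = state;
        this.eventSender = eventSender;
        // On restore, committables captured by the last snapshot come back from state.
        pending.addAll(state.get());
    }

    void addCommittable(String committable) {
        pending.add(committable);
    }

    void snapshotState() {
        // 1. Capture committables in the writer's own state.
        state.update(pending);
        // 2. Notify the coordinator without waiting for a reply (no Future.get()).
        eventSender.accept(new ArrayList<>(pending));
    }
}
```

The point of the sketch is the ordering inside snapshotState(): the state
update comes first and the event send never blocks, so a restored writer can
resend whatever the coordinator may have missed.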
Regarding your proposal to upstream the internal implementation: please do open
the GitHub issue. I'd be very interested to review the production code and
compare notes against my branch. Having concrete code as the anchor will
definitely sharpen the discussion, especially on the UnawareBucketSink + region
failover path.
On the FixedBucketSink scope question: I now agree with your assessment. If
region failover is the primary motivation, including FixedBucketSink risks
diluting the scope and forcing a later rewrite. I'm happy to refocus PIP-30
strictly on UnawareBucketSink.
Looking forward to the issue and the code review!
Best,
yugan
Original message
From: Biao Liu <[email protected]>
Sent: 8 June 2026 21:28
To: dev <[email protected]>
Subject: Re: [DISCUSS] PIP-30: Paimon Write Coordinator - Replacing CommitOperator
with JobManager-level Coordination
Hi yugan,
Thanks for the thoughtful response. A few replies inline.
On observation 1 (synchronous wait) and observation 2 (two coexisting state
systems)
Both of these are addressed in the design discussed in the earlier
mailing-list thread / Google Doc I shared (yellow-highlighted sections; the
doc is on Google Docs because we ran into image-editing issues updating the
original PIP-30 on cwiki). The key point: in-flight committables are
persisted on the writer side through Flink's native operator state, not on
the coordinator.
With that, the coordinator does not need to durably persist CommitMessages
before writers unblock: the handoff is a fire-and-forget event after
snapshotState(), with no synchronous HDFS write on the barrier path. The
coordinator itself only owns a small amount of state (commitUser,
watermark, listener state), which fits the Flink-native
checkpointCoordinator / resetToCheckpoint channel. There is one state system
on the JM side, so the dual-state consistency concern doesn't arise.
Worth re-reading those sections; I think they cover (1) and (2) without
needing the Flink-internal API workarounds you mentioned.
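As a rough illustration of how small that coordinator state can be, here is a
sketch in plain Java with no Flink dependency. CoordinatorStateSketch, its
fields, and the version tag are illustrative assumptions, not taken from any
existing implementation, and listener state is left out for brevity. In a real
coordinator, toBytes() would supply the byte[] handed over in
checkpointCoordinator, and fromBytes() would run in resetToCheckpoint.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical lightweight coordinator state: only commitUser and watermark. */
final class CoordinatorStateSketch {
    private static final int VERSION = 1; // assumed format version, for illustration

    final String commitUser;
    final long watermark;

    CoordinatorStateSketch(String commitUser, long watermark) {
        this.commitUser = commitUser;
        this.watermark = watermark;
    }

    /** Serializes the state into the byte[] a coordinator would hand to Flink. */
    byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(VERSION);
            out.writeUTF(commitUser);
            out.writeLong(watermark);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Rebuilds the state from the byte[] received on restore. */
    static CoordinatorStateSketch fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IllegalStateException("Unknown state version: " + version);
            }
            String commitUser = in.readUTF();
            long watermark = in.readLong();
            return new CoordinatorStateSketch(commitUser, watermark);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

No committables appear in this payload, which is why the coordinator never
needs its own durable storage in this design.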
On observation 3 (FixedBucketSink scope)
Two concerns.
First, splitting PIP-30 along sink-type lines is risky. FixedBucketSink and
UnawareBucketSink share the same coordinator implementation and are tightly
coupled at the implementation level. Landing "shuffle-free commit for
FixedBucketSink" first will likely force a rewrite of the core design when
UnawareBucketSink + region failover is added later, rather than a clean
extension.
Second, I'm not sure FixedBucketSink should be in PIP-30 at all. Our
internal implementation of this design, which is already deployed in
production, doesn't wire up FixedBucketSink, precisely because the
coordinator commit doesn't solve region failover there (single-region
topology regardless). If region failover is the primary motivation of
PIP-30, including a sink that can't benefit from it seems to dilute the
scope.
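To make the single-region point concrete, here is a toy model in plain Java.
It is not Flink's scheduler code: RegionSketch is a hypothetical name, and it
simply treats a pipelined region as a connected component of subtasks linked
by pipelined edges, with N sources feeding N writers.

```java
/** Toy model of pipelined regions as connected components; not Flink code. */
final class RegionSketch {

    /** Counts regions for `parallelism` sources feeding `parallelism` writers. */
    static int countRegions(int parallelism, boolean allToAll) {
        // Nodes 0..p-1 are source subtasks, nodes p..2p-1 are writer subtasks.
        int[] parent = new int[2 * parallelism];
        for (int i = 0; i < parent.length; i++) {
            parent[i] = i;
        }
        for (int source = 0; source < parallelism; source++) {
            for (int writer = 0; writer < parallelism; writer++) {
                // A keyBy edge links every source to every writer;
                // a forward edge links source i to writer i only.
                if (allToAll || source == writer) {
                    union(parent, source, parallelism + writer);
                }
            }
        }
        int regions = 0;
        for (int i = 0; i < parent.length; i++) {
            if (find(parent, i) == i) {
                regions++;
            }
        }
        return regions;
    }

    private static int find(int[] parent, int node) {
        while (parent[node] != node) {
            parent[node] = parent[parent[node]]; // path halving
            node = parent[node];
        }
        return node;
    }

    private static void union(int[] parent, int a, int b) {
        parent[find(parent, a)] = find(parent, b);
    }
}
```

With parallelism 4, the all-to-all case collapses into one component, while
the forward case keeps four independent ones. Only the second shape leaves
anything for region failover to restart separately.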
Proposal
If you don't mind, I'd like to open a GitHub issue to upstream our internal
implementation (the design itself is in the Google Doc), so the discussion
has concrete code to anchor on alongside your prototype branch. Happy to
have you participate and check whether it addresses the three points.
What do you think?
Thanks,
Biao /'bɪ.aʊ/
On Thu, 4 Jun 2026 at 23:32, 俞淦 <[email protected]> wrote:
> Hi Biao,
>
>
> Thanks for the detailed review and for pointing to the earlier
> mailing-list thread and design doc. I've read both and they provide very
> useful context. Let me address each observation.
>
>
> 1. Synchronous wait on the writer barrier path
>
>
> Acknowledged. The current sender.snapshot(ck).get() blocking design does
> introduce a new latency source on the critical path.
>
>
> My rationale was to ensure strict ordering: PWC must persist state before
> writers complete snapshotState(), so that JM failover never loses in-flight
> committables. However, you're right that the current implementation is too
> pessimistic. Potential mitigations I'd like to explore:
>
>
> (1) PWC sends ACK immediately upon receiving all FileInfoEvents, before
> HDFS hsync. HDFS write proceeds async. Risk: JM crash between ACK and hsync
> → committables lost.
> (2) Writers send FileInfo in background during checkpoint interval, not at
> barrier time. Barrier only sends a lightweight "ready" signal. Requires
> larger change to writer side.
> (3) Flink-internal API (even unstable) to access the checkpoint storage
> location from OperatorCoordinator. This would both eliminate the need for a
> custom HDFS state and avoid the additional blocking mechanism.
>
>
> But I think we may only be able to reduce blocking, not eliminate it
> entirely.
>
>
> 2. Two coexisting state systems on the JM side
>
>
> You're absolutely right — this is the deepest design risk in the prototype.
> The root cause is a capability gap in Flink's OperatorCoordinator API:
> checkpointCoordinator() hands Flink a byte[] (through a CompletableFuture),
> but Flink gives the coordinator no visibility into where that byte array is
> stored, when it is durably persisted, or how to correlate it with the
> notifyCheckpointComplete/resetToCheckpoint lifecycle.
>
>
> Specifically:
>
> - I cannot call CheckpointStorageAccess.getBaseLocation() (the method
>   doesn't exist).
> - I cannot derive the checkpoint directory from Context (there is no
>   getJobInformation()).
> - Even if I could, the coordinator state is written after
>   checkpointCoordinator() returns, while I need persistence before writers
>   unblock.
>
>
> This forced the self-managed HDFS path. But I agree the resulting
> dual-state consistency is fragile.
>
>
> 1) Is there a Flink-internal API (even unstable) to access the checkpoint
> storage location from OperatorCoordinator? This would solve both problem 1
> and problem 2.
> 2) Alternatively, would the community accept a design where PWC state is
> fully embedded in the coordinator's byte[] state, and we accept that
> snapshotState() on writers blocks only until the coordinator's
> checkpointCoordinator() returns (not until HDFS is durable)? The risk is a
> JM crash between checkpointCoordinator() completion and Flink's actual state
> persistence, but perhaps that's acceptable if Paimon's commit is idempotent
> and can filter duplicates?
> 3) Or should PIP-30 scope be reduced to only address the network-shuffle
> elimination (problem 1), and defer the HA-recovery redesign until Flink's
> coordinator API provides better storage integration?
>
>
> 3. FixedBucketSink scope vs region failover
>
>
> Valid catch. The current branch wires only FixedBucketSink because that's
> the immediate production need, but you're correct that FixedBucketSink's
> keyBy(bucketKey) creates an all-to-all edge, resulting in a single
> pipelined region. Region failover is not possible in this topology
> regardless of where the committer lives.
> Clarification on PIP-30 motivation:
>
>
> I think the original PIP-30 lists two goals:
> (a) Eliminate network shuffle for commit messages (delivered by
> coordinator RPC)
> (b) Enable region failover by decoupling commit from writer region
>
>
> Goal (a) is fully achieved even for FixedBucketSink — commit messages no
> longer traverse the shuffle.
> Goal (b) requires UnawareBucketSink (or a bucket-less hash-distribution
> mode), where writers can be colocated with sources without keyBy. This is
> not yet implemented in the prototype.
>
>
> Proposed scope adjustment:
> A. Keep PIP-30 as "coordinator-based commit for all sinks", but
> acknowledge region failover only applies to UnawareBucketSink (future work)
> B. Narrow PIP-30 to "shuffle-free commit for FixedBucketSink", spin out
> region failover to a separate PIP
> C. Delay PIP-30 until UnawareBucketSink coordinator integration is ready,
> so both goals land together
>
>
> I prefer option A if the community is willing to accept incremental
> delivery, or option B if we want stricter scope discipline. What do you
> think?
>
>
> Based on this feedback, I see three possible paths:
>
> 1. Iterate on prototype: Implement async HDFS write (problem 1), investigate
>    Flink API for unified state (problem 2), keep FixedBucketSink scope with
>    documented limitation (problem 3).
> 2. Split the PIP: Separate "shuffle-free commit" (smaller, ready soon) from
>    "region failover architecture" (larger, needs UnawareBucketSink).
> 3. Pause for Flink API evolution: Wait for Flink to expose checkpoint storage
>    to coordinators, then redesign with single-state system.
>
>
> I'd like community guidance on which path to pursue. I'm happy to revise
> the prototype once we have consensus.
>
>
> Thanks,
> yugan
>
>
>
>
> Original message
>
> From: Biao Liu <[email protected]>
> Sent: 29 May 2026 22:15
> To: dev <[email protected]>
> Subject: Re: [DISCUSS] PIP-30: Paimon Write Coordinator - Replacing
> CommitOperator with JobManager-level Coordination
>
>
>
> Hi yugan,
>
> Thanks a lot for sharing the prototype and the detailed write-up. The
> PIP-30 direction is great, and the prototype gives a very concrete starting
> point for discussion.
>
> For background, an earlier mailing-list discussion on the same topic is here:
> https://lists.apache.org/thread/5n1mwc7wc3cfjogz56w9k5ql3zyl00lw
> And a related design doc is here:
> https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0
>
> Both might be useful as reference for the discussion below.
>
>
> I have three observations about the current PWC prototype.
>
> 1. Synchronous wait on the writer barrier path
>
> In the current flow, after the barrier passes through, each writer
> subtask's snapshotState() calls sender.snapshot(checkpointId).get() and
> blocks until the coordinator side completes the future. On the coordinator
> side, that future is only signalled after all subtasks have sent their
> FileInfoEvent, the coordinator has aggregated the committables, and
> PwcStateManager.writeState has finished its HDFS hsync + rename. The
> coordinator runs all of this on a single-thread event loop.
>
> PIP-30 was opened mainly to help large-scale Flink-to-Paimon write jobs,
> where checkpoint duration under high parallelism is already sensitive. With
> this design, every subtask's checkpoint additionally has to wait on (a) the
> slowest-arriving subtask, (b) one HDFS hsync on the JM, and (c) the JM's
> single-thread event loop draining N events. This adds a new source of
> latency directly on the barrier path.
>
>
> 2. Two coexisting state systems on the JM side
>
> The prototype writes aggregated CommitMessages directly through Hadoop
> FileSystem to <baseDir>/<operatorId>/checkpoint-{ckId}.state, while
> checkpointCoordinator itself returns an empty byte array, i.e. the
> Flink-native coordinator state channel is not used for the in-flight
> committables.
>
> The deeper concern here is not the HDFS layout itself, but that the
> coordinator now lives across two state systems at the same time: Flink's
> checkpoint/restore lifecycle on one side, and the self-managed HDFS state
> on the other. The two have to stay consistent across every transition
> (checkpoint complete, checkpoint abort, JM failover, job restart, full job
> cancel), and the consistency contract between them is not obvious from the
> code. Any divergence (e.g. a notifyCheckpointAborted racing with writeState,
> or a JM crash between committer.commit and stateManager.deleteState)
> becomes a correctness question that the coordinator alone has to reason
> about.
>
> 3. FixedBucketSink scope vs region failover
>
> In the current branch, FixedBucketSink is the only sink wired to the
> coordinator factory. As far as I can tell, FixedBucketSink requires records
> to be hashed by bucket key, so the writer chain has an all-to-all upstream
> edge and ends up in a single Flink pipelined region. In that case, even
> after replacing CommitterOperator with a coordinator, the writer DAG still
> has only one region, so region failover would not be possible for this
> sink. Since PIP-30 lists region failover as a primary motivation, this
> scope choice does not appear to deliver that goal as it stands.
>
> What do you think?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
>
> On Mon, 25 May 2026 at 10:07, Jingsong Li <[email protected]> wrote:
>
> > Thanks fishfishfishfishaa,
> >
> > I think you can just raise a PR for this.
> >
> > Best,
> > Jingsong
> >
> > On Sun, May 24, 2026 at 11:29 PM 俞淦 <[email protected]> wrote:
> > >
> > > Hi Paimon Community,
> > >
> > > Following PIP-30 (Improvement For Paimon Committer In Flink), I have
> > > completed a prototype implementation of the Paimon Write Coordinator
> > > (PWC) on my personal branch. The design replaces the current
> > > CommitOperator with a JobManager-level OperatorCoordinator to eliminate
> > > the network shuffle bottleneck for commit messages.
> > >
> > > Code Branch:
> > > https://github.com/fishfishfishfishaa/paimon/tree/yg-pip3-pwc
> > >
> > > Key Design Decision: Custom HDFS State
> > >
> > > Instead of using Flink's native StateBackend, I chose custom HDFS state
> > > management for the following reasons:
> > >
> > > - Timing mismatch: Flink's operator state snapshot occurs after
> > >   snapshotState() returns, but PWC needs to persist aggregated
> > >   CommitMessages before acknowledging WriteOperators to proceed. This
> > >   pre-barrier persistence requirement doesn't align well with Flink's
> > >   native state lifecycle.
> > > - Explicit recovery control: Custom state enables a clean
> > >   resetToCheckpoint() logic: scan HDFS, filter by checkpointId <=
> > >   restoredCheckpointId, perform idempotent commit via Paimon's native
> > >   deduplication, then clean up.
> > > - Decoupled cleanup: HDFS state deletion is independent of Flink
> > >   checkpoint retention policies, avoiding potential state bloat.
> > >
> > > State Storage Path:
> > > <flink-checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state
> > >
> > > Current Scope
> > >
> > > Supported: FixedBucketSink (primary use case for PIP-30)
> > >
> > > Implementation Highlights (Core Flow)
> > >
> > > WriteOperator.preBarrier()
> > >   → send FileInfoEvent to PWC (RPC)
> > >   → PWC aggregates, persists to HDFS, sends ACK
> > >   → WriteOperator.snapshotState() completes (local state only)
> > >   → Flink confirms checkpoint
> > >   → PWC.notifyCheckpointComplete() → Paimon.commit() → delete HDFS state
> > >
> > > Recovery Logic:
> > > resetToCheckpoint() scans HDFS, handles <= restoredCkId via idempotent
> > > commit, and treats > restoredCkId as orphan cleanup.
> > >
> > > Testing Status & Request for Guidance
> > >
> > > Currently, I have added unit tests covering the recovery logic of the
> > > custom HDFS state. I would appreciate the community's advice on what
> > > other test cases are essential for this PIP.
> > >
> > > Once I receive initial feedback and strengthen the test coverage
> > > accordingly, I will open a formal Pull Request.
> > >
> > > Looking forward to your suggestions!
> > >
> > > Thank you,
> > > fishfishfishfishaa / yugan
> > >
> > > 俞淦
> > > [email protected]
> >