Hi Biao,

Thanks for the follow-up and for pointing me back to the Google Doc sections. 
I've re-read the highlighted parts on writer-side native state persistence, and 
you're absolutely right — that design elegantly resolves both (1) and (2):



No synchronous HDFS write on the barrier path: The FileInfoEvent becomes 
fire-and-forget after snapshotState(), since committables are already durably 
captured in Flink's native operator state.




Single state system: The coordinator only tracks lightweight metadata 
(commitUser, watermark, listeners) through 
checkpointCoordinator/resetToCheckpoint, eliminating the dual-state consistency 
concern entirely.




Resolved Flink state persistence timing: By moving committable ownership to the 
writer side's ListState, the coordinator no longer needs to reason about 
pre-barrier durability — Flink's native checkpoint mechanism handles this 
transparently, and the timing mismatch between coordinator state snapshot and 
committable persistence simply doesn't arise.



I've also prototyped a local revision aligning with this approach — moving 
committable persistence to WriteOperator's ListState<CommitMessage&gt; and 
simplifying PWC to pure coordination logic. The barrier latency improvement is 
significant.
Regarding your proposal to upstream the internal implementation: please do open 
the GitHub issue. I'd be very interested to review the production code and 
compare notes against my branch. Having concrete code as the anchor will 
definitely sharpen the discussion, especially on the UnawareBucketSink + region 
failover path.


On the FixedBucketSink scope question — I now agree with your assessment. If 
region failover is the primary motivation, including FixedBucketSink risks 
scope dilution and future rewrite. I'm happy to refocus PIP-30 strictly on 
UnawareBucketSink.
Looking forward to the issue and the code review!


Best,
yugan


         原始邮件
         
       
发件人:Biao Liu <[email protected]&gt;
发件时间:2026年6月8日 21:28
收件人:dev <[email protected]&gt;
主题:Re: [DISCUSS] PIP-30: Paimon Write Coordinator - Replacing CommitOperator 
with JobManager-level Coordination



       Hi&nbsp;yugan,

Thanks&nbsp;for&nbsp;the&nbsp;thoughtful&nbsp;response.&nbsp;A&nbsp;few&nbsp;replies&nbsp;inline.

On&nbsp;observation&nbsp;1&nbsp;(synchronous&nbsp;wait)&nbsp;and&nbsp;observation&nbsp;2&nbsp;(two&nbsp;coexisting&nbsp;state
systems)

Both&nbsp;of&nbsp;these&nbsp;are&nbsp;addressed&nbsp;in&nbsp;the&nbsp;design&nbsp;discussed&nbsp;in&nbsp;the&nbsp;earlier
mailing-list&nbsp;thread&nbsp;/&nbsp;Google&nbsp;Doc&nbsp;I&nbsp;shared&nbsp;(yellow-highlighted&nbsp;sections;&nbsp;the
doc&nbsp;is&nbsp;on&nbsp;Google&nbsp;Docs&nbsp;because&nbsp;we&nbsp;ran&nbsp;into&nbsp;image-editing&nbsp;issues&nbsp;updating&nbsp;the
original&nbsp;PIP-30&nbsp;on&nbsp;cwiki).&nbsp;The&nbsp;key&nbsp;point:&nbsp;in-flight&nbsp;committables&nbsp;are
persisted&nbsp;on&nbsp;the&nbsp;writer&nbsp;side&nbsp;through&nbsp;Flink's&nbsp;native&nbsp;operator&nbsp;state,&nbsp;not&nbsp;on
the&nbsp;coordinator.

With&nbsp;that,&nbsp;the&nbsp;coordinator&nbsp;does&nbsp;not&nbsp;need&nbsp;to&nbsp;durably&nbsp;persist&nbsp;CommitMessages
before&nbsp;writers&nbsp;unblock&nbsp;—&nbsp;the&nbsp;handoff&nbsp;is&nbsp;a&nbsp;fire-and-forget&nbsp;event&nbsp;after
snapshotState()&nbsp;,&nbsp;no&nbsp;synchronous&nbsp;HDFS&nbsp;write&nbsp;on&nbsp;the&nbsp;barrier&nbsp;path.&nbsp;The
coordinator&nbsp;itself&nbsp;only&nbsp;owns&nbsp;a&nbsp;small&nbsp;amount&nbsp;of&nbsp;state&nbsp;(commitUser,
watermark,&nbsp;listener&nbsp;state),&nbsp;which&nbsp;fits&nbsp;the&nbsp;Flink-native
checkpointCoordinator&nbsp;/&nbsp;resetToCheckpoint&nbsp;channel.&nbsp;One&nbsp;state&nbsp;system&nbsp;on&nbsp;the
JM&nbsp;side,&nbsp;so&nbsp;the&nbsp;dual-state&nbsp;consistency&nbsp;concern&nbsp;doesn't&nbsp;arise.

Worth&nbsp;re-reading&nbsp;those&nbsp;sections&nbsp;—&nbsp;I&nbsp;think&nbsp;it&nbsp;covers&nbsp;(1)&nbsp;and&nbsp;(2)&nbsp;without
needing&nbsp;the&nbsp;Flink-internal&nbsp;API&nbsp;workarounds&nbsp;you&nbsp;mentioned.

On&nbsp;observation&nbsp;3&nbsp;(FixedBucketSink&nbsp;scope)

Two&nbsp;concerns.

First,&nbsp;splitting&nbsp;PIP-30&nbsp;along&nbsp;sink-type&nbsp;lines&nbsp;is&nbsp;risky.&nbsp;FixedBucketSink&nbsp;and
UnawareBucketSink&nbsp;share&nbsp;the&nbsp;same&nbsp;coordinator&nbsp;implementation&nbsp;and&nbsp;are&nbsp;tightly
coupled&nbsp;at&nbsp;the&nbsp;implementation&nbsp;level.&nbsp;Landing&nbsp;"shuffle-free&nbsp;commit&nbsp;for
FixedBucketSink"&nbsp;first&nbsp;will&nbsp;likely&nbsp;force&nbsp;a&nbsp;rewrite&nbsp;of&nbsp;the&nbsp;core&nbsp;design&nbsp;when
UnawareBucketSink&nbsp;+&nbsp;region&nbsp;failover&nbsp;is&nbsp;added&nbsp;later,&nbsp;rather&nbsp;than&nbsp;a&nbsp;clean
extension.

Second,&nbsp;I'm&nbsp;not&nbsp;sure&nbsp;FixedBucketSink&nbsp;should&nbsp;be&nbsp;in&nbsp;PIP-30&nbsp;at&nbsp;all.&nbsp;Our
internal&nbsp;implementation&nbsp;of&nbsp;this&nbsp;design&nbsp;—&nbsp;which&nbsp;is&nbsp;already&nbsp;deployed&nbsp;in
production&nbsp;—&nbsp;doesn't&nbsp;wire&nbsp;up&nbsp;FixedBucketSink&nbsp;,&nbsp;precisely&nbsp;because&nbsp;the
coordinator&nbsp;commit&nbsp;doesn't&nbsp;solve&nbsp;region&nbsp;failover&nbsp;there&nbsp;(single-region
topology&nbsp;regardless).&nbsp;If&nbsp;region&nbsp;failover&nbsp;is&nbsp;the&nbsp;primary&nbsp;motivation&nbsp;of
PIP-30,&nbsp;including&nbsp;a&nbsp;sink&nbsp;that&nbsp;can't&nbsp;benefit&nbsp;from&nbsp;it&nbsp;seems&nbsp;to&nbsp;dilute&nbsp;the
scope.

Proposal

If&nbsp;you&nbsp;don't&nbsp;mind,&nbsp;I'd&nbsp;like&nbsp;to&nbsp;open&nbsp;a&nbsp;GitHub&nbsp;issue&nbsp;to&nbsp;upstream&nbsp;our&nbsp;internal
implementation&nbsp;(the&nbsp;design&nbsp;itself&nbsp;is&nbsp;in&nbsp;the&nbsp;Google&nbsp;Doc),&nbsp;so&nbsp;the&nbsp;discussion
has&nbsp;concrete&nbsp;code&nbsp;to&nbsp;anchor&nbsp;on&nbsp;alongside&nbsp;your&nbsp;prototype&nbsp;branch.&nbsp;Happy&nbsp;to
have&nbsp;you&nbsp;participate&nbsp;and&nbsp;check&nbsp;whether&nbsp;it&nbsp;addresses&nbsp;the&nbsp;three&nbsp;points.

What&nbsp;do&nbsp;you&nbsp;think?

Thanks,
Biao&nbsp;/'bɪ.aʊ/



On&nbsp;Thu,&nbsp;4&nbsp;Jun&nbsp;2026&nbsp;at&nbsp;23:32,&nbsp;俞淦&nbsp;<[email protected]&gt;&nbsp;wrote:

&gt;&nbsp;Hi&nbsp;Biao,
&gt;
&gt;
&gt;&nbsp;Thanks&nbsp;for&nbsp;the&nbsp;detailed&nbsp;review&nbsp;and&nbsp;for&nbsp;pointing&nbsp;to&nbsp;the&nbsp;earlier
&gt;&nbsp;mailing-list&nbsp;thread&nbsp;and&nbsp;design&nbsp;doc.&nbsp;I've&nbsp;read&nbsp;both&nbsp;and&nbsp;they&nbsp;provide&nbsp;very
&gt;&nbsp;useful&nbsp;context.&nbsp;Let&nbsp;me&nbsp;address&nbsp;each&nbsp;observation.
&gt;
&gt;
&gt;&nbsp;1.&nbsp;Synchronous&nbsp;wait&nbsp;on&nbsp;the&nbsp;writer&nbsp;barrier&nbsp;path
&gt;
&gt;
&gt;&nbsp;Acknowledged.&nbsp;The&nbsp;current&nbsp;sender.snapshot(ck).get()&nbsp;blocking&nbsp;design&nbsp;does
&gt;&nbsp;introduce&nbsp;a&nbsp;new&nbsp;latency&nbsp;source&nbsp;on&nbsp;the&nbsp;critical&nbsp;path.
&gt;
&gt;
&gt;&nbsp;My&nbsp;rationale&nbsp;was&nbsp;to&nbsp;ensure&nbsp;strict&nbsp;ordering:&nbsp;PWC&nbsp;must&nbsp;persist&nbsp;state&nbsp;before
&gt;&nbsp;writers&nbsp;complete&nbsp;snapshotState(),&nbsp;so&nbsp;that&nbsp;JM&nbsp;failover&nbsp;never&nbsp;loses&nbsp;in-flight
&gt;&nbsp;committables.&nbsp;However,&nbsp;you're&nbsp;right&nbsp;that&nbsp;the&nbsp;current&nbsp;implementation&nbsp;is&nbsp;too
&gt;&nbsp;pessimistic.Potential&nbsp;mitigations&nbsp;I'd&nbsp;like&nbsp;to&nbsp;explore:
&gt;
&gt;
&gt;&nbsp;(1)&nbsp;PWC&nbsp;sends&nbsp;ACK&nbsp;immediately&nbsp;upon&nbsp;receiving&nbsp;all&nbsp;FileInfoEvents,&nbsp;before
&gt;&nbsp;HDFS&nbsp;hsync.&nbsp;HDFS&nbsp;write&nbsp;proceeds&nbsp;async.&nbsp;Risk:&nbsp;JM&nbsp;crash&nbsp;between&nbsp;ACK&nbsp;and&nbsp;hsync
&gt;&nbsp;→&nbsp;committables&nbsp;lost.
&gt;&nbsp;(2)&nbsp;Writers&nbsp;send&nbsp;FileInfo&nbsp;in&nbsp;background&nbsp;during&nbsp;checkpoint&nbsp;interval,&nbsp;not&nbsp;at
&gt;&nbsp;barrier&nbsp;time.&nbsp;Barrier&nbsp;only&nbsp;sends&nbsp;a&nbsp;lightweight&nbsp;"ready"&nbsp;signal.&nbsp;Requires
&gt;&nbsp;larger&nbsp;change&nbsp;to&nbsp;writer&nbsp;side.
&gt;&nbsp;(3)&nbsp;Flink-internal&nbsp;API&nbsp;(even&nbsp;unstable)&nbsp;to&nbsp;access&nbsp;the&nbsp;checkpoint&nbsp;storage
&gt;&nbsp;location&nbsp;from&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;OperatorCoordinator.&nbsp;This&nbsp;would&nbsp;solve&nbsp;both
&gt;&nbsp;eliminating&nbsp;the&nbsp;need&nbsp;for&nbsp;a&nbsp;custom&nbsp;HDFS&nbsp;state&nbsp;and&nbsp;avoiding&nbsp;the&nbsp;additional
&gt;&nbsp;blocking&nbsp;mechanism.
&gt;
&gt;
&gt;&nbsp;But&nbsp;I&nbsp;think&nbsp;we&nbsp;may&nbsp;only&nbsp;be&nbsp;able&nbsp;to&nbsp;reduce&nbsp;blocking,&nbsp;not&nbsp;eliminate&nbsp;it
&gt;&nbsp;entirely.
&gt;
&gt;
&gt;&nbsp;2.&nbsp;Two&nbsp;coexisting&nbsp;state&nbsp;systems&nbsp;on&nbsp;the&nbsp;JM&nbsp;side
&gt;
&gt;
&gt;&nbsp;You're&nbsp;absolutely&nbsp;right&nbsp;—&nbsp;this&nbsp;is&nbsp;the&nbsp;deepest&nbsp;design&nbsp;risk&nbsp;in&nbsp;the&nbsp;prototype.
&gt;&nbsp;The&nbsp;root&nbsp;cause&nbsp;is&nbsp;a&nbsp;capability&nbsp;gap&nbsp;in&nbsp;Flink's&nbsp;OperatorCoordinator&nbsp;API:
&gt;&nbsp;checkpointCoordinator()&nbsp;returns&nbsp;byte[],&nbsp;but&nbsp;Flink&nbsp;gives&nbsp;the&nbsp;coordinator&nbsp;no
&gt;&nbsp;visibility&nbsp;into&nbsp;where&nbsp;that&nbsp;byte&nbsp;array&nbsp;is&nbsp;stored,&nbsp;when&nbsp;it&nbsp;is&nbsp;durably
&gt;&nbsp;persisted,&nbsp;or&nbsp;how&nbsp;to&nbsp;correlate&nbsp;it&nbsp;with&nbsp;the
&gt;&nbsp;notifyCheckpointComplete/resetToCheckpoint&nbsp;lifecycle.
&gt;
&gt;
&gt;&nbsp;Specifically:
&gt;
&gt;&nbsp;I&nbsp;cannot&nbsp;call&nbsp;CheckpointStorageAccess.getBaseLocation()&nbsp;(method&nbsp;doesn't
&gt;&nbsp;exist)
&gt;
&gt;
&gt;&nbsp;I&nbsp;cannot&nbsp;derive&nbsp;the&nbsp;checkpoint&nbsp;directory&nbsp;from&nbsp;Context&nbsp;(no
&gt;&nbsp;getJobInformation())
&gt;
&gt;
&gt;&nbsp;Even&nbsp;if&nbsp;I&nbsp;could,&nbsp;the&nbsp;coordinator&nbsp;state&nbsp;is&nbsp;written&nbsp;after
&gt;&nbsp;checkpointCoordinator()&nbsp;returns,&nbsp;while&nbsp;I&nbsp;need&nbsp;persistence&nbsp;before&nbsp;writers
&gt;&nbsp;unblock
&gt;
&gt;
&gt;
&gt;&nbsp;This&nbsp;forced&nbsp;the&nbsp;self-managed&nbsp;HDFS&nbsp;path.&nbsp;But&nbsp;I&nbsp;agree&nbsp;the&nbsp;resulting
&gt;&nbsp;dual-state&nbsp;consistency&nbsp;is&nbsp;fragile.
&gt;
&gt;
&gt;&nbsp;&amp;nbsp;&nbsp;1)&nbsp;Is&nbsp;there&nbsp;a&nbsp;Flink-internal&nbsp;API&nbsp;(even&nbsp;unstable)&nbsp;to&nbsp;access&nbsp;the
&gt;&nbsp;checkpoint&nbsp;storage&nbsp;location&nbsp;from&amp;nbsp;&nbsp;&amp;nbsp;&nbsp;OperatorCoordinator?&nbsp;This
&gt;&nbsp;would&nbsp;solve&nbsp;both&nbsp;problem&nbsp;1&nbsp;and&nbsp;problem&nbsp;2.
&gt;&nbsp;&amp;nbsp;&nbsp;2)&nbsp;Alternatively,&nbsp;would&nbsp;the&nbsp;community&nbsp;accept&nbsp;a&nbsp;design&nbsp;where&nbsp;PWC
&gt;&nbsp;state&nbsp;is&nbsp;fully&nbsp;embedded&nbsp;in&nbsp;the&nbsp;coordinator's&nbsp;byte[]&nbsp;state,&nbsp;and&nbsp;we&nbsp;accept
&gt;&nbsp;that&nbsp;snapshotState()&nbsp;on&nbsp;writers&nbsp;blocks&nbsp;only&nbsp;until&nbsp;the&nbsp;coordinator's
&gt;&nbsp;checkpointCoordinator()&nbsp;returns&nbsp;(not&nbsp;until&nbsp;HDFS&nbsp;is&nbsp;durable)?&nbsp;The&nbsp;risk&nbsp;is&nbsp;JM
&gt;&nbsp;crash&nbsp;between&nbsp;checkpointCoordinator()&nbsp;completion&nbsp;and&nbsp;Flink's&nbsp;actual&nbsp;state
&gt;&nbsp;persistence&nbsp;—&nbsp;but&nbsp;perhaps&nbsp;that's&nbsp;acceptable&nbsp;if&nbsp;Paimon's&nbsp;commit&nbsp;is
&gt;&nbsp;idempotent&nbsp;and&nbsp;can&nbsp;filter&nbsp;duplicates?
&gt;&nbsp;&amp;nbsp;&nbsp;3)&nbsp;Or&nbsp;should&nbsp;PIP-30&nbsp;scope&nbsp;be&nbsp;reduced&nbsp;to&nbsp;only&nbsp;address&nbsp;the
&gt;&nbsp;network-shuffle&nbsp;elimination&nbsp;(problem&nbsp;1),&nbsp;and&nbsp;defer&nbsp;the&nbsp;HA-recovery&nbsp;redesign
&gt;&nbsp;until&nbsp;Flink's&nbsp;coordinator&nbsp;API&nbsp;provides&nbsp;better&nbsp;storage&nbsp;integration?
&gt;
&gt;
&gt;&nbsp;3.&nbsp;FixedBucketSink&nbsp;scope&nbsp;vs&nbsp;region&nbsp;failover
&gt;
&gt;
&gt;&nbsp;Valid&nbsp;catch.&nbsp;The&nbsp;current&nbsp;branch&nbsp;wires&nbsp;only&nbsp;FixedBucketSink&nbsp;because&nbsp;that's
&gt;&nbsp;the&nbsp;immediate&nbsp;production&nbsp;need,&nbsp;but&nbsp;you're&nbsp;correct&nbsp;that&nbsp;FixedBucketSink's
&gt;&nbsp;keyBy(bucketKey)&nbsp;creates&nbsp;an&nbsp;all-to-all&nbsp;edge,&nbsp;resulting&nbsp;in&nbsp;a&nbsp;single
&gt;&nbsp;pipelined&nbsp;region.&nbsp;Region&nbsp;failover&nbsp;is&nbsp;not&nbsp;possible&nbsp;in&nbsp;this&nbsp;topology
&gt;&nbsp;regardless&nbsp;of&nbsp;where&nbsp;the&nbsp;committer&nbsp;lives.
&gt;&nbsp;Clarification&nbsp;on&nbsp;PIP-30&nbsp;motivation:
&gt;
&gt;
&gt;&nbsp;I&nbsp;think&nbsp;the&nbsp;original&nbsp;PIP-30&nbsp;lists&nbsp;two&nbsp;goals:
&gt;&nbsp;(a)&nbsp;Eliminate&nbsp;network&nbsp;shuffle&nbsp;for&nbsp;commit&nbsp;messages&nbsp;(delivered&nbsp;by
&gt;&nbsp;coordinator&nbsp;RPC)
&gt;&nbsp;(b)&nbsp;Enable&nbsp;region&nbsp;failover&nbsp;by&nbsp;decoupling&nbsp;commit&nbsp;from&nbsp;writer&nbsp;region
&gt;
&gt;
&gt;&nbsp;Goal&nbsp;(a)&nbsp;is&nbsp;fully&nbsp;achieved&nbsp;even&nbsp;for&nbsp;FixedBucketSink&nbsp;—&nbsp;commit&nbsp;messages&nbsp;no
&gt;&nbsp;longer&nbsp;traverse&nbsp;the&nbsp;shuffle.
&gt;&nbsp;Goal&nbsp;(b)&nbsp;requires&nbsp;UnawareBucketSink&nbsp;(or&nbsp;a&nbsp;bucket-less&nbsp;hash-distribution
&gt;&nbsp;mode),&nbsp;where&nbsp;writers&nbsp;can&nbsp;be&nbsp;colocated&nbsp;with&nbsp;sources&nbsp;without&nbsp;keyBy.&nbsp;This&nbsp;is
&gt;&nbsp;not&nbsp;yet&nbsp;implemented&nbsp;in&nbsp;the&nbsp;prototype.
&gt;
&gt;
&gt;&nbsp;Proposed&nbsp;scope&nbsp;adjustment:
&gt;&nbsp;A.&nbsp;Keep&nbsp;PIP-30&nbsp;as&nbsp;"coordinator-based&nbsp;commit&nbsp;for&nbsp;all&nbsp;sinks",&nbsp;but
&gt;&nbsp;acknowledge&nbsp;region&nbsp;failover&nbsp;only&nbsp;applies&nbsp;to&nbsp;UnawareBucketSink&nbsp;(future&nbsp;work)
&gt;&nbsp;B.&nbsp;Narrow&nbsp;PIP-30&nbsp;to&nbsp;"shuffle-free&nbsp;commit&nbsp;for&nbsp;FixedBucketSink",&nbsp;spin&nbsp;out
&gt;&nbsp;region&nbsp;failover&nbsp;to&nbsp;a&nbsp;separate&nbsp;PIP
&gt;&nbsp;C.&nbsp;Delay&nbsp;PIP-30&nbsp;until&nbsp;UnawareBucketSink&nbsp;coordinator&nbsp;integration&nbsp;is&nbsp;ready,
&gt;&nbsp;so&nbsp;both&nbsp;goals&nbsp;land&nbsp;together
&gt;
&gt;
&gt;&nbsp;I&nbsp;prefer&nbsp;option&nbsp;A&nbsp;if&nbsp;the&nbsp;community&nbsp;is&nbsp;willing&nbsp;to&nbsp;accept&nbsp;incremental
&gt;&nbsp;delivery,&nbsp;or&nbsp;option&nbsp;B&nbsp;if&nbsp;we&nbsp;want&nbsp;stricter&nbsp;scope&nbsp;discipline.&nbsp;What&nbsp;do&nbsp;you
&gt;&nbsp;think?
&gt;
&gt;
&gt;&nbsp;Based&nbsp;on&nbsp;this&nbsp;feedback,&nbsp;I&nbsp;see&nbsp;three&nbsp;possible&nbsp;paths:
&gt;
&gt;
&gt;&nbsp;Iterate&nbsp;on&nbsp;prototype:&nbsp;Implement&nbsp;async&nbsp;HDFS&nbsp;write&nbsp;(problem&nbsp;1),&nbsp;investigate
&gt;&nbsp;Flink&nbsp;API&nbsp;for&nbsp;unified&nbsp;state&nbsp;(problem&nbsp;2),&nbsp;keep&nbsp;FixedBucketSink&nbsp;scope&nbsp;with
&gt;&nbsp;documented&nbsp;limitation&nbsp;(problem&nbsp;3).
&gt;
&gt;
&gt;&nbsp;Split&nbsp;the&nbsp;PIP:&nbsp;Separate&nbsp;"shuffle-free&nbsp;commit"&nbsp;(smaller,&nbsp;ready&nbsp;soon)&nbsp;from
&gt;&nbsp;"region&nbsp;failover&nbsp;architecture"&nbsp;(larger,&nbsp;needs&nbsp;UnawareBucketSink).
&gt;
&gt;
&gt;&nbsp;Pause&nbsp;for&nbsp;Flink&nbsp;API&nbsp;evolution:&nbsp;Wait&nbsp;for&nbsp;Flink&nbsp;to&nbsp;expose&nbsp;checkpoint&nbsp;storage
&gt;&nbsp;to&nbsp;coordinators,&nbsp;then&nbsp;redesign&nbsp;with&nbsp;single-state&nbsp;system.
&gt;
&gt;
&gt;&nbsp;I'd&nbsp;like&nbsp;community&nbsp;guidance&nbsp;on&nbsp;which&nbsp;path&nbsp;to&nbsp;pursue.&nbsp;I'm&nbsp;happy&nbsp;to&nbsp;revise
&gt;&nbsp;the&nbsp;prototype&nbsp;once&nbsp;we&nbsp;have&nbsp;consensus.
&gt;
&gt;
&gt;&nbsp;Thanks,
&gt;&nbsp;yugan
&gt;
&gt;
&gt;
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;原始邮件
&gt;
&gt;
&gt;&nbsp;发件人:Biao&nbsp;Liu&nbsp;<[email protected]&amp;gt;
&gt;&nbsp;发件时间:2026年5月29日&nbsp;22:15
&gt;&nbsp;收件人:dev&nbsp;<[email protected]&amp;gt;
&gt;&nbsp;主题:Re:&nbsp;[DISCUSS]&nbsp;PIP-30:&nbsp;Paimon&nbsp;Write&nbsp;Coordinator&nbsp;-&nbsp;Replacing
&gt;&nbsp;CommitOperator&nbsp;with&nbsp;JobManager-level&nbsp;Coordination
&gt;
&gt;
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Hi&amp;nbsp;yugan,
&gt;
&gt;
&gt;&nbsp;Thanks&amp;nbsp;a&amp;nbsp;lot&amp;nbsp;for&amp;nbsp;sharing&amp;nbsp;the&amp;nbsp;prototype&amp;nbsp;and&amp;nbsp;the&amp;nbsp;detailed&amp;nbsp;write-up.&amp;nbsp;The
&gt;
&gt;&nbsp;PIP-30&amp;nbsp;direction&amp;nbsp;is&amp;nbsp;great,&amp;nbsp;and&amp;nbsp;the&amp;nbsp;prototype&amp;nbsp;gives&amp;nbsp;a&amp;nbsp;very&amp;nbsp;concrete&amp;nbsp;starting
&gt;&nbsp;point&amp;nbsp;for&amp;nbsp;discussion.
&gt;
&gt;
&gt;&nbsp;For&amp;nbsp;background,&amp;nbsp;an&amp;nbsp;earlier&amp;nbsp;mailing-list&amp;nbsp;discussion&amp;nbsp;on&amp;nbsp;the&amp;nbsp;same&amp;nbsp;topic&amp;nbsp;is
&gt;&nbsp;here:&amp;nbsp;
&gt;&nbsp;https://lists.apache.org/thread/5n1mwc7wc3cfjogz56w9k5ql3zyl00lw&amp;nbsp;And
&gt;&nbsp;a&amp;nbsp;related&amp;nbsp;design&amp;nbsp;doc&amp;nbsp;is&amp;nbsp;here
&gt;&nbsp;<https://lists.apache.org/thread/5n1mwc7wc3cfjogz56w9k5ql3zyl00lw&amp;nbsp;Anda&amp;nbsp;related&amp;nbsp;design&amp;nbsp;doc&amp;nbsp;is&amp;nbsp;here&gt;
&gt;&nbsp;:
&gt;
&gt;&nbsp;https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0
&gt;
&gt;
&gt;&nbsp;Both&amp;nbsp;might&amp;nbsp;be&amp;nbsp;useful&amp;nbsp;as&amp;nbsp;reference&amp;nbsp;for&amp;nbsp;the&amp;nbsp;discussion&amp;nbsp;below.
&gt;
&gt;
&gt;&nbsp;I&amp;nbsp;have&amp;nbsp;three&amp;nbsp;observations&amp;nbsp;about&amp;nbsp;the&amp;nbsp;current&amp;nbsp;PWC&amp;nbsp;prototype.
&gt;
&gt;
&gt;&nbsp;1.&amp;nbsp;Synchronous&amp;nbsp;wait&amp;nbsp;on&amp;nbsp;the&amp;nbsp;writer&amp;nbsp;barrier&amp;nbsp;path
&gt;
&gt;
&gt;&nbsp;In&amp;nbsp;the&amp;nbsp;current&amp;nbsp;flow,&amp;nbsp;after&amp;nbsp;the&amp;nbsp;barrier&amp;nbsp;passes&amp;nbsp;through,&amp;nbsp;each&amp;nbsp;writer
&gt;
&gt;&nbsp;subtask's&amp;nbsp;snapshotState()&amp;nbsp;calls&amp;nbsp;sender.snapshot(checkpointId).get()&amp;nbsp;and
&gt;
&gt;&nbsp;blocks&amp;nbsp;until&amp;nbsp;the&amp;nbsp;coordinator&amp;nbsp;side&amp;nbsp;completes&amp;nbsp;the&amp;nbsp;future.&amp;nbsp;On&amp;nbsp;the&amp;nbsp;coordinator
&gt;
&gt;&nbsp;side,&amp;nbsp;that&amp;nbsp;future&amp;nbsp;is&amp;nbsp;only&amp;nbsp;signalled&amp;nbsp;after&amp;nbsp;all&amp;nbsp;subtasks&amp;nbsp;have&amp;nbsp;sent&amp;nbsp;their
&gt;
&gt;&nbsp;FileInfoEvent&amp;nbsp;,&amp;nbsp;the&amp;nbsp;coordinator&amp;nbsp;has&amp;nbsp;aggregated&amp;nbsp;the&amp;nbsp;committables,&amp;nbsp;and
&gt;
&gt;&nbsp;PwcStateManager.writeState&amp;nbsp;has&amp;nbsp;finished&amp;nbsp;its&amp;nbsp;HDFS&amp;nbsp;hsync&amp;nbsp;+&amp;nbsp;rename&amp;nbsp;.&amp;nbsp;The
&gt;
&gt;&nbsp;coordinator&amp;nbsp;runs&amp;nbsp;all&amp;nbsp;of&amp;nbsp;this&amp;nbsp;on&amp;nbsp;a&amp;nbsp;single-thread&amp;nbsp;event&amp;nbsp;loop.
&gt;
&gt;
&gt;&nbsp;PIP-30&amp;nbsp;was&amp;nbsp;opened&amp;nbsp;mainly&amp;nbsp;to&amp;nbsp;help&amp;nbsp;large-scale&amp;nbsp;Flink-to-Paimon&amp;nbsp;write&amp;nbsp;jobs,
&gt;
&gt;&nbsp;where&amp;nbsp;checkpoint&amp;nbsp;duration&amp;nbsp;under&amp;nbsp;high&amp;nbsp;parallelism&amp;nbsp;is&amp;nbsp;already&amp;nbsp;sensitive.&amp;nbsp;With
&gt;
&gt;&nbsp;this&amp;nbsp;design,&amp;nbsp;every&amp;nbsp;subtask's&amp;nbsp;checkpoint&amp;nbsp;additionally&amp;nbsp;has&amp;nbsp;to&amp;nbsp;wait&amp;nbsp;on&amp;nbsp;(a)&amp;nbsp;the
&gt;
&gt;&nbsp;slowest-arriving&amp;nbsp;subtask,&amp;nbsp;(b)&amp;nbsp;one&amp;nbsp;HDFS&amp;nbsp;hsync&amp;nbsp;on&amp;nbsp;the&amp;nbsp;JM,&amp;nbsp;and&amp;nbsp;(c)&amp;nbsp;the&amp;nbsp;JM's
&gt;
&gt;&nbsp;single-thread&amp;nbsp;event&amp;nbsp;loop&amp;nbsp;draining&amp;nbsp;N&amp;nbsp;events.&amp;nbsp;This&amp;nbsp;adds&amp;nbsp;a&amp;nbsp;new&amp;nbsp;source&amp;nbsp;of
&gt;&nbsp;latency&amp;nbsp;directly&amp;nbsp;on&amp;nbsp;the&amp;nbsp;barrier&amp;nbsp;path.
&gt;
&gt;
&gt;&nbsp;2.&amp;nbsp;Two&amp;nbsp;coexisting&amp;nbsp;state&amp;nbsp;systems&amp;nbsp;on&amp;nbsp;the&amp;nbsp;JM&amp;nbsp;side
&gt;
&gt;
&gt;&nbsp;The&amp;nbsp;prototype&amp;nbsp;writes&amp;nbsp;aggregated&amp;nbsp;CommitMessages&amp;nbsp;directly&amp;nbsp;through&amp;nbsp;Hadoop
&gt;
&gt;&nbsp;FileSystem&amp;nbsp;to&amp;nbsp;<baseDir&amp;gt;/<operatorId&amp;gt;/checkpoint-{ckId}.state&amp;nbsp;,&amp;nbsp;while
&gt;
&gt;&nbsp;checkpointCoordinator&amp;nbsp;itself&amp;nbsp;returns&amp;nbsp;an&amp;nbsp;empty&amp;nbsp;byte&amp;nbsp;array&amp;nbsp;—&amp;nbsp;i.e.&amp;nbsp;the
&gt;
&gt;&nbsp;Flink-native&amp;nbsp;coordinator&amp;nbsp;state&amp;nbsp;channel&amp;nbsp;is&amp;nbsp;not&amp;nbsp;used&amp;nbsp;for&amp;nbsp;the&amp;nbsp;in-flight
&gt;&nbsp;committables.
&gt;
&gt;
&gt;&nbsp;The&amp;nbsp;deeper&amp;nbsp;concern&amp;nbsp;here&amp;nbsp;is&amp;nbsp;not&amp;nbsp;the&amp;nbsp;HDFS&amp;nbsp;layout&amp;nbsp;itself,&amp;nbsp;but&amp;nbsp;that&amp;nbsp;the
&gt;
&gt;&nbsp;coordinator&amp;nbsp;now&amp;nbsp;lives&amp;nbsp;across&amp;nbsp;two&amp;nbsp;state&amp;nbsp;systems&amp;nbsp;at&amp;nbsp;the&amp;nbsp;same&amp;nbsp;time:&amp;nbsp;Flink's
&gt;
&gt;&nbsp;checkpoint/restore&amp;nbsp;lifecycle&amp;nbsp;on&amp;nbsp;one&amp;nbsp;side,&amp;nbsp;and&amp;nbsp;the&amp;nbsp;self-managed&amp;nbsp;HDFS&amp;nbsp;state
&gt;
&gt;&nbsp;on&amp;nbsp;the&amp;nbsp;other.&amp;nbsp;The&amp;nbsp;two&amp;nbsp;have&amp;nbsp;to&amp;nbsp;stay&amp;nbsp;consistent&amp;nbsp;across&amp;nbsp;every&amp;nbsp;transition&amp;nbsp;—
&gt;
&gt;&nbsp;checkpoint&amp;nbsp;complete,&amp;nbsp;checkpoint&amp;nbsp;abort,&amp;nbsp;JM&amp;nbsp;failover,&amp;nbsp;job&amp;nbsp;restart,&amp;nbsp;full&amp;nbsp;job
&gt;
&gt;&nbsp;cancel&amp;nbsp;—&amp;nbsp;and&amp;nbsp;the&amp;nbsp;consistency&amp;nbsp;contract&amp;nbsp;between&amp;nbsp;them&amp;nbsp;is&amp;nbsp;not&amp;nbsp;obvious&amp;nbsp;from&amp;nbsp;the
&gt;
&gt;&nbsp;code.&amp;nbsp;Any&amp;nbsp;divergence&amp;nbsp;(e.g.&amp;nbsp;a&amp;nbsp;notifyCheckpointAborted&amp;nbsp;racing&amp;nbsp;with&amp;nbsp;writeState
&gt;
&gt;&nbsp;,&amp;nbsp;or&amp;nbsp;a&amp;nbsp;JM&amp;nbsp;crash&amp;nbsp;between&amp;nbsp;committer.commit&amp;nbsp;and&amp;nbsp;stateManager.deleteState&amp;nbsp;)
&gt;
&gt;&nbsp;becomes&amp;nbsp;a&amp;nbsp;correctness&amp;nbsp;question&amp;nbsp;that&amp;nbsp;the&amp;nbsp;coordinator&amp;nbsp;alone&amp;nbsp;has&amp;nbsp;to&amp;nbsp;reason
&gt;&nbsp;about.
&gt;
&gt;&nbsp;3.&amp;nbsp;FixedBucketSink&amp;nbsp;scope&amp;nbsp;vs&amp;nbsp;region&amp;nbsp;failover
&gt;
&gt;
&gt;&nbsp;In&amp;nbsp;the&amp;nbsp;current&amp;nbsp;branch,&amp;nbsp;FixedBucketSink&amp;nbsp;is&amp;nbsp;the&amp;nbsp;only&amp;nbsp;sink&amp;nbsp;wired&amp;nbsp;to&amp;nbsp;the
&gt;
&gt;&nbsp;coordinator&amp;nbsp;factory.&amp;nbsp;As&amp;nbsp;far&amp;nbsp;as&amp;nbsp;I&amp;nbsp;can&amp;nbsp;tell,&amp;nbsp;FixedBucketSink&amp;nbsp;requires&amp;nbsp;records
&gt;
&gt;&nbsp;to&amp;nbsp;be&amp;nbsp;hashed&amp;nbsp;by&amp;nbsp;bucket&amp;nbsp;key,&amp;nbsp;so&amp;nbsp;the&amp;nbsp;writer&amp;nbsp;chain&amp;nbsp;has&amp;nbsp;an&amp;nbsp;all-to-all&amp;nbsp;upstream
&gt;
&gt;&nbsp;edge&amp;nbsp;and&amp;nbsp;ends&amp;nbsp;up&amp;nbsp;in&amp;nbsp;a&amp;nbsp;single&amp;nbsp;Flink&amp;nbsp;pipelined&amp;nbsp;region.&amp;nbsp;In&amp;nbsp;that&amp;nbsp;case,&amp;nbsp;even
&gt;
&gt;&nbsp;after&amp;nbsp;replacing&amp;nbsp;CommitterOperator&amp;nbsp;with&amp;nbsp;a&amp;nbsp;coordinator,&amp;nbsp;the&amp;nbsp;writer&amp;nbsp;DAG&amp;nbsp;still
&gt;
&gt;&nbsp;has&amp;nbsp;only&amp;nbsp;one&amp;nbsp;region,&amp;nbsp;so&amp;nbsp;region&amp;nbsp;failover&amp;nbsp;would&amp;nbsp;not&amp;nbsp;be&amp;nbsp;possible&amp;nbsp;for&amp;nbsp;this
&gt;
&gt;&nbsp;sink.&amp;nbsp;Since&amp;nbsp;PIP-30&amp;nbsp;lists&amp;nbsp;region&amp;nbsp;failover&amp;nbsp;as&amp;nbsp;a&amp;nbsp;primary&amp;nbsp;motivation,&amp;nbsp;this
&gt;
&gt;&nbsp;scope&amp;nbsp;choice&amp;nbsp;does&amp;nbsp;not&amp;nbsp;appear&amp;nbsp;to&amp;nbsp;deliver&amp;nbsp;that&amp;nbsp;goal&amp;nbsp;as&amp;nbsp;it&amp;nbsp;stands.
&gt;
&gt;&nbsp;What&amp;nbsp;do&amp;nbsp;you&amp;nbsp;think?
&gt;
&gt;&nbsp;Thanks,
&gt;&nbsp;Biao&nbsp;/'bɪ.aʊ/
&gt;
&gt;
&gt;
&gt;
&gt;&nbsp;On&amp;nbsp;Mon,&amp;nbsp;25&amp;nbsp;May&amp;nbsp;2026&amp;nbsp;at&amp;nbsp;10:07,&amp;nbsp;Jingsong&amp;nbsp;Li&amp;nbsp;<
&gt;&nbsp;[email protected]&amp;gt;&amp;nbsp;wrote:
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;Thanks&amp;nbsp;fishfishfishfishaa,
&gt;&nbsp;&amp;gt;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;I&amp;nbsp;think&amp;nbsp;you&amp;nbsp;can&amp;nbsp;just&amp;nbsp;raise&amp;nbsp;a&amp;nbsp;PR&amp;nbsp;to&amp;nbsp;this.
&gt;&nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;Best,
&gt;&nbsp;&amp;gt;&amp;nbsp;Jingsong
&gt;&nbsp;&amp;gt;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;On&amp;nbsp;Sun,&amp;nbsp;May&amp;nbsp;24,&amp;nbsp;2026&amp;nbsp;at&amp;nbsp;11:29
 PM&amp;nbsp;俞淦&amp;nbsp;<
&gt;&nbsp;[email protected]&amp;gt;&amp;nbsp;wrote:
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Hi&amp;nbsp;Paimon&amp;nbsp;Community,
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Following&amp;nbsp;PIP-30&amp;nbsp;(Improvement&amp;nbsp;For&amp;nbsp;Paimon&amp;nbsp;Committer&amp;nbsp;In&amp;nbsp;Flink)&amp;amp;nbsp;,&amp;nbsp;I
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;have&amp;nbsp;completed&amp;nbsp;a&amp;nbsp;prototype&amp;nbsp;implementation&amp;nbsp;of&amp;nbsp;the&amp;nbsp;Paimon&amp;nbsp;Write&amp;nbsp;Coordinator
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;(PWC)&amp;amp;nbsp;on&amp;nbsp;my&amp;nbsp;personal&amp;nbsp;branch.&amp;nbsp;The&amp;nbsp;design&amp;nbsp;replaces&amp;nbsp;the&amp;nbsp;current
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;CommitOperator&amp;amp;nbsp;with&amp;nbsp;a&amp;nbsp;JobManager-level&amp;nbsp;OperatorCoordinator&amp;amp;nbsp;to
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;eliminate&amp;nbsp;the&amp;nbsp;network&amp;nbsp;shuffle&amp;nbsp;bottleneck&amp;nbsp;for&amp;nbsp;commit&amp;nbsp;messages.
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Code&amp;nbsp;Branch:&amp;amp;nbsp;
&gt;&nbsp;&amp;gt;&amp;nbsp;https://github.com/fishfishfishfishaa/paimon/tree/yg-pip3-pwc
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt
&gt;&nbsp;<https://github.com/fishfishfishfishaa/paimon/tree/yg-pip3-pwc&amp;gt;&amp;nbsp;&amp;gt&gt;
&gt;&nbsp;;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Key&amp;nbsp;Design&amp;nbsp;Decision:&amp;nbsp;Custom&amp;nbsp;HDFS&amp;nbsp;State
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Instead&amp;nbsp;of&amp;nbsp;using&amp;nbsp;Flink's&amp;nbsp;native&amp;nbsp;StateBackend,&amp;nbsp;I&amp;nbsp;chose&amp;nbsp;custom&amp;nbsp;HDFS&amp;nbsp;state
&gt;&nbsp;&amp;gt;&amp;nbsp;management&amp;nbsp;for&amp;nbsp;the&amp;nbsp;following&amp;nbsp;reasons:
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Timing&amp;nbsp;mismatch:&amp;nbsp;Flink's&amp;nbsp;operator&amp;nbsp;state&amp;nbsp;snapshot&amp;nbsp;occurs&amp;nbsp;after
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;snapshotState()&amp;amp;nbsp;returns,&amp;nbsp;but&amp;nbsp;PWC&amp;nbsp;needs&amp;nbsp;to&amp;nbsp;persist&amp;nbsp;aggregated
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;CommitMessages&amp;nbsp;before&amp;amp;nbsp;acknowledging&amp;nbsp;WriteOperators&amp;nbsp;to&amp;nbsp;proceed.&amp;nbsp;This
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;pre-barrier&amp;nbsp;persistence&amp;nbsp;requirement&amp;nbsp;doesn't&amp;nbsp;align&amp;nbsp;well&amp;nbsp;with&amp;nbsp;Flink's&amp;nbsp;native
&gt;&nbsp;&amp;gt;&amp;nbsp;state&amp;nbsp;lifecycle.
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Explicit&amp;nbsp;recovery&amp;nbsp;control:&amp;nbsp;Custom&amp;nbsp;state&amp;nbsp;enables&amp;nbsp;a&amp;nbsp;clean
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;resetToCheckpoint()&amp;amp;nbsp;logic&amp;nbsp;—&amp;nbsp;scan&amp;nbsp;HDFS,&amp;nbsp;filter&amp;nbsp;by&amp;nbsp;checkpointId&amp;nbsp;<=
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;restoredCheckpointId,&amp;nbsp;perform&amp;nbsp;idempotent&amp;nbsp;commit&amp;nbsp;via&amp;nbsp;Paimon's&amp;nbsp;native
&gt;&nbsp;&amp;gt;&amp;nbsp;deduplication,&amp;nbsp;then&amp;nbsp;cleanup.
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Decoupled&amp;nbsp;cleanup:&amp;nbsp;HDFS&amp;nbsp;state&amp;nbsp;deletion&amp;nbsp;is&amp;nbsp;independent&amp;nbsp;of&amp;nbsp;Flink
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;checkpoint&amp;nbsp;retention&amp;nbsp;policies,&amp;nbsp;avoiding&amp;nbsp;potential&amp;nbsp;state&amp;nbsp;bloat.
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;State&amp;nbsp;Storage&amp;nbsp;Path:
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;<flink-checkpoint-dir&amp;amp;gt;/pwc/<operatorId&amp;amp;gt;/checkpoint-{ckId}.state
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Current&amp;nbsp;Scope
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Supported:&amp;nbsp;FixedBucketSink&amp;amp;nbsp;(primary&amp;nbsp;use&amp;nbsp;case&amp;nbsp;for&amp;nbsp;PIP-30)
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Implementation&amp;nbsp;Highlights&amp;nbsp;(Core&amp;nbsp;Flow)
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;WriteOperator.preBarrier()&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;amp;nbsp;→&amp;nbsp;send&amp;nbsp;FileInfoEvent&amp;nbsp;to&amp;nbsp;PWC&amp;nbsp;(RPC)
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;amp;nbsp;→&amp;nbsp;PWC&amp;nbsp;aggregates,&amp;nbsp;persists&amp;nbsp;to&amp;nbsp;HDFS,&amp;nbsp;sends&amp;nbsp;ACK&amp;nbsp;&amp;nbsp;&amp;amp;nbsp;→
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;WriteOperator.snapshotState()&amp;nbsp;completes&amp;nbsp;(local&amp;nbsp;state&amp;nbsp;only)&amp;nbsp;&amp;nbsp;&amp;amp;nbsp;→&amp;nbsp;Flink
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;confirms&amp;nbsp;checkpoint&amp;nbsp;&amp;nbsp;&amp;amp;nbsp;→&amp;nbsp;PWC.notifyCheckpointComplete()&amp;nbsp;→
&gt;&nbsp;&amp;gt;&amp;nbsp;Paimon.commit()&amp;nbsp;→&amp;nbsp;delete&amp;nbsp;HDFS&amp;nbsp;state
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Recovery&amp;nbsp;Logic:
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;resetToCheckpoint()&amp;amp;nbsp;scans&amp;nbsp;HDFS,&amp;nbsp;handles&amp;nbsp;<=&amp;nbsp;restoredCkId&amp;amp;nbsp;via
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;idempotent&amp;nbsp;commit,&amp;nbsp;and&amp;nbsp;treats&amp;nbsp;&amp;amp;gt;&amp;nbsp;restoredCkId&amp;amp;nbsp;as&amp;nbsp;orphan&amp;nbsp;cleanup.
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Testing&amp;nbsp;Status&amp;nbsp;&amp;amp;amp;&amp;nbsp;Request&amp;nbsp;for&amp;nbsp;Guidance
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Currently,&amp;nbsp;I&amp;nbsp;have&amp;nbsp;added&amp;nbsp;unit&amp;amp;nbsp;tests&amp;nbsp;covering&amp;nbsp;the&amp;nbsp;recovery
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;logic&amp;amp;nbsp;of&amp;nbsp;the&amp;nbsp;custom&amp;nbsp;HDFS&amp;nbsp;state.&amp;nbsp;I&amp;nbsp;would&amp;nbsp;appreciate&amp;nbsp;the&amp;nbsp;community's
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;advice&amp;nbsp;on&amp;nbsp;what&amp;nbsp;other&amp;nbsp;test&amp;nbsp;cases&amp;nbsp;are&amp;nbsp;essential&amp;nbsp;for&amp;nbsp;this&amp;nbsp;PIP.&amp;amp;nbsp;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Once&amp;nbsp;I&amp;nbsp;receive&amp;nbsp;initial&amp;nbsp;feedback&amp;nbsp;and&amp;nbsp;strengthen&amp;nbsp;the&amp;nbsp;test&amp;nbsp;coverage
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;accordingly,&amp;nbsp;I&amp;nbsp;will&amp;nbsp;open&amp;nbsp;a&amp;nbsp;formal&amp;nbsp;Pull&amp;nbsp;Request.
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Looking&amp;nbsp;forward&amp;nbsp;to&amp;nbsp;your&amp;nbsp;suggestions!
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;Thank&amp;nbsp;you,
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;fishfishfishfishaa/&amp;nbsp;yugan
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;俞淦
&gt;&nbsp;&amp;gt;&amp;nbsp;&amp;gt;&amp;nbsp;[email protected]
&gt;&nbsp;&amp;gt;

Reply via email to