Hi Biao,

Thanks for the detailed review and for pointing to the earlier mailing-list 
thread and design doc. I've read both and they provide very useful context. Let 
me address each observation.


1. Synchronous wait on the writer barrier path


Acknowledged. The current sender.snapshot(ck).get() blocking design does 
introduce a new latency source on the critical path.


My rationale was to ensure strict ordering: PWC must persist state before 
writers complete snapshotState(), so that JM failover never loses in-flight 
committables. However, you're right that the current implementation is too 
pessimistic.Potential mitigations I'd like to explore:


(1) PWC sends ACK immediately upon receiving all FileInfoEvents, before HDFS 
hsync. HDFS write proceeds async. Risk: JM crash between ACK and hsync → 
committables lost.
(2) Writers send FileInfo in background during checkpoint interval, not at 
barrier time. Barrier only sends a lightweight "ready" signal. Requires larger 
change to writer side.
(3) Flink-internal API (even unstable) to access the checkpoint storage 
location from                    OperatorCoordinator. This would 
solve both eliminating the need for a custom HDFS state and avoiding the 
additional blocking mechanism.


But I think we may only be able to reduce blocking, not eliminate it entirely.


2. Two coexisting state systems on the JM side


You're absolutely right — this is the deepest design risk in the prototype.
The root cause is a capability gap in Flink's OperatorCoordinator API: 
checkpointCoordinator() returns byte[], but Flink gives the coordinator no 
visibility into where that byte array is stored, when it is durably persisted, 
or how to correlate it with the notifyCheckpointComplete/resetToCheckpoint 
lifecycle.


Specifically:

I cannot call CheckpointStorageAccess.getBaseLocation() (method doesn't exist)


I cannot derive the checkpoint directory from Context (no getJobInformation())


Even if I could, the coordinator state is written after checkpointCoordinator() 
returns, while I need persistence before writers unblock



This forced the self-managed HDFS path. But I agree the resulting dual-state 
consistency is fragile.


  1) Is there a Flink-internal API (even unstable) to access the 
checkpoint storage location from                    
OperatorCoordinator? This would solve both problem 1 and problem 2.
  2) Alternatively, would the community accept a design where PWC state is 
fully embedded in the coordinator's byte[] state, and we accept that 
snapshotState() on writers blocks only until the coordinator's 
checkpointCoordinator() returns (not until HDFS is durable)? The risk is JM 
crash between checkpointCoordinator() completion and Flink's actual state 
persistence — but perhaps that's acceptable if Paimon's commit is idempotent 
and can filter duplicates?
  3) Or should PIP-30 scope be reduced to only address the network-shuffle 
elimination (problem 1), and defer the HA-recovery redesign until Flink's 
coordinator API provides better storage integration?


3. FixedBucketSink scope vs region failover


Valid catch. The current branch wires only FixedBucketSink because that's the 
immediate production need, but you're correct that FixedBucketSink's 
keyBy(bucketKey) creates an all-to-all edge, resulting in a single pipelined 
region. Region failover is not possible in this topology regardless of where 
the committer lives.
Clarification on PIP-30 motivation:


I think the original PIP-30 lists two goals:
(a) Eliminate network shuffle for commit messages (delivered by coordinator RPC)
(b) Enable region failover by decoupling commit from writer region


Goal (a) is fully achieved even for FixedBucketSink — commit messages no longer 
traverse the shuffle.
Goal (b) requires UnawareBucketSink (or a bucket-less hash-distribution mode), 
where writers can be colocated with sources without keyBy. This is not yet 
implemented in the prototype.


Proposed scope adjustment:
A. Keep PIP-30 as "coordinator-based commit for all sinks", but acknowledge 
region failover only applies to UnawareBucketSink (future work)
B. Narrow PIP-30 to "shuffle-free commit for FixedBucketSink", spin out region 
failover to a separate PIP
C. Delay PIP-30 until UnawareBucketSink coordinator integration is ready, so 
both goals land together


I prefer option A if the community is willing to accept incremental delivery, 
or option B if we want stricter scope discipline. What do you think?


Based on this feedback, I see three possible paths:


Iterate on prototype: Implement async HDFS write (problem 1), investigate Flink 
API for unified state (problem 2), keep FixedBucketSink scope with documented 
limitation (problem 3).


Split the PIP: Separate "shuffle-free commit" (smaller, ready soon) from 
"region failover architecture" (larger, needs UnawareBucketSink).


Pause for Flink API evolution: Wait for Flink to expose checkpoint storage to 
coordinators, then redesign with single-state system.


I'd like community guidance on which path to pursue. I'm happy to revise the 
prototype once we have consensus.


Thanks,
yugan




         原始邮件
         
       
发件人:Biao Liu <[email protected]&gt;
发件时间:2026年5月29日 22:15
收件人:dev <[email protected]&gt;
主题:Re: [DISCUSS] PIP-30: Paimon Write Coordinator - Replacing CommitOperator 
with JobManager-level Coordination



       Hi&nbsp;yugan,

Thanks&nbsp;a&nbsp;lot&nbsp;for&nbsp;sharing&nbsp;the&nbsp;prototype&nbsp;and&nbsp;the&nbsp;detailed&nbsp;write-up.&nbsp;The
PIP-30&nbsp;direction&nbsp;is&nbsp;great,&nbsp;and&nbsp;the&nbsp;prototype&nbsp;gives&nbsp;a&nbsp;very&nbsp;concrete&nbsp;starting
point&nbsp;for&nbsp;discussion.

For&nbsp;background,&nbsp;an&nbsp;earlier&nbsp;mailing-list&nbsp;discussion&nbsp;on&nbsp;the&nbsp;same&nbsp;topic&nbsp;is
here:&nbsp;https://lists.apache.org/thread/5n1mwc7wc3cfjogz56w9k5ql3zyl00lw&nbsp;And
a&nbsp;related&nbsp;design&nbsp;doc&nbsp;is&nbsp;here:
https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0

Both&nbsp;might&nbsp;be&nbsp;useful&nbsp;as&nbsp;reference&nbsp;for&nbsp;the&nbsp;discussion&nbsp;below.

I&nbsp;have&nbsp;three&nbsp;observations&nbsp;about&nbsp;the&nbsp;current&nbsp;PWC&nbsp;prototype.

1.&nbsp;Synchronous&nbsp;wait&nbsp;on&nbsp;the&nbsp;writer&nbsp;barrier&nbsp;path

In&nbsp;the&nbsp;current&nbsp;flow,&nbsp;after&nbsp;the&nbsp;barrier&nbsp;passes&nbsp;through,&nbsp;each&nbsp;writer
subtask's&nbsp;snapshotState()&nbsp;calls&nbsp;sender.snapshot(checkpointId).get()&nbsp;and
blocks&nbsp;until&nbsp;the&nbsp;coordinator&nbsp;side&nbsp;completes&nbsp;the&nbsp;future.&nbsp;On&nbsp;the&nbsp;coordinator
side,&nbsp;that&nbsp;future&nbsp;is&nbsp;only&nbsp;signalled&nbsp;after&nbsp;all&nbsp;subtasks&nbsp;have&nbsp;sent&nbsp;their
FileInfoEvent&nbsp;,&nbsp;the&nbsp;coordinator&nbsp;has&nbsp;aggregated&nbsp;the&nbsp;committables,&nbsp;and
PwcStateManager.writeState&nbsp;has&nbsp;finished&nbsp;its&nbsp;HDFS&nbsp;hsync&nbsp;+&nbsp;rename&nbsp;.&nbsp;The
coordinator&nbsp;runs&nbsp;all&nbsp;of&nbsp;this&nbsp;on&nbsp;a&nbsp;single-thread&nbsp;event&nbsp;loop.

PIP-30&nbsp;was&nbsp;opened&nbsp;mainly&nbsp;to&nbsp;help&nbsp;large-scale&nbsp;Flink-to-Paimon&nbsp;write&nbsp;jobs,
where&nbsp;checkpoint&nbsp;duration&nbsp;under&nbsp;high&nbsp;parallelism&nbsp;is&nbsp;already&nbsp;sensitive.&nbsp;With
this&nbsp;design,&nbsp;every&nbsp;subtask's&nbsp;checkpoint&nbsp;additionally&nbsp;has&nbsp;to&nbsp;wait&nbsp;on&nbsp;(a)&nbsp;the
slowest-arriving&nbsp;subtask,&nbsp;(b)&nbsp;one&nbsp;HDFS&nbsp;hsync&nbsp;on&nbsp;the&nbsp;JM,&nbsp;and&nbsp;(c)&nbsp;the&nbsp;JM's
single-thread&nbsp;event&nbsp;loop&nbsp;draining&nbsp;N&nbsp;events.&nbsp;This&nbsp;adds&nbsp;a&nbsp;new&nbsp;source&nbsp;of
latency&nbsp;directly&nbsp;on&nbsp;the&nbsp;barrier&nbsp;path.

2.&nbsp;Two&nbsp;coexisting&nbsp;state&nbsp;systems&nbsp;on&nbsp;the&nbsp;JM&nbsp;side

The&nbsp;prototype&nbsp;writes&nbsp;aggregated&nbsp;CommitMessages&nbsp;directly&nbsp;through&nbsp;Hadoop
FileSystem&nbsp;to&nbsp;<baseDir&gt;/<operatorId&gt;/checkpoint-{ckId}.state&nbsp;,&nbsp;while
checkpointCoordinator&nbsp;itself&nbsp;returns&nbsp;an&nbsp;empty&nbsp;byte&nbsp;array&nbsp;—&nbsp;i.e.&nbsp;the
Flink-native&nbsp;coordinator&nbsp;state&nbsp;channel&nbsp;is&nbsp;not&nbsp;used&nbsp;for&nbsp;the&nbsp;in-flight
committables.

The&nbsp;deeper&nbsp;concern&nbsp;here&nbsp;is&nbsp;not&nbsp;the&nbsp;HDFS&nbsp;layout&nbsp;itself,&nbsp;but&nbsp;that&nbsp;the
coordinator&nbsp;now&nbsp;lives&nbsp;across&nbsp;two&nbsp;state&nbsp;systems&nbsp;at&nbsp;the&nbsp;same&nbsp;time:&nbsp;Flink's
checkpoint/restore&nbsp;lifecycle&nbsp;on&nbsp;one&nbsp;side,&nbsp;and&nbsp;the&nbsp;self-managed&nbsp;HDFS&nbsp;state
on&nbsp;the&nbsp;other.&nbsp;The&nbsp;two&nbsp;have&nbsp;to&nbsp;stay&nbsp;consistent&nbsp;across&nbsp;every&nbsp;transition&nbsp;—
checkpoint&nbsp;complete,&nbsp;checkpoint&nbsp;abort,&nbsp;JM&nbsp;failover,&nbsp;job&nbsp;restart,&nbsp;full&nbsp;job
cancel&nbsp;—&nbsp;and&nbsp;the&nbsp;consistency&nbsp;contract&nbsp;between&nbsp;them&nbsp;is&nbsp;not&nbsp;obvious&nbsp;from&nbsp;the
code.&nbsp;Any&nbsp;divergence&nbsp;(e.g.&nbsp;a&nbsp;notifyCheckpointAborted&nbsp;racing&nbsp;with&nbsp;writeState
,&nbsp;or&nbsp;a&nbsp;JM&nbsp;crash&nbsp;between&nbsp;committer.commit&nbsp;and&nbsp;stateManager.deleteState&nbsp;)
becomes&nbsp;a&nbsp;correctness&nbsp;question&nbsp;that&nbsp;the&nbsp;coordinator&nbsp;alone&nbsp;has&nbsp;to&nbsp;reason
about.

3.&nbsp;FixedBucketSink&nbsp;scope&nbsp;vs&nbsp;region&nbsp;failover

In&nbsp;the&nbsp;current&nbsp;branch,&nbsp;FixedBucketSink&nbsp;is&nbsp;the&nbsp;only&nbsp;sink&nbsp;wired&nbsp;to&nbsp;the
coordinator&nbsp;factory.&nbsp;As&nbsp;far&nbsp;as&nbsp;I&nbsp;can&nbsp;tell,&nbsp;FixedBucketSink&nbsp;requires&nbsp;records
to&nbsp;be&nbsp;hashed&nbsp;by&nbsp;bucket&nbsp;key,&nbsp;so&nbsp;the&nbsp;writer&nbsp;chain&nbsp;has&nbsp;an&nbsp;all-to-all&nbsp;upstream
edge&nbsp;and&nbsp;ends&nbsp;up&nbsp;in&nbsp;a&nbsp;single&nbsp;Flink&nbsp;pipelined&nbsp;region.&nbsp;In&nbsp;that&nbsp;case,&nbsp;even
after&nbsp;replacing&nbsp;CommitterOperator&nbsp;with&nbsp;a&nbsp;coordinator,&nbsp;the&nbsp;writer&nbsp;DAG&nbsp;still
has&nbsp;only&nbsp;one&nbsp;region,&nbsp;so&nbsp;region&nbsp;failover&nbsp;would&nbsp;not&nbsp;be&nbsp;possible&nbsp;for&nbsp;this
sink.&nbsp;Since&nbsp;PIP-30&nbsp;lists&nbsp;region&nbsp;failover&nbsp;as&nbsp;a&nbsp;primary&nbsp;motivation,&nbsp;this
scope&nbsp;choice&nbsp;does&nbsp;not&nbsp;appear&nbsp;to&nbsp;deliver&nbsp;that&nbsp;goal&nbsp;as&nbsp;it&nbsp;stands.

What&nbsp;do&nbsp;you&nbsp;think?

Thanks,
Biao /'bɪ.aʊ/



On&nbsp;Mon,&nbsp;25&nbsp;May&nbsp;2026&nbsp;at&nbsp;10:07,&nbsp;Jingsong&nbsp;Li&nbsp;<[email protected]&gt;&nbsp;wrote:

&gt;&nbsp;Thanks&nbsp;fishfishfishfishaa,
&gt;
&gt;&nbsp;I&nbsp;think&nbsp;you&nbsp;can&nbsp;just&nbsp;raise&nbsp;a&nbsp;PR&nbsp;to&nbsp;this.
&gt;
&gt;&nbsp;Best,
&gt;&nbsp;Jingsong
&gt;
&gt;&nbsp;On&nbsp;Sun,&nbsp;May&nbsp;24,&nbsp;2026&nbsp;at&nbsp;11:29 
PM&nbsp;俞淦&nbsp;<[email protected]&gt;&nbsp;wrote:
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Hi&nbsp;Paimon&nbsp;Community,
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Following&nbsp;PIP-30&nbsp;(Improvement&nbsp;For&nbsp;Paimon&nbsp;Committer&nbsp;In&nbsp;Flink)&amp;nbsp;,&nbsp;I
&gt;&nbsp;have&nbsp;completed&nbsp;a&nbsp;prototype&nbsp;implementation&nbsp;of&nbsp;the&nbsp;Paimon&nbsp;Write&nbsp;Coordinator
&gt;&nbsp;(PWC)&amp;nbsp;on&nbsp;my&nbsp;personal&nbsp;branch.&nbsp;The&nbsp;design&nbsp;replaces&nbsp;the&nbsp;current
&gt;&nbsp;CommitOperator&amp;nbsp;with&nbsp;a&nbsp;JobManager-level&nbsp;OperatorCoordinator&amp;nbsp;to
&gt;&nbsp;eliminate&nbsp;the&nbsp;network&nbsp;shuffle&nbsp;bottleneck&nbsp;for&nbsp;commit&nbsp;messages.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Code&nbsp;Branch:&amp;nbsp;
&gt;&nbsp;https://github.com/fishfishfishfishaa/paimon/tree/yg-pip3-pwc
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Key&nbsp;Design&nbsp;Decision:&nbsp;Custom&nbsp;HDFS&nbsp;State
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Instead&nbsp;of&nbsp;using&nbsp;Flink's&nbsp;native&nbsp;StateBackend,&nbsp;I&nbsp;chose&nbsp;custom&nbsp;HDFS&nbsp;state
&gt;&nbsp;management&nbsp;for&nbsp;the&nbsp;following&nbsp;reasons:
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Timing&nbsp;mismatch:&nbsp;Flink's&nbsp;operator&nbsp;state&nbsp;snapshot&nbsp;occurs&nbsp;after
&gt;&nbsp;snapshotState()&amp;nbsp;returns,&nbsp;but&nbsp;PWC&nbsp;needs&nbsp;to&nbsp;persist&nbsp;aggregated
&gt;&nbsp;CommitMessages&nbsp;before&amp;nbsp;acknowledging&nbsp;WriteOperators&nbsp;to&nbsp;proceed.&nbsp;This
&gt;&nbsp;pre-barrier&nbsp;persistence&nbsp;requirement&nbsp;doesn't&nbsp;align&nbsp;well&nbsp;with&nbsp;Flink's&nbsp;native
&gt;&nbsp;state&nbsp;lifecycle.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Explicit&nbsp;recovery&nbsp;control:&nbsp;Custom&nbsp;state&nbsp;enables&nbsp;a&nbsp;clean
&gt;&nbsp;resetToCheckpoint()&amp;nbsp;logic&nbsp;—&nbsp;scan&nbsp;HDFS,&nbsp;filter&nbsp;by&nbsp;checkpointId&nbsp;<=
&gt;&nbsp;restoredCheckpointId,&nbsp;perform&nbsp;idempotent&nbsp;commit&nbsp;via&nbsp;Paimon's&nbsp;native
&gt;&nbsp;deduplication,&nbsp;then&nbsp;cleanup.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Decoupled&nbsp;cleanup:&nbsp;HDFS&nbsp;state&nbsp;deletion&nbsp;is&nbsp;independent&nbsp;of&nbsp;Flink
&gt;&nbsp;checkpoint&nbsp;retention&nbsp;policies,&nbsp;avoiding&nbsp;potential&nbsp;state&nbsp;bloat.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;State&nbsp;Storage&nbsp;Path:
&gt;&nbsp;&gt;&nbsp;<flink-checkpoint-dir&amp;gt;/pwc/<operatorId&amp;gt;/checkpoint-{ckId}.state
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Current&nbsp;Scope
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Supported:&nbsp;FixedBucketSink&amp;nbsp;(primary&nbsp;use&nbsp;case&nbsp;for&nbsp;PIP-30)
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Implementation&nbsp;Highlights&nbsp;(Core&nbsp;Flow)
&gt;&nbsp;&gt;&nbsp;WriteOperator.preBarrier()&nbsp;&nbsp;&nbsp;&amp;nbsp;→&nbsp;send&nbsp;FileInfoEvent&nbsp;to&nbsp;PWC&nbsp;(RPC)
&gt;&nbsp;&amp;nbsp;→&nbsp;PWC&nbsp;aggregates,&nbsp;persists&nbsp;to&nbsp;HDFS,&nbsp;sends&nbsp;ACK&nbsp;&nbsp;&amp;nbsp;→
&gt;&nbsp;WriteOperator.snapshotState()&nbsp;completes&nbsp;(local&nbsp;state&nbsp;only)&nbsp;&nbsp;&amp;nbsp;→&nbsp;Flink
&gt;&nbsp;confirms&nbsp;checkpoint&nbsp;&nbsp;&amp;nbsp;→&nbsp;PWC.notifyCheckpointComplete()&nbsp;→
&gt;&nbsp;Paimon.commit()&nbsp;→&nbsp;delete&nbsp;HDFS&nbsp;state
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Recovery&nbsp;Logic:
&gt;&nbsp;&gt;&nbsp;resetToCheckpoint()&amp;nbsp;scans&nbsp;HDFS,&nbsp;handles&nbsp;<=&nbsp;restoredCkId&amp;nbsp;via
&gt;&nbsp;idempotent&nbsp;commit,&nbsp;and&nbsp;treats&nbsp;&amp;gt;&nbsp;restoredCkId&amp;nbsp;as&nbsp;orphan&nbsp;cleanup.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Testing&nbsp;Status&nbsp;&amp;amp;&nbsp;Request&nbsp;for&nbsp;Guidance
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Currently,&nbsp;I&nbsp;have&nbsp;added&nbsp;unit&amp;nbsp;tests&nbsp;covering&nbsp;the&nbsp;recovery
&gt;&nbsp;logic&amp;nbsp;of&nbsp;the&nbsp;custom&nbsp;HDFS&nbsp;state.&nbsp;I&nbsp;would&nbsp;appreciate&nbsp;the&nbsp;community's
&gt;&nbsp;advice&nbsp;on&nbsp;what&nbsp;other&nbsp;test&nbsp;cases&nbsp;are&nbsp;essential&nbsp;for&nbsp;this&nbsp;PIP.&amp;nbsp;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Once&nbsp;I&nbsp;receive&nbsp;initial&nbsp;feedback&nbsp;and&nbsp;strengthen&nbsp;the&nbsp;test&nbsp;coverage
&gt;&nbsp;accordingly,&nbsp;I&nbsp;will&nbsp;open&nbsp;a&nbsp;formal&nbsp;Pull&nbsp;Request.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Looking&nbsp;forward&nbsp;to&nbsp;your&nbsp;suggestions!
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Thank&nbsp;you,
&gt;&nbsp;&gt;&nbsp;fishfishfishfishaa/&nbsp;yugan
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;俞淦
&gt;&nbsp;&gt;&nbsp;[email protected]
&gt;

Reply via email to