Hi Biao,
Thanks for the detailed review and for pointing to the earlier mailing-list
thread and design doc. I've read both and they provide very useful context. Let
me address each observation.
1. Synchronous wait on the writer barrier path
Acknowledged. The current sender.snapshot(ck).get() blocking design does
introduce a new latency source on the critical path.
My rationale was to ensure strict ordering: PWC must persist state before
writers complete snapshotState(), so that a JM failover never loses in-flight
committables. However, you're right that the current implementation is too
pessimistic. Potential mitigations I'd like to explore:
(1) PWC sends the ACK immediately upon receiving all FileInfoEvents, before the
HDFS hsync, and the HDFS write proceeds asynchronously. Risk: a JM crash
between the ACK and the hsync loses the committables.
(2) Writers send FileInfo in the background during the checkpoint interval,
not at barrier time; at the barrier they only send a lightweight "ready"
signal. This requires a larger change on the writer side.
(3) A Flink-internal API (even an unstable one) to access the checkpoint
storage location from the OperatorCoordinator. This would both eliminate the
need for custom HDFS state and avoid the additional blocking mechanism.
But I think we may only be able to reduce blocking, not eliminate it entirely.
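The difference between the current ordering and mitigation (1) can be sketched in a few lines. This is a hypothetical Java sketch, not prototype code: FileInfoAggregator, ackFor, onFileInfo and the persist callback are illustrative names, committables are modelled as file-name strings, and persist stands in for the HDFS hsync + rename. It completes a per-checkpoint ACK future only after every subtask has reported; with ackBeforePersist set, it unblocks the writers first and persists afterwards, which opens exactly the crash window described in (1). Unlike the prototype, both modes run the write on a separate I/O thread so the coordinator's event loop is not held up by the hsync; that is an assumption about how the async variant could look.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Hypothetical sketch: class and method names are illustrative, not from the PWC branch.
final class FileInfoAggregator {
    private final int parallelism;
    private final boolean ackBeforePersist; // true = mitigation (1)
    private final BiConsumer<Long, List<String>> persist; // stands in for hsync + rename
    private final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
    private final Map<Long, List<String>> pending = new HashMap<>();
    private final Map<Long, Integer> reported = new HashMap<>();
    private final Map<Long, CompletableFuture<Void>> acks = new HashMap<>();

    FileInfoAggregator(int parallelism, boolean ackBeforePersist,
                       BiConsumer<Long, List<String>> persist) {
        this.parallelism = parallelism;
        this.ackBeforePersist = ackBeforePersist;
        this.persist = persist;
    }

    // The future a writer's snapshotState() would block on for this checkpoint.
    synchronized CompletableFuture<Void> ackFor(long checkpointId) {
        return acks.computeIfAbsent(checkpointId, id -> new CompletableFuture<>());
    }

    // Handles one subtask's FileInfoEvent; fires the ACK once all subtasks have reported.
    synchronized void onFileInfo(long checkpointId, List<String> files) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).addAll(files);
        if (reported.merge(checkpointId, 1, Integer::sum) < parallelism) {
            return; // still waiting for the slowest subtask
        }
        List<String> all = pending.remove(checkpointId);
        reported.remove(checkpointId);
        CompletableFuture<Void> ack = ackFor(checkpointId);
        acks.remove(checkpointId);
        if (ackBeforePersist) {
            ack.complete(null); // writers unblock before the state is durable
            // A JM crash before this task runs loses the committables.
            ioExecutor.execute(() -> persist.accept(checkpointId, all));
        } else {
            // Current ordering: persist first, ACK only after the write succeeded.
            CompletableFuture.runAsync(() -> persist.accept(checkpointId, all), ioExecutor)
                    .whenComplete((ok, err) -> {
                        if (err == null) ack.complete(null);
                        else ack.completeExceptionally(err);
                    });
        }
    }

    void shutdown() {
        ioExecutor.shutdown();
    }

    public static void main(String[] args) {
        List<Long> persisted = new CopyOnWriteArrayList<>();
        FileInfoAggregator agg = new FileInfoAggregator(2, false, (ck, fs) -> persisted.add(ck));
        CompletableFuture<Void> ack = agg.ackFor(7L);
        agg.onFileInfo(7L, List.of("data-0.orc"));
        System.out.println(ack.isDone()); // false: subtask 1 has not reported yet
        agg.onFileInfo(7L, List.of("data-1.orc"));
        ack.join(); // returns only after persist has run
        System.out.println(persisted); // [7]
        agg.shutdown();
    }
}
```

Even with ackBeforePersist set, every writer still waits for the slowest subtask's FileInfoEvent before its ACK fires, which is why the blocking can be shortened but not removed.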
2. Two coexisting state systems on the JM side
You're absolutely right — this is the deepest design risk in the prototype.
The root cause is a capability gap in Flink's OperatorCoordinator API:
checkpointCoordinator() hands its state to Flink as a byte[] (by completing
the CompletableFuture<byte[]> it is given), but Flink gives the coordinator no
visibility into where that byte array is stored, when it is durably persisted,
or how to correlate it with the notifyCheckpointComplete/resetToCheckpoint
lifecycle.
Specifically:
- I cannot call CheckpointStorageAccess.getBaseLocation() (the method doesn't
  exist).
- I cannot derive the checkpoint directory from the Context (there is no
  getJobInformation()).
- Even if I could, the coordinator state is written only after
  checkpointCoordinator() completes, while I need persistence before the
  writers unblock.
This is what forced the self-managed HDFS path. But I agree that the resulting
dual-state consistency is fragile.
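The consistency contract that resetToCheckpoint() has to uphold over the self-managed files can be sketched as a pure classification step. This is a minimal sketch, not the prototype's code: PwcRecoveryPlan and classify are hypothetical names, file names follow the checkpoint-{ckId}.state pattern from the original proposal, and the ".tmp" suffix for a file that has not yet been renamed is an assumption. Files at or below the restored checkpoint are re-committed, relying on the commit being idempotent; anything newer was never confirmed by Flink and is treated as an orphan.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the recovery decision over checkpoint-{ckId}.state files.
final class PwcRecoveryPlan {
    private static final Pattern STATE_FILE = Pattern.compile("checkpoint-(\\d+)\\.state");

    final List<Long> recommit = new ArrayList<>(); // <= restored: commit again (must be idempotent)
    final List<Long> orphans = new ArrayList<>();  // > restored: never confirmed, delete

    static PwcRecoveryPlan classify(List<String> fileNames, long restoredCheckpointId) {
        PwcRecoveryPlan plan = new PwcRecoveryPlan();
        for (String name : fileNames) {
            Matcher m = STATE_FILE.matcher(name);
            if (!m.matches()) {
                continue; // skip unrelated files, e.g. an assumed un-renamed temp file
            }
            long ckId = Long.parseLong(m.group(1));
            if (ckId <= restoredCheckpointId) {
                plan.recommit.add(ckId);
            } else {
                plan.orphans.add(ckId);
            }
        }
        plan.recommit.sort(null); // replay in checkpoint order
        plan.orphans.sort(null);
        return plan;
    }

    public static void main(String[] args) {
        PwcRecoveryPlan plan = classify(
                List.of("checkpoint-12.state", "checkpoint-14.state",
                        "checkpoint-13.state", "checkpoint-15.state.tmp"),
                13L);
        System.out.println(plan.recommit); // [12, 13]
        System.out.println(plan.orphans);  // [14]
    }
}
```

A file left behind by a JM crash between committer.commit and stateManager.deleteState also lands in the recommit list, so correctness in that case rests entirely on Paimon's deduplication.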
Some questions for the community:
1) Is there a Flink-internal API (even an unstable one) to access the
checkpoint storage location from the OperatorCoordinator? This would solve
both problem 1 and problem 2.
2) Alternatively, would the community accept a design where the PWC state is
fully embedded in the coordinator's byte[] state, and where snapshotState() on
the writers blocks only until the coordinator's checkpointCoordinator()
completes (not until HDFS is durable)? The risk is a JM crash between
checkpointCoordinator() completion and Flink's actual state persistence, but
perhaps that's acceptable if Paimon's commit is idempotent and can filter
duplicates?
3) Or should the PIP-30 scope be reduced to the network-shuffle elimination
goal only, deferring the HA-recovery redesign until Flink's coordinator API
provides better storage integration?
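For question 2), a minimal sketch of what "fully embedded in the coordinator's byte[] state" could look like follows. PendingCommittables, toBytes, fromBytes and lastCommittedCheckpointId are hypothetical names; committables are again modelled as file names, and the sketch assumes the last committed checkpoint id can be recovered from the table's committed snapshots. It only covers serialization and the duplicate filter; it does not change when Flink takes the coordinator snapshot relative to the writers' barriers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: pending committables (checkpointId -> file names) kept
// only in the byte[] handed to Flink, so there is a single state system.
final class PendingCommittables {
    final TreeMap<Long, List<String>> byCheckpoint = new TreeMap<>();

    // Bytes a coordinator could use to complete its checkpoint result future.
    byte[] toBytes() {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(byCheckpoint.size());
            for (Map.Entry<Long, List<String>> e : byCheckpoint.entrySet()) {
                out.writeLong(e.getKey());
                out.writeInt(e.getValue().size());
                for (String file : e.getValue()) {
                    out.writeUTF(file);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Restore path: drop checkpoints the table already contains (the duplicate filter).
    static PendingCommittables fromBytes(byte[] data, long lastCommittedCheckpointId) {
        PendingCommittables state = new PendingCommittables();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int entries = in.readInt();
            for (int i = 0; i < entries; i++) {
                long ckId = in.readLong();
                int n = in.readInt();
                List<String> files = new ArrayList<>(n);
                for (int j = 0; j < n; j++) {
                    files.add(in.readUTF());
                }
                if (ckId > lastCommittedCheckpointId) {
                    state.byCheckpoint.put(ckId, files);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return state;
    }

    public static void main(String[] args) {
        PendingCommittables state = new PendingCommittables();
        state.byCheckpoint.put(5L, List.of("data-a.orc"));
        state.byCheckpoint.put(6L, List.of("data-b.orc", "data-c.orc"));
        byte[] snapshot = state.toBytes();
        // Suppose checkpoint 5 was already committed before the JM crashed.
        PendingCommittables restored = fromBytes(snapshot, 5L);
        System.out.println(restored.byCheckpoint); // {6=[data-b.orc, data-c.orc]}
    }
}
```

With this layout, whatever Flink restores is the full set of pending committables, and anything already committed is dropped on restore instead of being tracked in a separate file.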
3. FixedBucketSink scope vs region failover
Valid catch. The current branch wires only FixedBucketSink because that's the
immediate production need, but you're correct that FixedBucketSink's
keyBy(bucketKey) creates an all-to-all edge, resulting in a single pipelined
region. Region failover is not possible in this topology regardless of where
the committer lives.
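Why the keyed exchange yields an all-to-all edge can be shown with a toy routing model. This sketch is a simplification and an assumption: it routes records by bucket = hash(key) mod numBuckets and writer = bucket mod writerParallelism, whereas Flink's keyBy actually assigns key groups via murmur hashing and Paimon's channel computation may differ. Any routing that spreads keys across writers leads to the same conclusion: once each source subtask emits a few distinct keys, it sends to every writer subtask, so every source/writer pair is connected and the job forms a single pipelined region. BucketEdges and edges() are hypothetical names.

```java
// Hypothetical toy model of a keyed exchange; the routing is deliberately simplified.
final class BucketEdges {
    // edges[s][w] is true if source subtask s sends at least one record to writer w.
    static boolean[][] edges(int sourceParallelism, int writerParallelism,
                             int numBuckets, int recordsPerSource) {
        boolean[][] used = new boolean[sourceParallelism][writerParallelism];
        for (int s = 0; s < sourceParallelism; s++) {
            for (int r = 0; r < recordsPerSource; r++) {
                String primaryKey = "source-" + s + "-record-" + r;
                int bucket = Math.floorMod(primaryKey.hashCode(), numBuckets);
                int writer = bucket % writerParallelism;
                used[s][writer] = true;
            }
        }
        return used;
    }

    public static void main(String[] args) {
        boolean[][] used = edges(3, 4, 16, 100);
        int connected = 0;
        for (boolean[] row : used) {
            for (boolean edge : row) {
                if (edge) {
                    connected++;
                }
            }
        }
        // 3 x 4 = 12 possible source/writer pairs.
        System.out.println(connected + " of " + (3 * 4) + " pairs connected"); // 12 of 12 pairs connected
    }
}
```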
Clarification on PIP-30 motivation:
As I understand it, the original PIP-30 lists two goals:
(a) Eliminate network shuffle for commit messages (delivered by coordinator RPC)
(b) Enable region failover by decoupling commit from writer region
Goal (a) is fully achieved even for FixedBucketSink — commit messages no longer
traverse the shuffle.
Goal (b) requires UnawareBucketSink (or a bucket-less hash-distribution mode),
where writers can be colocated with sources without keyBy. This is not yet
implemented in the prototype.
Proposed scope adjustment:
A. Keep PIP-30 as "coordinator-based commit for all sinks", but acknowledge
region failover only applies to UnawareBucketSink (future work)
B. Narrow PIP-30 to "shuffle-free commit for FixedBucketSink", spin out region
failover to a separate PIP
C. Delay PIP-30 until UnawareBucketSink coordinator integration is ready, so
both goals land together
I prefer option A if the community is willing to accept incremental delivery,
or option B if we want stricter scope discipline. What do you think?
Based on this feedback, I see three possible paths:
- Iterate on the prototype: implement async HDFS write (problem 1), investigate
  a Flink API for unified state (problem 2), and keep the FixedBucketSink scope
  with a documented limitation (problem 3).
- Split the PIP: separate "shuffle-free commit" (smaller, ready soon) from
  "region failover architecture" (larger, needs UnawareBucketSink).
- Pause for Flink API evolution: wait for Flink to expose checkpoint storage to
  coordinators, then redesign with a single-state system.
I'd like community guidance on which path to pursue. I'm happy to revise the
prototype once we have consensus.
Thanks,
yugan
Original message
From: Biao Liu <[email protected]>
Sent: May 29, 2026 22:15
To: dev <[email protected]>
Subject: Re: [DISCUSS] PIP-30: Paimon Write Coordinator - Replacing CommitOperator with JobManager-level Coordination
Hi yugan,
Thanks a lot for sharing the prototype and the detailed write-up. The
PIP-30 direction is great, and the prototype gives a very concrete starting
point for discussion.
For background, an earlier mailing-list discussion on the same topic is
here:
https://lists.apache.org/thread/5n1mwc7wc3cfjogz56w9k5ql3zyl00lw
and a related design doc is here:
https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0
Both might be useful as reference for the discussion below.
I have three observations about the current PWC prototype.
1. Synchronous wait on the writer barrier path
In the current flow, after the barrier passes through, each writer
subtask's snapshotState() calls sender.snapshot(checkpointId).get() and
blocks until the coordinator side completes the future. On the coordinator
side, that future is only signalled after all subtasks have sent their
FileInfoEvent, the coordinator has aggregated the committables, and
PwcStateManager.writeState has finished its HDFS hsync + rename. The
coordinator runs all of this on a single-thread event loop.
PIP-30 was opened mainly to help large-scale Flink-to-Paimon write jobs,
where checkpoint duration under high parallelism is already sensitive. With
this design, every subtask's checkpoint additionally has to wait on (a) the
slowest-arriving subtask, (b) one HDFS hsync on the JM, and (c) the JM's
single-thread event loop draining N events. This adds a new source of
latency directly on the barrier path.
2. Two coexisting state systems on the JM side
The prototype writes aggregated CommitMessages directly through Hadoop
FileSystem to <baseDir>/<operatorId>/checkpoint-{ckId}.state, while
checkpointCoordinator itself returns an empty byte array — i.e. the
Flink-native coordinator state channel is not used for the in-flight
committables.
The deeper concern here is not the HDFS layout itself, but that the
coordinator now lives across two state systems at the same time: Flink's
checkpoint/restore lifecycle on one side, and the self-managed HDFS state
on the other. The two have to stay consistent across every transition —
checkpoint complete, checkpoint abort, JM failover, job restart, full job
cancel — and the consistency contract between them is not obvious from the
code. Any divergence (e.g. a notifyCheckpointAborted racing with writeState,
or a JM crash between committer.commit and stateManager.deleteState)
becomes a correctness question that the coordinator alone has to reason
about.
3. FixedBucketSink scope vs region failover
In the current branch, FixedBucketSink is the only sink wired to the
coordinator factory. As far as I can tell, FixedBucketSink requires records
to be hashed by bucket key, so the writer chain has an all-to-all upstream
edge and ends up in a single Flink pipelined region. In that case, even
after replacing CommitterOperator with a coordinator, the writer DAG still
has only one region, so region failover would not be possible for this
sink. Since PIP-30 lists region failover as a primary motivation, this
scope choice does not appear to deliver that goal as it stands.
What do you think?
Thanks,
Biao /'bɪ.aʊ/
On Mon, 25 May 2026 at 10:07, Jingsong Li <[email protected]> wrote:
> Thanks fishfishfishfishaa,
>
> I think you can just raise a PR to this.
>
> Best,
> Jingsong
>
> On Sun, May 24, 2026 at 11:29 PM 俞淦 <[email protected]> wrote:
> >
> > Hi Paimon Community,
> >
> > Following PIP-30 (Improvement For Paimon Committer In Flink), I
> > have completed a prototype implementation of the Paimon Write Coordinator
> > (PWC) on my personal branch. The design replaces the current
> > CommitOperator with a JobManager-level OperatorCoordinator to
> > eliminate the network shuffle bottleneck for commit messages.
> >
> > Code Branch: https://github.com/fishfishfishfishaa/paimon/tree/yg-pip3-pwc
> >
> > Key Design Decision: Custom HDFS State
> >
> > Instead of using Flink's native StateBackend, I chose custom HDFS state
> > management for the following reasons:
> >
> > - Timing mismatch: Flink's operator state snapshot occurs after
> > snapshotState() returns, but PWC needs to persist aggregated
> > CommitMessages before acknowledging WriteOperators to proceed. This
> > pre-barrier persistence requirement doesn't align well with Flink's
> > native state lifecycle.
> >
> > - Explicit recovery control: Custom state enables clean
> > resetToCheckpoint() logic: scan HDFS, filter by checkpointId <=
> > restoredCheckpointId, perform an idempotent commit via Paimon's native
> > deduplication, then clean up.
> >
> > - Decoupled cleanup: HDFS state deletion is independent of Flink
> > checkpoint retention policies, avoiding potential state bloat.
> >
> > State Storage Path:
> > <flink-checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state
> >
> > Current Scope
> >
> > Supported: FixedBucketSink (primary use case for PIP-30)
> >
> > Implementation Highlights (Core Flow)
> > WriteOperator.preBarrier() → send FileInfoEvent to PWC (RPC)
> > → PWC aggregates, persists to HDFS, sends ACK
> > → WriteOperator.snapshotState() completes (local state only)
> > → Flink confirms checkpoint
> > → PWC.notifyCheckpointComplete() → Paimon.commit() → delete HDFS state
> >
> > Recovery Logic:
> > resetToCheckpoint() scans HDFS, handles <= restoredCkId via
> > idempotent commit, and treats > restoredCkId as orphans to clean up.
> >
> > Testing Status & Request for Guidance
> >
> > Currently, I have added unit tests covering the recovery
> > logic of the custom HDFS state. I would appreciate the community's
> > advice on what other test cases are essential for this PIP.
> >
> > Once I receive initial feedback and strengthen the test coverage
> > accordingly, I will open a formal Pull Request.
> >
> > Looking forward to your suggestions!
> >
> > Thank you,
> > fishfishfishfishaa/ yugan
> >
> > 俞淦
> > [email protected]
>