Hi yugan,

Thanks a lot for sharing the prototype and the detailed write-up. The
PIP-30 direction is great, and the prototype gives a very concrete starting
point for discussion.

For background, an earlier mailing-list discussion on the same topic is
here: https://lists.apache.org/thread/5n1mwc7wc3cfjogz56w9k5ql3zyl00lw
A related design doc is here:
https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0

Both might be useful as reference for the discussion below.

I have three observations about the current PWC prototype.

1. Synchronous wait on the writer barrier path

In the current flow, after the barrier passes through, each writer
subtask's snapshotState() calls sender.snapshot(checkpointId).get() and
blocks until the coordinator side completes the future. On the coordinator
side, that future is only signalled after all subtasks have sent their
FileInfoEvent, the coordinator has aggregated the committables, and
PwcStateManager.writeState has finished its HDFS hsync + rename. The
coordinator runs all of this on a single-thread event loop.

PIP-30 was opened mainly to help large-scale Flink-to-Paimon write jobs,
where checkpoint duration under high parallelism is already sensitive. With
this design, every subtask's checkpoint additionally has to wait on (a) the
slowest-arriving subtask, (b) one HDFS hsync on the JM, and (c) the JM's
single-thread event loop draining N events. This adds a new source of
latency directly on the barrier path.
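
To make the dependency concrete, here is a minimal Java sketch of the wait
as I understand it. It is a toy model, not the prototype's code:
BarrierWaitSketch, onFileInfoEvent and persistAggregatedState are
hypothetical names, and the persist step is an empty placeholder standing
in for the hsync + rename.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Toy model of the ack path. All names are hypothetical, not the prototype's classes. */
class BarrierWaitSketch {
    private final int parallelism;
    private int received = 0; // only touched on the event-loop thread
    private final CompletableFuture<Void> ack = new CompletableFuture<>();
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    BarrierWaitSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    /** One writer subtask reports its files and gets back the future it would block on. */
    CompletableFuture<Void> onFileInfoEvent() {
        try {
            eventLoop.submit(() -> {
                received++;
                if (received == parallelism) {
                    persistAggregatedState(); // stands in for HDFS hsync + rename
                    ack.complete(null);       // only now can any subtask proceed
                }
            }).get(); // wait for the event loop, only to keep this demo deterministic
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return ack;
    }

    private void persistAggregatedState() {
        // Placeholder: the real cost would be one synchronous durable write.
    }

    void close() {
        eventLoop.shutdown();
    }

    /** Returns {ack done before the last event arrived, ack done after it arrived}. */
    static boolean[] demo() {
        BarrierWaitSketch coordinator = new BarrierWaitSketch(3);
        try {
            CompletableFuture<Void> first = coordinator.onFileInfoEvent();
            coordinator.onFileInfoEvent();
            boolean doneBeforeLast = first.isDone();
            coordinator.onFileInfoEvent();
            boolean doneAfterLast = first.isDone();
            return new boolean[] {doneBeforeLast, doneAfterLast};
        } finally {
            coordinator.close();
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("done before last event: " + result[0]);
        System.out.println("done after last event: " + result[1]);
    }
}
```

In this model the first subtask's future stays incomplete until the last
subtask has reported and the persist step has run, which is the coupling
described in (a) to (c) above.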

2. Two coexisting state systems on the JM side

The prototype writes aggregated CommitMessages directly through Hadoop
FileSystem to <baseDir>/<operatorId>/checkpoint-{ckId}.state, while
checkpointCoordinator itself returns an empty byte array, i.e. the
Flink-native coordinator state channel is not used for the in-flight
committables.

The deeper concern here is not the HDFS layout itself, but that the
coordinator now lives across two state systems at the same time: Flink's
checkpoint/restore lifecycle on one side, and the self-managed HDFS state
on the other. The two have to stay consistent across every transition
(checkpoint complete, checkpoint abort, JM failover, job restart, full job
cancel), and the consistency contract between them is not obvious from the
code. Any divergence (e.g. a notifyCheckpointAborted racing with
writeState, or a JM crash between committer.commit and
stateManager.deleteState) becomes a correctness question that the
coordinator alone has to reason about.
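
As one illustration of the kind of window I mean, here is a small Java
sketch of a JM crash between commit and state deletion, followed by the
recovery rule from the proposal (replay <= restored checkpoint, drop the
rest as orphans). It is an in-memory model under my own assumptions:
TwoStateSystemsSketch and its fields are hypothetical, a map stands in for
the HDFS state files, and commit is assumed to be idempotent per
checkpoint id as the proposal states.

```java
import java.util.ArrayList;
import java.util.TreeMap;
import java.util.TreeSet;

/** In-memory model of the self-managed state. All names are hypothetical. */
class TwoStateSystemsSketch {
    final TreeMap<Long, String> stateFiles = new TreeMap<>(); // stands in for checkpoint-{ckId}.state
    final TreeSet<Long> committed = new TreeSet<>();          // stands in for committed table snapshots
    int commitCalls = 0;

    void writeState(long ckId, String committables) {
        stateFiles.put(ckId, committables);
    }

    /** Assumed idempotent: committing the same checkpoint twice has no extra effect. */
    void commit(long ckId) {
        commitCalls++;
        committed.add(ckId);
    }

    void notifyCheckpointComplete(long ckId, boolean crashBeforeDelete) {
        commit(ckId);
        if (crashBeforeDelete) {
            return; // simulated JM crash: the state file is left behind
        }
        stateFiles.remove(ckId);
    }

    /** Recovery rule from the proposal: replay <= restored, drop > restored as orphans. */
    void resetToCheckpoint(long restoredCkId) {
        for (Long ckId : new ArrayList<>(stateFiles.keySet())) {
            if (ckId <= restoredCkId) {
                commit(ckId);
            }
            stateFiles.remove(ckId);
        }
    }

    static TwoStateSystemsSketch demo() {
        TwoStateSystemsSketch s = new TwoStateSystemsSketch();
        s.writeState(1L, "files-of-ck1");
        s.writeState(2L, "files-of-ck2");
        s.notifyCheckpointComplete(1L, true); // crash after commit, before delete
        s.resetToCheckpoint(1L);              // restart from checkpoint 1
        return s;
    }

    public static void main(String[] args) {
        TwoStateSystemsSketch s = demo();
        System.out.println("committed=" + s.committed + " commitCalls=" + s.commitCalls
                + " leftoverState=" + s.stateFiles.keySet());
    }
}
```

This particular window is safe only because commit is idempotent. The
sketch does not cover the abort race or a crash during writeState, and
each of those would need a similar argument.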

3. FixedBucketSink scope vs region failover

In the current branch, FixedBucketSink is the only sink wired to the
coordinator factory. As far as I can tell, FixedBucketSink requires records
to be hashed by bucket key, so the writer chain has an all-to-all upstream
edge and ends up in a single Flink pipelined region. In that case, even
after replacing CommitterOperator with a coordinator, the writer DAG still
has only one region, so region failover would not be possible for this
sink. Since PIP-30 lists region failover as a primary motivation, this
scope choice does not appear to deliver that goal as it stands.
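
A rough way to see this is to treat a pipelined region as a connected
component of subtasks linked by pipelined edges. The Java sketch below
does only that, with a small union-find. It is a simplification of how
Flink actually computes regions, and RegionSketch and countRegions are
hypothetical names.

```java
/** Counts connected components of a two-stage subtask graph. Names are hypothetical. */
class RegionSketch {
    static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]]; // path halving
            x = parent[x];
        }
        return x;
    }

    /**
     * Subtasks [0, p) are the upstream stage and [p, 2p) are the writers.
     * allToAll models a hash shuffle by bucket key; otherwise edges are forward (i -> i).
     */
    static int countRegions(int parallelism, boolean allToAll) {
        int n = 2 * parallelism;
        int[] parent = new int[n];
        for (int i = 0; i < n; i++) {
            parent[i] = i;
        }
        for (int up = 0; up < parallelism; up++) {
            for (int writer = 0; writer < parallelism; writer++) {
                if (allToAll || up == writer) {
                    parent[find(parent, up)] = find(parent, parallelism + writer);
                }
            }
        }
        int regions = 0;
        for (int i = 0; i < n; i++) {
            if (find(parent, i) == i) {
                regions++;
            }
        }
        return regions;
    }

    public static void main(String[] args) {
        System.out.println("forward edges: " + countRegions(4, false) + " regions");
        System.out.println("all-to-all edge: " + countRegions(4, true) + " region(s)");
    }
}
```

With parallelism 4, forward edges give 4 independent regions, while a
single all-to-all edge collapses everything into 1, regardless of what
sits downstream of the writers.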

What do you think?

Thanks,
Biao /'bɪ.aʊ/



On Mon, 25 May 2026 at 10:07, Jingsong Li <[email protected]> wrote:

> Thanks fishfishfishfishaa,
>
> I think you can just raise a PR to this.
>
> Best,
> Jingsong
>
> On Sun, May 24, 2026 at 11:29 PM 俞淦 <[email protected]> wrote:
> >
> > Hi Paimon Community,
> >
> > Following PIP-30 (Improvement For Paimon Committer In Flink), I have
> > completed a prototype implementation of the Paimon Write Coordinator
> > (PWC) on my personal branch. The design replaces the current
> > CommitOperator with a JobManager-level OperatorCoordinator to
> > eliminate the network shuffle bottleneck for commit messages.
> >
> > Code Branch:
> > https://github.com/fishfishfishfishaa/paimon/tree/yg-pip3-pwc
> >
> > Key Design Decision: Custom HDFS State
> >
> > Instead of using Flink's native StateBackend, I chose custom HDFS state
> > management for the following reasons:
> >
> > Timing mismatch: Flink's operator state snapshot occurs after
> > snapshotState() returns, but PWC needs to persist aggregated
> > CommitMessages before acknowledging WriteOperators to proceed. This
> > pre-barrier persistence requirement doesn't align well with Flink's
> > native state lifecycle.
> >
> > Explicit recovery control: Custom state enables a clean
> > resetToCheckpoint() logic: scan HDFS, filter by checkpointId <=
> > restoredCheckpointId, perform idempotent commit via Paimon's native
> > deduplication, then clean up.
> >
> > Decoupled cleanup: HDFS state deletion is independent of Flink
> > checkpoint retention policies, avoiding potential state bloat.
> >
> > State Storage Path:
> > <flink-checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state
> >
> > Current Scope
> >
> >
> > Supported: FixedBucketSink (primary use case for PIP-30)
> >
> >
> > Implementation Highlights (Core Flow)
> > WriteOperator.preBarrier()
> >   → send FileInfoEvent to PWC (RPC)
> >   → PWC aggregates, persists to HDFS, sends ACK
> >   → WriteOperator.snapshotState() completes (local state only)
> >   → Flink confirms checkpoint
> >   → PWC.notifyCheckpointComplete() → Paimon.commit() → delete HDFS state
> >
> > Recovery Logic:
> > resetToCheckpoint() scans HDFS, handles <= restoredCkId via
> > idempotent commit, and treats > restoredCkId as orphan cleanup.
> >
> > Testing Status & Request for Guidance
> >
> > Currently, I have added unit tests covering the recovery logic of the
> > custom HDFS state. I would appreciate the community's advice on what
> > other test cases are essential for this PIP.
> >
> > Once I receive initial feedback and strengthen the test coverage
> > accordingly, I will open a formal Pull Request.
> >
> > Looking forward to your suggestions!
> >
> > Thank you,
> > fishfishfishfishaa/ yugan
> >
> >
> >
> >
> > 俞淦
> > [email protected]
>
