Hi yugan,

Thanks a lot for sharing the prototype and the detailed write-up. The PIP-30 direction is great, and the prototype gives a very concrete starting point for discussion.
For background, an earlier mailing-list discussion on the same topic is here:
https://lists.apache.org/thread/5n1mwc7wc3cfjogz56w9k5ql3zyl00lw

A related design doc is here:
https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0

Both might be useful as references for the discussion below.

I have three observations about the current PWC prototype.

1. Synchronous wait on the writer barrier path

In the current flow, after the barrier passes through, each writer subtask's snapshotState() calls sender.snapshot(checkpointId).get() and blocks until the coordinator side completes the future. On the coordinator side, that future is only completed after all subtasks have sent their FileInfoEvent, the coordinator has aggregated the committables, and PwcStateManager.writeState has finished its HDFS hsync + rename. The coordinator runs all of this on a single-thread event loop.

PIP-30 was opened mainly to help large-scale Flink-to-Paimon write jobs, where checkpoint duration under high parallelism is already sensitive. With this design, every subtask's checkpoint additionally has to wait on (a) the slowest-arriving subtask, (b) one HDFS hsync on the JM, and (c) the JM's single-thread event loop draining N events, one per writer subtask. This adds a new source of latency directly on the barrier path.

2. Two coexisting state systems on the JM side

The prototype writes aggregated CommitMessages directly through the Hadoop FileSystem to <baseDir>/<operatorId>/checkpoint-{ckId}.state, while checkpointCoordinator itself returns an empty byte array. In other words, the Flink-native coordinator state channel is not used for the in-flight committables.

The deeper concern here is not the HDFS layout itself, but that the coordinator now lives across two state systems at the same time: Flink's checkpoint/restore lifecycle on one side, and the self-managed HDFS state on the other.
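To make this concrete, here is a toy Python model of the two state systems. It is a sketch under stated assumptions, not the prototype's code: ToyTable, ToyCoordinator, side_store and all method names are hypothetical, the side_store dict stands in for the checkpoint-{ckId}.state files, and Paimon's deduplication is assumed to behave like "each checkpoint id commits at most once". It replays a JM crash between commit and deleteState, followed by the resetToCheckpoint() flow described in the proposal quoted below.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTable:
    """Hypothetical stand-in for the Paimon table.

    Dedup is modeled as "a checkpoint id can be committed at most once";
    this is an assumption standing in for Paimon's native deduplication.
    """
    committed: dict = field(default_factory=dict)  # ckId -> committables

    def commit(self, ck_id, committables):
        if ck_id in self.committed:
            return False  # already committed: idempotent no-op
        self.committed[ck_id] = list(committables)
        return True


class ToyCoordinator:
    """Hypothetical coordinator that keeps in-flight committables outside Flink."""

    def __init__(self, side_store, table):
        # side_store models the checkpoint-{ckId}.state files: ckId -> committables
        self.side_store = side_store
        self.table = table

    def checkpoint_coordinator(self, ck_id, committables):
        self.side_store[ck_id] = list(committables)  # models writeState (hsync + rename)
        return b""  # the Flink-native coordinator state stays empty

    def notify_checkpoint_complete(self, ck_id, crash_before_delete=False):
        self.table.commit(ck_id, self.side_store[ck_id])
        if crash_before_delete:
            raise RuntimeError("JM crashed between commit and deleteState")
        del self.side_store[ck_id]  # models deleteState

    def reset_to_checkpoint(self, restored_ck_id, flink_state):
        # Flink hands back only empty bytes, so recovery is driven by the side store.
        assert flink_state == b""
        for ck_id in sorted(self.side_store):
            if ck_id <= restored_ck_id:
                # Re-commit; correct only because ToyTable.commit is idempotent.
                self.table.commit(ck_id, self.side_store[ck_id])
            # <= restored: now committed; > restored: orphan. Both are deleted.
            del self.side_store[ck_id]


# Replay: checkpoint 2 is already in flight when the JM dies between
# commit and deleteState while completing checkpoint 1.
side_store, table = {}, ToyTable()
old_jm = ToyCoordinator(side_store, table)
restored_flink_state = old_jm.checkpoint_coordinator(1, ["file-1"])
old_jm.checkpoint_coordinator(2, ["file-2"])
try:
    old_jm.notify_checkpoint_complete(1, crash_before_delete=True)
except RuntimeError:
    pass

# A new JM restores from checkpoint 1 (the latest completed one).
new_jm = ToyCoordinator(side_store, table)
new_jm.reset_to_checkpoint(1, restored_flink_state)
print(table.committed)  # {1: ['file-1']}
print(side_store)  # {}
```

Even in this simplified model, the restored Flink bytes carry no information: whether checkpoint 1 is re-committed and checkpoint 2 discarded is decided purely by the side store, the ckId filter, and the idempotency assumption.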
The two have to stay consistent across every transition (checkpoint complete, checkpoint abort, JM failover, job restart, full job cancel), and the consistency contract between them is not obvious from the code. Any divergence (e.g. a notifyCheckpointAborted racing with writeState, or a JM crash between committer.commit and stateManager.deleteState) becomes a correctness question that the coordinator alone has to reason about.

3. FixedBucketSink scope vs region failover

In the current branch, FixedBucketSink is the only sink wired to the coordinator factory. As far as I can tell, FixedBucketSink requires records to be hashed by bucket key, so the writer chain has an all-to-all upstream edge and ends up in a single Flink pipelined region. In that case, even after replacing CommitterOperator with a coordinator, the writer DAG still has only one region, so region failover would not be possible for this sink. Since PIP-30 lists region failover as a primary motivation, this scope choice does not appear to deliver that goal as it stands.

What do you think?

Thanks,
Biao /'bɪ.aʊ/

On Mon, 25 May 2026 at 10:07, Jingsong Li <[email protected]> wrote:
> Thanks fishfishfishfishaa,
>
> I think you can just raise a PR to this.
>
> Best,
> Jingsong
>
> On Sun, May 24, 2026 at 11:29 PM 俞淦 <[email protected]> wrote:
> >
> > Hi Paimon Community,
> >
> > Following PIP-30 (Improvement For Paimon Committer In Flink), I have completed a prototype implementation of the Paimon Write Coordinator (PWC) on my personal branch. The design replaces the current CommitOperator with a JobManager-level OperatorCoordinator to eliminate the network shuffle bottleneck for commit messages.
> >
> > Code Branch: https://github.com/fishfishfishfishaa/paimon/tree/yg-pip3-pwc
> >
> > Key Design Decision: Custom HDFS State
> >
> > Instead of using Flink's native StateBackend, I chose custom HDFS state management for the following reasons:
> >
> > Timing mismatch: Flink's operator state snapshot occurs after snapshotState() returns, but PWC needs to persist aggregated CommitMessages before acknowledging WriteOperators to proceed. This pre-barrier persistence requirement doesn't align well with Flink's native state lifecycle.
> >
> > Explicit recovery control: Custom state enables a clean resetToCheckpoint() logic: scan HDFS, filter by checkpointId <= restoredCheckpointId, perform idempotent commit via Paimon's native deduplication, then cleanup.
> >
> > Decoupled cleanup: HDFS state deletion is independent of Flink checkpoint retention policies, avoiding potential state bloat.
> >
> > State Storage Path:
> > <flink-checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state
> >
> > Current Scope
> >
> > Supported: FixedBucketSink (primary use case for PIP-30)
> >
> > Implementation Highlights (Core Flow)
> > WriteOperator.preBarrier() → send FileInfoEvent to PWC (RPC) → PWC aggregates, persists to HDFS, sends ACK → WriteOperator.snapshotState() completes (local state only) → Flink confirms checkpoint → PWC.notifyCheckpointComplete() → Paimon.commit() → delete HDFS state
> >
> > Recovery Logic:
> > resetToCheckpoint() scans HDFS, handles <= restoredCkId via idempotent commit, and treats > restoredCkId as orphan cleanup.
> >
> > Testing Status & Request for Guidance
> >
> > Currently, I have added unit tests covering the recovery logic of the custom HDFS state. I would appreciate the community's advice on what other test cases are essential for this PIP.
> >
> > Once I receive initial feedback and strengthen the test coverage accordingly, I will open a formal Pull Request.
> >
> > Looking forward to your suggestions!
> >
> > Thank you,
> > fishfishfishfishaa/ yugan
> >
> > 俞淦
> > [email protected]
