Thanks, fishfishfishfishaa. I think you can just open a PR for this.
Best,
Jingsong

On Sun, May 24, 2026 at 11:29 PM 俞淦 <[email protected]> wrote:
>
> Hi Paimon Community,
>
> Following PIP-30 (Improvement For Paimon Committer In Flink), I have
> completed a prototype implementation of the Paimon Write Coordinator
> (PWC) on my personal branch. The design replaces the current
> CommitOperator with a JobManager-level OperatorCoordinator to
> eliminate the network shuffle bottleneck for commit messages.
>
> Code Branch: https://github.com/fishfishfishfishaa/paimon/tree/yg-pip3-pwc
>
> Key Design Decision: Custom HDFS State
>
> Instead of using Flink's native StateBackend, I chose custom HDFS state
> management for the following reasons:
>
> Timing mismatch: Flink's operator state snapshot occurs after
> snapshotState() returns, but PWC needs to persist the aggregated
> CommitMessages before it acknowledges the WriteOperators and lets them
> proceed. This pre-barrier persistence requirement does not align well
> with Flink's native state lifecycle.
>
> Explicit recovery control: Custom state enables clean resetToCheckpoint()
> logic: scan HDFS, filter by checkpointId <= restoredCheckpointId, perform
> an idempotent commit via Paimon's native deduplication, then clean up.
>
> Decoupled cleanup: HDFS state deletion is independent of Flink checkpoint
> retention policies, which avoids potential state bloat.
>
> State Storage Path:
> <flink-checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state
>
> Current Scope
>
> Supported: FixedBucketSink (the primary use case for PIP-30)
>
> Implementation Highlights (Core Flow)
>
>   WriteOperator.preBarrier()
>     → send FileInfoEvent to PWC (RPC)
>     → PWC aggregates, persists to HDFS, sends ACK
>     → WriteOperator.snapshotState() completes (local state only)
>     → Flink confirms the checkpoint
>     → PWC.notifyCheckpointComplete()
>     → Paimon.commit()
>     → delete HDFS state
>
> Recovery Logic:
> resetToCheckpoint() scans HDFS, handles state with checkpointId <=
> restoredCkId via idempotent commit, and treats state with checkpointId >
> restoredCkId as orphans to be cleaned up.
>
> Testing Status & Request for Guidance
>
> So far, I have added unit tests covering the recovery logic of the custom
> HDFS state. I would appreciate the community's advice on which other test
> cases are essential for this PIP.
>
> Once I receive initial feedback and strengthen the test coverage
> accordingly, I will open a formal pull request.
>
> Looking forward to your suggestions!
>
> Thank you,
> fishfishfishfishaa / yugan
>
> 俞淦
> [email protected]
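For discussion, the commit and recovery flow described in the proposal can be modeled as a small in-memory sketch. This is a hypothetical illustration, not the code on the linked branch: PwcSketch and onFileInfoEvent are invented names, a map of path to messages stands in for HDFS, String stands in for CommitMessage, and putIfAbsent stands in for Paimon's commit deduplication. One further assumption: because Flink does not guarantee that every checkpoint-complete notification is delivered, the sketch commits every persisted state with an id <= ckId on notifyCheckpointComplete, not only ckId itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the PWC state protocol. None of these names are real
// Paimon or Flink APIs; the maps below are in-memory stand-ins for HDFS and the table.
final class PwcSketch {
    final Map<String, List<String>> fs = new TreeMap<>();       // stand-in for HDFS files
    final Map<Long, List<String>> committed = new TreeMap<>();  // stand-in for table commits
    private final Map<Long, List<String>> pending = new TreeMap<>();
    private final String baseDir;

    PwcSketch(String checkpointDir, String operatorId) {
        this.baseDir = checkpointDir + "/pwc/" + operatorId + "/";
    }

    // Layout from the proposal: <flink-checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state
    String statePath(long ckId) {
        return baseDir + "checkpoint-" + ckId + ".state";
    }

    // Returns the checkpoint id encoded in a state path, or -1 for unrelated files.
    long checkpointIdOf(String path) {
        String prefix = baseDir + "checkpoint-";
        String suffix = ".state";
        if (!path.startsWith(prefix) || !path.endsWith(suffix)) {
            return -1;
        }
        return Long.parseLong(path.substring(prefix.length(), path.length() - suffix.length()));
    }

    // Handles a FileInfoEvent sent from a writer's preBarrier(); returning true is the ACK.
    boolean onFileInfoEvent(long ckId, String commitMessage) {
        List<String> messages = pending.computeIfAbsent(ckId, k -> new ArrayList<>());
        messages.add(commitMessage);
        // Persist the aggregate BEFORE acknowledging, so it survives a JobManager failover.
        fs.put(statePath(ckId), new ArrayList<>(messages));
        return true;
    }

    // Flink confirmed ckId: commit every persisted state with id <= ckId, oldest first.
    void notifyCheckpointComplete(long ckId) {
        for (Map.Entry<Long, String> e : persistedStates().entrySet()) {
            if (e.getKey() <= ckId) {
                commit(e.getKey(), fs.get(e.getValue()));
                fs.remove(e.getValue()); // delete the state only after the commit
                pending.remove(e.getKey());
            }
        }
    }

    // Recovery: commit states with id <= restoredCkId, delete the rest as orphans.
    void resetToCheckpoint(long restoredCkId) {
        pending.clear();
        for (Map.Entry<Long, String> e : persistedStates().entrySet()) {
            if (e.getKey() <= restoredCkId) {
                commit(e.getKey(), fs.get(e.getValue())); // may repeat a pre-failure commit
            }
            fs.remove(e.getValue()); // committed or orphaned, the file is no longer needed
        }
    }

    // Idempotent commit: an already committed checkpoint id is a no-op.
    private void commit(long ckId, List<String> messages) {
        committed.putIfAbsent(ckId, new ArrayList<>(messages));
    }

    // Scans the state directory and orders state files by numeric checkpoint id
    // (string order would put checkpoint-10 before checkpoint-2).
    private TreeMap<Long, String> persistedStates() {
        TreeMap<Long, String> byId = new TreeMap<>();
        for (String path : fs.keySet()) {
            long ckId = checkpointIdOf(path);
            if (ckId >= 0) {
                byId.put(ckId, path);
            }
        }
        return byId;
    }
}
```

The two orderings carry the correctness argument in this sketch: persisting before the ACK means a completed checkpoint always has its messages on storage, and committing before deleting means a crash between the two steps leaves a state file that recovery commits again. The idempotent commit turns that repeated attempt into a no-op, which is the case a recovery test could exercise by leaving an already committed state file in place before calling resetToCheckpoint().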
