Hi Paimon Community,

Following PIP-30 (Improvement For Paimon Committer In Flink), I have completed a prototype implementation of the Paimon Write Coordinator (PWC) on my personal branch. The design replaces the current CommitOperator with a JobManager-level OperatorCoordinator to eliminate the network shuffle bottleneck for commit messages.
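To illustrate the coordinator idea, here is a minimal in-memory model. It is a hypothetical sketch, not the code on the branch. Writer subtasks hand their commit messages directly to a single coordinator object, so there is no shuffle to a separate committer task. The coordinator groups the messages by checkpoint id and commits everything up to a confirmed checkpoint in one call. CoordinatorModel, onFileInfo and the use of plain strings as commit messages are illustrative assumptions. No Flink or Paimon APIs are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of the coordinator role (not the Flink
 * OperatorCoordinator API): writers report commit messages per checkpoint,
 * and the coordinator commits them once a checkpoint is confirmed.
 */
class CoordinatorModel {
    // checkpointId -> commit messages reported by all writer subtasks for that checkpoint
    private final NavigableMap<Long, List<String>> pending = new TreeMap<>();
    // each entry is one simulated commit (stands in for a single Paimon commit)
    private final List<List<String>> committed = new ArrayList<>();

    /** Models a writer subtask reporting its files for a checkpoint (like FileInfoEvent). */
    void onFileInfo(long checkpointId, List<String> messages) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).addAll(messages);
    }

    /**
     * Models the checkpoint-complete notification: everything up to and including
     * this checkpoint is committed together, in case an earlier notification never arrived.
     */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
        if (ready.isEmpty()) {
            return; // nothing pending, e.g. a duplicate notification
        }
        List<String> batch = new ArrayList<>();
        for (List<String> messages : ready.values()) {
            batch.addAll(messages);
        }
        committed.add(batch);
        ready.clear(); // clearing the headMap view removes these entries from 'pending'
    }

    List<List<String>> committed() { return committed; }

    int pendingCheckpoints() { return pending.size(); }
}
```

Committing everything at or below the notified checkpoint, rather than only that exact id, is a deliberate choice in this sketch. To my understanding, Flink does not guarantee a notifyCheckpointComplete() call for every checkpoint, so earlier pending messages must not be stranded.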
Code Branch: https://github.com/fishfishfishfishaa/paimon/tree/yg-pip3-pwc

Key Design Decision: Custom HDFS State

Instead of using Flink's native StateBackend, I chose custom HDFS state management for the following reasons:

- Timing mismatch: Flink persists operator state only after snapshotState() returns, but PWC needs to persist the aggregated CommitMessages before acknowledging the WriteOperators so that they can proceed. This pre-barrier persistence requirement does not align well with Flink's native state lifecycle.
- Explicit recovery control: Custom state enables clean resetToCheckpoint() logic: scan HDFS, filter by checkpointId <= restoredCheckpointId, perform an idempotent commit relying on Paimon's native deduplication, then clean up.
- Decoupled cleanup: HDFS state deletion is independent of Flink's checkpoint retention policies, which avoids potential state bloat.

State Storage Path: <flink-checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state

Current Scope

Supported: FixedBucketSink (the primary use case for PIP-30)

Implementation Highlights (Core Flow)

1. WriteOperator.preBarrier() sends a FileInfoEvent to PWC (RPC).
2. PWC aggregates the messages, persists them to HDFS, and sends an ACK.
3. WriteOperator.snapshotState() completes (local state only).
4. Flink confirms the checkpoint and calls PWC.notifyCheckpointComplete().
5. PWC calls Paimon.commit().
6. PWC deletes the HDFS state for that checkpoint.

Recovery Logic: resetToCheckpoint() scans HDFS, performs an idempotent commit for state files with checkpointId <= restoredCkId, and deletes state files with checkpointId > restoredCkId as orphans.

Testing Status & Request for Guidance

So far, I have added unit tests covering the recovery logic of the custom HDFS state. I would appreciate the community's advice on which other test cases are essential for this PIP. Once I receive initial feedback and strengthen the test coverage accordingly, I will open a formal pull request.

Looking forward to your suggestions!

Thank you,
fishfishfishfishaa / yugan 俞淦
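For reference, the state path layout and the recovery rule described above can be sketched as a pure function over the file names found in the PWC state directory. This is a hypothetical helper, not the implementation on the branch. PwcRecoveryPlan, statePath and plan are illustrative names. Listing, committing and deleting files on HDFS are deliberately left out, so the split between "re-commit" and "orphan" can be unit-tested on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper mirroring the proposed recovery rule: state files at or
 * below the restored checkpoint are re-committed (relying on Paimon's
 * deduplication for idempotence), newer ones are treated as orphans.
 * Input is just the file names under <flink-checkpoint-dir>/pwc/<operatorId>/.
 */
final class PwcRecoveryPlan {
    private static final Pattern STATE_FILE = Pattern.compile("checkpoint-(\\d+)\\.state");

    final List<Long> toCommit = new ArrayList<>(); // re-commit, then delete after success
    final List<Long> orphans = new ArrayList<>();  // written after the restored checkpoint: delete only

    /** Builds the state file path in the layout used by the proposal. */
    static String statePath(String checkpointDir, String operatorId, long checkpointId) {
        return checkpointDir + "/pwc/" + operatorId + "/checkpoint-" + checkpointId + ".state";
    }

    /** Splits scanned file names by checkpoint id relative to the restored checkpoint. */
    static PwcRecoveryPlan plan(List<String> fileNames, long restoredCheckpointId) {
        PwcRecoveryPlan plan = new PwcRecoveryPlan();
        for (String name : fileNames) {
            Matcher m = STATE_FILE.matcher(name);
            if (!m.matches()) {
                continue; // not a PWC state file; leave it untouched
            }
            long ckId = Long.parseLong(m.group(1));
            if (ckId <= restoredCheckpointId) {
                plan.toCommit.add(ckId);
            } else {
                plan.orphans.add(ckId);
            }
        }
        plan.toCommit.sort(null); // natural order: commit older checkpoints first
        plan.orphans.sort(null);
        return plan;
    }
}
```

For example, with files for checkpoints 5, 7 and 9 and a restore to checkpoint 7, checkpoints 5 and 7 are re-committed and 9 is treated as an orphan. Ordering the commits by checkpoint id is an assumption of this sketch. It keeps the re-commit sequence deterministic, which may make the idempotence tests easier to reason about.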
