Thanks fishfishfishfishaa,

I think you can just raise a PR to this.

Best,
Jingsong

On Sun, May 24, 2026 at 11:29 PM 俞淦 <[email protected]> wrote:
>
> Hi Paimon Community,
>
> Following PIP-30 (Improvement For Paimon Committer In Flink)&nbsp;, I have 
> completed a prototype implementation of the Paimon Write Coordinator 
> (PWC)&nbsp;on my personal branch. The design replaces the current 
> CommitOperator&nbsp;with a JobManager-level OperatorCoordinator&nbsp;to 
> eliminate the network shuffle bottleneck for commit messages.
>
> Code 
> Branch:&nbsp;https://github.com/fishfishfishfishaa/paimon/tree/yg-pip3-pwc
>
> Key Design Decision: Custom HDFS State
>
> Instead of using Flink's native StateBackend, I chose custom HDFS state 
> management for the following reasons:
>
>
> Timing mismatch: Flink's operator state snapshot occurs after 
> snapshotState()&nbsp;returns, but PWC needs to persist aggregated 
> CommitMessages before&nbsp;acknowledging WriteOperators to proceed. This 
> pre-barrier persistence requirement doesn't align well with Flink's native 
> state lifecycle.
>
>
>
> Explicit recovery control: Custom state enables a clean 
> resetToCheckpoint()&nbsp;logic — scan HDFS, filter by checkpointId <= 
> restoredCheckpointId, perform idempotent commit via Paimon's native 
> deduplication, then cleanup.
>
>
>
> Decoupled cleanup: HDFS state deletion is independent of Flink checkpoint 
> retention policies, avoiding potential state bloat.
>
>
> State Storage Path:
> <flink-checkpoint-dir&gt;/pwc/<operatorId&gt;/checkpoint-{ckId}.state
>
> Current Scope
>
>
> Supported: FixedBucketSink&nbsp;(primary use case for PIP-30)
>
>
> Implementation Highlights (Core Flow)
> WriteOperator.preBarrier()   &nbsp;→ send FileInfoEvent to PWC (RPC)  &nbsp;→ 
> PWC aggregates, persists to HDFS, sends ACK  &nbsp;→ 
> WriteOperator.snapshotState() completes (local state only)  &nbsp;→ Flink 
> confirms checkpoint  &nbsp;→ PWC.notifyCheckpointComplete() → Paimon.commit() 
> → delete HDFS state
>
>
> Recovery Logic:
> resetToCheckpoint()&nbsp;scans HDFS, handles <= restoredCkId&nbsp;via 
> idempotent commit, and treats &gt; restoredCkId&nbsp;as orphan cleanup.
>
> Testing Status &amp; Request for Guidance
>
> Currently, I have added unit&nbsp;tests covering the recovery logic&nbsp;of 
> the custom HDFS state. I would appreciate the community's advice on what 
> other test cases are essential for this PIP.&nbsp;
>
> Once I receive initial feedback and strengthen the test coverage accordingly, 
> I will open a formal Pull Request.
>
> Looking forward to your suggestions!
>
> Thank you,
> fishfishfishfishaa/ yugan
>
>
>
>
> 俞淦
> [email protected]

Reply via email to