Hi Paimon Community,

Following PIP-30 (Improvement For Paimon Committer In Flink), I have
completed a prototype implementation of the Paimon Write Coordinator 
(PWC) on my personal branch. The design replaces the current 
CommitOperator with a JobManager-level OperatorCoordinator to 
eliminate the network shuffle bottleneck for commit messages.

Code Branch: https://github.com/fishfishfishfishaa/paimon/tree/yg-pip3-pwc

Key Design Decision: Custom HDFS State

Instead of using Flink's native StateBackend, I chose custom HDFS state 
management for the following reasons:

Timing mismatch: Flink takes the operator state snapshot after
snapshotState() returns, but PWC must persist the aggregated
CommitMessages before it acknowledges the WriteOperators and lets them
proceed. This pre-barrier persistence requirement does not fit well with
Flink's native state lifecycle.
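
To make the required ordering concrete (persist first, ACK second, snapshotState() last), here is a minimal Java sketch of the pre-barrier handshake. It assumes the ACK can be modelled as a CompletableFuture that the writer waits on. FileInfoMsg, HandshakeCoordinator and WriterSide are hypothetical stand-ins, not the branch's FileInfoEvent or Flink's OperatorCoordinator API, and the in-memory persistLog stands in for the HDFS write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the commit information a writer reports.
final class FileInfoMsg {
    final int subtask;
    final long checkpointId;
    final List<String> newFiles;

    FileInfoMsg(int subtask, long checkpointId, List<String> newFiles) {
        this.subtask = subtask;
        this.checkpointId = checkpointId;
        this.newFiles = List.copyOf(newFiles);
    }
}

// Hypothetical coordinator: persists first, then completes the ACK future.
final class HandshakeCoordinator {
    // Stand-in for the HDFS state files; records what has been made durable.
    private final List<String> persistLog = new ArrayList<>();

    CompletableFuture<Void> handle(FileInfoMsg msg) {
        // The returned future completes only after persist() has returned,
        // so the ACK can never be observed before the data is durable.
        return CompletableFuture.runAsync(() -> persist(msg));
    }

    private synchronized void persist(FileInfoMsg msg) {
        persistLog.add("ck" + msg.checkpointId + "/subtask" + msg.subtask + ":" + msg.newFiles);
    }

    synchronized List<String> persisted() {
        return new ArrayList<>(persistLog);
    }
}

// Models WriteOperator.preBarrier(): wait (with a timeout) for the ACK
// before snapshotState() is allowed to run.
final class WriterSide {
    static void preBarrier(HandshakeCoordinator pwc, FileInfoMsg msg) {
        pwc.handle(msg).orTimeout(30, TimeUnit.SECONDS).join();
    }
}
```

Blocking on the future is the simplest way to express "no snapshot before the ACK"; the timeout keeps a lost ACK from stalling the writer forever.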

Explicit recovery control: with custom state, resetToCheckpoint() follows
a simple sequence. It scans HDFS, selects the state files with
checkpointId <= restoredCheckpointId, commits them idempotently through
Paimon's native deduplication, and then cleans up.
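
This recovery step relies on commits being idempotent. The toy model below assumes, loosely following Paimon's behaviour of recording a commit user and commit identifier with each snapshot, that a commit whose identifier is not newer than the latest one recorded for the same user is skipped. ToyTable is hypothetical and is not Paimon's commit API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a table that deduplicates commits. Hypothetical: it only
// imitates the idea of recording (commitUser, commitIdentifier) per commit.
final class ToyTable {
    private final Map<String, Long> latestIdentifier = new HashMap<>();
    private final List<String> snapshots = new ArrayList<>();

    // Returns true if a new snapshot was created, false if the commit was
    // recognised as a duplicate and skipped.
    boolean commit(String commitUser, long commitIdentifier, List<String> files) {
        Long latest = latestIdentifier.get(commitUser);
        if (latest != null && commitIdentifier <= latest) {
            return false; // already committed before the failover
        }
        snapshots.add(commitUser + "@" + commitIdentifier + " " + files);
        latestIdentifier.put(commitUser, commitIdentifier);
        return true;
    }

    int snapshotCount() {
        return snapshots.size();
    }
}
```

Replaying checkpoint 5 after a failover is then a no-op rather than a duplicate snapshot.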

Decoupled cleanup: HDFS state deletion is independent of Flink's checkpoint
retention policy, which avoids potential state bloat.
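
A sketch of this decoupled cleanup, assuming the coordinator deletes every checkpoint-{ckId}.state file up to the last committed checkpoint on its own schedule. It uses java.nio.file on a local directory as a stand-in for HDFS; StateCleaner and cleanUpTo are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical cleanup helper; a local directory stands in for
// <flink-checkpoint-dir>/pwc/<operatorId>/ on HDFS.
final class StateCleaner {
    private static final Pattern STATE_FILE = Pattern.compile("checkpoint-(\\d+)\\.state");

    // Deletes every state file whose checkpoint id is <= committedCkId and
    // returns how many were removed. Flink's retained-checkpoint settings
    // play no part in this decision.
    static int cleanUpTo(Path operatorDir, long committedCkId) throws IOException {
        List<Path> victims = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(operatorDir)) {
            for (Path file : files) {
                Matcher m = STATE_FILE.matcher(file.getFileName().toString());
                if (m.matches() && Long.parseLong(m.group(1)) <= committedCkId) {
                    victims.add(file);
                }
            }
        }
        // Delete only after the directory scan has finished.
        int deleted = 0;
        for (Path file : victims) {
            if (Files.deleteIfExists(file)) {
                deleted++;
            }
        }
        return deleted;
    }
}
```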

State Storage Path:
<flink-checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state
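
A small helper for building and parsing this layout might look like the following. PwcStatePaths is a hypothetical name, and the sketch assumes plain string paths, whereas a real implementation would more likely use Flink's Path and FileSystem abstractions.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for the layout
// <flink-checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state
final class PwcStatePaths {
    private static final Pattern FILE_NAME = Pattern.compile("checkpoint-(\\d+)\\.state");

    static String stateFile(String checkpointDir, String operatorId, long checkpointId) {
        // Tolerate a trailing slash on the checkpoint directory.
        String base = checkpointDir.endsWith("/")
                ? checkpointDir.substring(0, checkpointDir.length() - 1)
                : checkpointDir;
        return base + "/pwc/" + operatorId + "/checkpoint-" + checkpointId + ".state";
    }

    // Extracts the checkpoint id from a file name, or empty if the name does not match.
    static OptionalLong checkpointIdOf(String fileName) {
        Matcher m = FILE_NAME.matcher(fileName);
        return m.matches() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }
}
```

Requiring a full match means temporary or partially written files (for example a `.tmp` suffix) are never mistaken for committed state.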

Current Scope

Supported: FixedBucketSink (primary use case for PIP-30)

Implementation Highlights (Core Flow)
WriteOperator.preBarrier()
  → send FileInfoEvent to PWC (RPC)
  → PWC aggregates, persists to HDFS, sends ACK
  → WriteOperator.snapshotState() completes (local state only)
  → Flink confirms checkpoint
  → PWC.notifyCheckpointComplete() → Paimon.commit() → delete HDFS state
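
The same flow as an in-memory Java sketch. SketchCoordinator is hypothetical: durableStore stands in for the HDFS state files, table for the Paimon table, and returning normally from onFileInfo() plays the role of the ACK. It also assumes that notifyCheckpointComplete(ckId) commits every pending checkpoint up to ckId, because Flink does not guarantee a completion notification for every checkpoint.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the PWC core flow.
final class SketchCoordinator {
    // checkpointId -> aggregated commit messages; stands in for the HDFS state files
    private final TreeMap<Long, List<String>> durableStore = new TreeMap<>();
    // stands in for the files visible in the Paimon table
    private final List<String> table = new ArrayList<>();

    // A writer reports its files before the barrier. The coordinator aggregates
    // them per checkpoint and persists the aggregate; returning is the ACK.
    void onFileInfo(long checkpointId, List<String> files) {
        List<String> aggregate = new ArrayList<>(durableStore.getOrDefault(checkpointId, List.of()));
        aggregate.addAll(files);
        durableStore.put(checkpointId, aggregate); // "persist to HDFS"
    }

    // After Flink confirms checkpointId, commit every pending checkpoint up to
    // it in ascending order, then delete its state.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                durableStore.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            table.addAll(it.next().getValue()); // stand-in for Paimon.commit()
            it.remove();                        // stand-in for deleting checkpoint-{ckId}.state
        }
    }

    List<String> committedFiles() { return List.copyOf(table); }
    boolean hasState(long checkpointId) { return durableStore.containsKey(checkpointId); }
    List<String> stateOf(long checkpointId) { return durableStore.get(checkpointId); }
}
```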

Recovery Logic:
resetToCheckpoint() scans HDFS, re-commits the state of checkpoints
<= restoredCkId through an idempotent commit, and treats the state of
checkpoints > restoredCkId as orphans to be cleaned up.
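
The split performed by resetToCheckpoint() can be expressed as a pure function over the checkpoint ids found in the state directory. RecoveryPlan is a hypothetical name, and the sketch only decides what to do; the actual commit and delete calls are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical recovery decision: which state files to re-commit and which
// to discard as orphans, given the checkpoint Flink restored from.
final class RecoveryPlan {
    final List<Long> toCommit = new ArrayList<>();
    final List<Long> orphans = new ArrayList<>();

    static RecoveryPlan of(List<Long> foundCheckpointIds, long restoredCkId) {
        List<Long> sorted = new ArrayList<>(foundCheckpointIds);
        Collections.sort(sorted); // re-commit older checkpoints first
        RecoveryPlan plan = new RecoveryPlan();
        for (long ckId : sorted) {
            if (ckId <= restoredCkId) {
                // May already be committed; the idempotent commit makes a replay a no-op.
                plan.toCommit.add(ckId);
            } else {
                // Persisted after the restored checkpoint; the replayed writers
                // will produce this data again, so the state is discarded.
                plan.orphans.add(ckId);
            }
        }
        return plan;
    }
}
```

Keeping the decision separate from I/O makes it straightforward to unit-test edge cases such as an empty directory or a restoredCkId older than every file.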

Testing Status & Request for Guidance

Currently, I have added unit tests covering the recovery logic of the
custom HDFS state. I would appreciate the community's advice on what other test
cases are essential for this PIP.

Once I receive initial feedback and strengthen the test coverage accordingly, I 
will open a formal Pull Request.

Looking forward to your suggestions!

Thank you,
fishfishfishfishaa / yugan

俞淦