Hi yugan,

Thanks for the thoughtful response. A few replies inline.
On observation 1 (synchronous wait) and observation 2 (two coexisting state systems)

Both of these are addressed in the design discussed in the earlier mailing-list thread and the Google Doc I shared (see the yellow-highlighted sections; the doc lives on Google Docs because we ran into image-editing issues when updating the original PIP-30 page on cwiki).

The key point: in-flight committables are persisted on the writer side through Flink's native operator state, not on the coordinator. With that, the coordinator does not need to durably persist CommitMessages before writers unblock. The handoff is a fire-and-forget event sent after snapshotState(), with no synchronous HDFS write on the barrier path.

The coordinator itself owns only a small amount of state (commitUser, watermark, listener state), which fits the Flink-native checkpointCoordinator / resetToCheckpoint channel. That leaves a single state system on the JM side, so the dual-state consistency concern doesn't arise.

Those sections are worth re-reading; I think they cover (1) and (2) without needing the Flink-internal API workarounds you mentioned.

On observation 3 (FixedBucketSink scope)

Two concerns.

First, splitting PIP-30 along sink-type lines is risky. FixedBucketSink and UnawareBucketSink share the same coordinator implementation and are tightly coupled at the implementation level. Landing "shuffle-free commit for FixedBucketSink" first will likely force a rewrite of the core design when UnawareBucketSink + region failover is added later, rather than allowing a clean extension.

Second, I'm not sure FixedBucketSink should be in PIP-30 at all. Our internal implementation of this design, which is already deployed in production, doesn't wire up FixedBucketSink, precisely because moving the commit to the coordinator doesn't enable region failover there (the topology is a single region regardless). If region failover is the primary motivation of PIP-30, including a sink that can't benefit from it seems to dilute the scope.
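To make the handoff described under observations 1 and 2 a bit more concrete, here is a minimal, self-contained sketch in plain Java. It is not Flink or Paimon code: Committable, SketchWriter and SketchCoordinator are hypothetical names, operator state is simulated with a map, and the event channel is a plain Consumer. It assumes each writer's event for a checkpoint reaches the coordinator before that checkpoint is reported complete, and it shows two properties: snapshotState() returns as soon as the event is handed off, and a replay of restored writer state is filtered instead of committed twice.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical stand-in for a Paimon CommitMessage: which subtask wrote which file at which checkpoint. */
record Committable(int subtask, long checkpointId, String file) {}

/** Hypothetical writer: in-flight committables live in its own (simulated) operator state. */
class SketchWriter {
    private final int subtask;
    private final Consumer<List<Committable>> sendEvent; // fire-and-forget: no future, no ACK awaited
    private final List<String> pendingFiles = new ArrayList<>();
    // Simulated operator state; in a real job the state backend would make this durable.
    final Map<Long, List<Committable>> operatorState = new TreeMap<>();

    SketchWriter(int subtask, Consumer<List<Committable>> sendEvent) {
        this.subtask = subtask;
        this.sendEvent = sendEvent;
    }

    void write(String file) {
        pendingFiles.add(file);
    }

    void snapshotState(long checkpointId) {
        List<Committable> batch = new ArrayList<>();
        for (String f : pendingFiles) {
            batch.add(new Committable(subtask, checkpointId, f));
        }
        pendingFiles.clear();
        operatorState.put(checkpointId, List.copyOf(batch)); // part of the writer's own checkpoint
        sendEvent.accept(List.copyOf(batch)); // returns immediately; nothing blocks the barrier path
    }
}

/** Hypothetical coordinator: buffers events in memory and commits idempotently on checkpoint completion. */
class SketchCoordinator {
    private final NavigableMap<Long, Set<Committable>> received = new TreeMap<>();
    // Unbounded for simplicity; a real implementation would bound this (e.g. via the commit's own dedup).
    private final Set<Committable> alreadyCommitted = new HashSet<>();
    final List<Committable> committed = new ArrayList<>();

    void handleEvent(List<Committable> batch) {
        for (Committable c : batch) {
            if (alreadyCommitted.contains(c)) {
                continue; // replayed from restored writer state: skip to stay idempotent
            }
            received.computeIfAbsent(c.checkpointId(), k -> new LinkedHashSet<>()).add(c);
        }
    }

    void notifyCheckpointComplete(long checkpointId) {
        // headMap is a live view: everything buffered for checkpoints <= checkpointId.
        NavigableMap<Long, Set<Committable>> ready = received.headMap(checkpointId, true);
        for (Set<Committable> batch : ready.values()) {
            for (Committable c : batch) {
                if (alreadyCommitted.add(c)) {
                    committed.add(c); // stands in for the actual Paimon commit
                }
            }
        }
        ready.clear(); // also removes these entries from `received`
    }
}
```

In this sketch the coordinator holds nothing that must be durable before writers unblock; if it is reset, writers can resend what their restored operator state contains, and the identity check keeps the commit from being applied twice.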
Proposal

If you don't mind, I'd like to open a GitHub issue to upstream our internal implementation (the design itself is in the Google Doc), so the discussion has concrete code to anchor on alongside your prototype branch. I'd be happy to have you participate and check whether it addresses the three points.

What do you think?

Thanks,
Biao /'bɪ.aʊ/

On Thu, 4 Jun 2026 at 23:32, 俞淦 <[email protected]> wrote:

> Hi Biao,
>
> Thanks for the detailed review and for pointing to the earlier
> mailing-list thread and design doc. I've read both, and they provide very
> useful context. Let me address each observation.
>
> 1. Synchronous wait on the writer barrier path
>
> Acknowledged. The current sender.snapshot(ck).get() blocking design does
> introduce a new latency source on the critical path.
>
> My rationale was to ensure strict ordering: PWC must persist state before
> writers complete snapshotState(), so that a JM failover never loses
> in-flight committables. However, you're right that the current
> implementation is too pessimistic. Potential mitigations I'd like to explore:
>
> (1) PWC sends the ACK immediately upon receiving all FileInfoEvents, before
> the HDFS hsync, and the HDFS write proceeds asynchronously. Risk: a JM crash
> between the ACK and the hsync loses committables.
> (2) Writers send FileInfo in the background during the checkpoint interval,
> not at barrier time, and the barrier only sends a lightweight "ready"
> signal. This requires a larger change on the writer side.
> (3) Use a Flink-internal API (even an unstable one) to access the
> checkpoint storage location from the OperatorCoordinator. This would solve
> both problems: it would eliminate the need for custom HDFS state and avoid
> the additional blocking mechanism.
>
> But I think we may only be able to reduce blocking, not eliminate it
> entirely.
>
> 2. Two coexisting state systems on the JM side
>
> You're absolutely right: this is the deepest design risk in the prototype.
>
> The root cause is a capability gap in Flink's OperatorCoordinator API:
> checkpointCoordinator() returns byte[], but Flink gives the coordinator no
> visibility into where that byte array is stored, when it is durably
> persisted, or how to correlate it with the
> notifyCheckpointComplete/resetToCheckpoint lifecycle.
>
> Specifically:
>
> I cannot call CheckpointStorageAccess.getBaseLocation() (the method
> doesn't exist).
>
> I cannot derive the checkpoint directory from Context (there is no
> getJobInformation()).
>
> Even if I could, the coordinator state is written after
> checkpointCoordinator() returns, while I need persistence before writers
> unblock.
>
> This forced the self-managed HDFS path. But I agree the resulting
> dual-state consistency is fragile.
>
> 1) Is there a Flink-internal API (even an unstable one) to access the
> checkpoint storage location from the OperatorCoordinator? This
> would solve both problem 1 and problem 2.
> 2) Alternatively, would the community accept a design where PWC
> state is fully embedded in the coordinator's byte[] state, and we accept
> that snapshotState() on writers blocks only until the coordinator's
> checkpointCoordinator() returns (not until HDFS is durable)? The risk is a
> JM crash between checkpointCoordinator() completion and Flink's actual
> state persistence, but perhaps that's acceptable if Paimon's commit is
> idempotent and can filter duplicates?
> 3) Or should the PIP-30 scope be reduced to only the
> network-shuffle elimination (problem 1), deferring the HA-recovery redesign
> until Flink's coordinator API provides better storage integration?
>
> 3. FixedBucketSink scope vs region failover
>
> Valid catch. The current branch wires only FixedBucketSink because that's
> the immediate production need, but you're correct that FixedBucketSink's
> keyBy(bucketKey) creates an all-to-all edge, resulting in a single
> pipelined region.
> Region failover is not possible in this topology
> regardless of where the committer lives.
>
> Clarification on PIP-30 motivation:
>
> I think the original PIP-30 lists two goals:
> (a) Eliminate the network shuffle for commit messages (delivered by
> coordinator RPC instead)
> (b) Enable region failover by decoupling the commit from the writer region
>
> Goal (a) is fully achieved even for FixedBucketSink: commit messages no
> longer traverse the shuffle.
> Goal (b) requires UnawareBucketSink (or a bucket-less hash-distribution
> mode), where writers can be colocated with sources without keyBy. This is
> not yet implemented in the prototype.
>
> Proposed scope adjustment:
> A. Keep PIP-30 as "coordinator-based commit for all sinks", but
> acknowledge that region failover only applies to UnawareBucketSink (future
> work)
> B. Narrow PIP-30 to "shuffle-free commit for FixedBucketSink", and spin
> region failover out into a separate PIP
> C. Delay PIP-30 until UnawareBucketSink coordinator integration is ready,
> so both goals land together
>
> I prefer option A if the community is willing to accept incremental
> delivery, or option B if we want stricter scope discipline. What do you
> think?
>
> Based on this feedback, I see three possible paths:
>
> Iterate on the prototype: implement async HDFS writes (problem 1),
> investigate a Flink API for unified state (problem 2), and keep the
> FixedBucketSink scope with a documented limitation (problem 3).
>
> Split the PIP: separate "shuffle-free commit" (smaller, ready soon) from
> "region failover architecture" (larger, needs UnawareBucketSink).
>
> Pause for Flink API evolution: wait for Flink to expose checkpoint storage
> to coordinators, then redesign with a single state system.
>
> I'd like community guidance on which path to pursue. I'm happy to revise
> the prototype once we have consensus.
>
> Thanks,
> yugan
>
> Original message
> From: Biao Liu <[email protected]>
> Sent: 29 May 2026, 22:15
> To: dev <[email protected]>
> Subject: Re: [DISCUSS] PIP-30: Paimon Write Coordinator - Replacing
> CommitOperator with JobManager-level Coordination
>
> Hi yugan,
>
> Thanks a lot for sharing the prototype and the detailed write-up. The
> PIP-30 direction is great, and the prototype gives a very concrete starting
> point for discussion.
>
> For background, an earlier mailing-list discussion on the same topic is
> here: https://lists.apache.org/thread/5n1mwc7wc3cfjogz56w9k5ql3zyl00lw
> And a related design doc is here:
> https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0
>
> Both might be useful as reference for the discussion below.
>
> I have three observations about the current PWC prototype.
>
> 1. Synchronous wait on the writer barrier path
>
> In the current flow, after the barrier passes through, each writer
> subtask's snapshotState() calls sender.snapshot(checkpointId).get() and
> blocks until the coordinator side completes the future. On the coordinator
> side, that future is only signalled after all subtasks have sent their
> FileInfoEvent, the coordinator has aggregated the committables, and
> PwcStateManager.writeState has finished its HDFS hsync + rename. The
> coordinator runs all of this on a single-thread event loop.
>
> PIP-30 was opened mainly to help large-scale Flink-to-Paimon write jobs,
> where checkpoint duration under high parallelism is already sensitive. With
> this design, every subtask's checkpoint additionally has to wait on (a) the
> slowest-arriving subtask, (b) one HDFS hsync on the JM, and (c) the JM's
> single-thread event loop draining N events. This adds a new source of
> latency directly on the barrier path.
>
> 2. Two coexisting state systems on the JM side
>
> The prototype writes aggregated CommitMessages directly through the Hadoop
> FileSystem to <baseDir>/<operatorId>/checkpoint-{ckId}.state, while
> checkpointCoordinator itself returns an empty byte array, i.e. the
> Flink-native coordinator state channel is not used for the in-flight
> committables.
>
> The deeper concern here is not the HDFS layout itself, but that the
> coordinator now lives across two state systems at the same time: Flink's
> checkpoint/restore lifecycle on one side, and the self-managed HDFS state
> on the other. The two have to stay consistent across every transition
> (checkpoint complete, checkpoint abort, JM failover, job restart, full job
> cancel), and the consistency contract between them is not obvious from the
> code. Any divergence (e.g. a notifyCheckpointAborted racing with
> writeState, or a JM crash between committer.commit and
> stateManager.deleteState) becomes a correctness question that the
> coordinator alone has to reason about.
>
> 3. FixedBucketSink scope vs region failover
>
> In the current branch, FixedBucketSink is the only sink wired to the
> coordinator factory. As far as I can tell, FixedBucketSink requires records
> to be hashed by bucket key, so the writer chain has an all-to-all upstream
> edge and ends up in a single Flink pipelined region. In that case, even
> after replacing CommitterOperator with a coordinator, the writer DAG still
> has only one region, so region failover would not be possible for this
> sink. Since PIP-30 lists region failover as a primary motivation, this
> scope choice does not appear to deliver that goal as it stands.
>
> What do you think?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
> On Mon, 25 May 2026 at 10:07, Jingsong Li <[email protected]> wrote:
>
> > Thanks fishfishfishfishaa,
> >
> > I think you can just raise a PR for this.
> >
> > Best,
> > Jingsong
> >
> > On Sun, May 24, 2026 at 11:29 PM 俞淦 <[email protected]> wrote:
> >
> > > Hi Paimon Community,
> > >
> > > Following PIP-30 (Improvement For Paimon Committer In Flink), I
> > > have completed a prototype implementation of the Paimon Write Coordinator
> > > (PWC) on my personal branch. The design replaces the current
> > > CommitOperator with a JobManager-level OperatorCoordinator to
> > > eliminate the network shuffle bottleneck for commit messages.
> > >
> > > Code Branch:
> > > https://github.com/fishfishfishfishaa/paimon/tree/yg-pip3-pwc
> > >
> > > Key Design Decision: Custom HDFS State
> > >
> > > Instead of using Flink's native StateBackend, I chose custom HDFS state
> > > management for the following reasons:
> > >
> > > Timing mismatch: Flink's operator state snapshot occurs after
> > > snapshotState() returns, but PWC needs to persist aggregated
> > > CommitMessages before acknowledging WriteOperators to proceed. This
> > > pre-barrier persistence requirement doesn't align well with Flink's
> > > native state lifecycle.
> > >
> > > Explicit recovery control: Custom state enables clean
> > > resetToCheckpoint() logic: scan HDFS, filter by checkpointId <=
> > > restoredCheckpointId, perform an idempotent commit via Paimon's native
> > > deduplication, then clean up.
> > >
> > > Decoupled cleanup: HDFS state deletion is independent of Flink
> > > checkpoint retention policies, avoiding potential state bloat.
> > >
> > > State Storage Path:
> > > <flink-checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state
> > >
> > > Current Scope
> > >
> > > Supported: FixedBucketSink (primary use case for PIP-30)
> > >
> > > Implementation Highlights (Core Flow)
> > >
> > > WriteOperator.preBarrier()
> > >   → send FileInfoEvent to PWC (RPC)
> > >   → PWC aggregates, persists to HDFS, sends ACK
> > >   → WriteOperator.snapshotState() completes (local state only)
> > >   → Flink confirms checkpoint
> > >   → PWC.notifyCheckpointComplete() → Paimon.commit() → delete HDFS state
> > >
> > > Recovery Logic:
> > > resetToCheckpoint() scans HDFS, re-commits state for checkpoints
> > > <= restoredCkId via idempotent commit, and cleans up state for
> > > checkpoints > restoredCkId as orphans.
> > >
> > > Testing Status & Request for Guidance
> > >
> > > Currently, I have added unit tests covering the recovery logic of the
> > > custom HDFS state. I would appreciate the community's advice on what
> > > other test cases are essential for this PIP.
> > >
> > > Once I receive initial feedback and strengthen the test coverage
> > > accordingly, I will open a formal Pull Request.
> > >
> > > Looking forward to your suggestions!
> > >
> > > Thank you,
> > > fishfishfishfishaa / yugan
> > >
> > > 俞淦
> > > [email protected]
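For reference, the recovery rule in the original proposal quoted above (re-commit state for checkpoints <= restoredCkId, treat anything newer as an orphan) comes down to a simple classification of the leftover state files. A minimal sketch, assuming only the checkpoint-{ckId}.state naming from that message; RecoveryPlan and PwcRecoverySketch are hypothetical names, and listing or deleting files on HDFS is deliberately left out:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical result of classifying leftover PWC state files during resetToCheckpoint(). */
record RecoveryPlan(List<Long> toRecommit, List<Long> orphans) {}

/** Hypothetical helper; only the file-name convention comes from the proposal. */
final class PwcRecoverySketch {
    private static final Pattern STATE_FILE = Pattern.compile("checkpoint-(\\d+)\\.state");

    private PwcRecoverySketch() {}

    static RecoveryPlan plan(List<String> fileNames, long restoredCheckpointId) {
        List<Long> toRecommit = new ArrayList<>();
        List<Long> orphans = new ArrayList<>();
        for (String name : fileNames) {
            Matcher m = STATE_FILE.matcher(name);
            if (!m.matches()) {
                continue; // ignore unrelated files, e.g. in-progress temp files
            }
            long checkpointId = Long.parseLong(m.group(1));
            if (checkpointId <= restoredCheckpointId) {
                toRecommit.add(checkpointId); // covered by the restored checkpoint: re-commit idempotently
            } else {
                orphans.add(checkpointId); // written after the restored checkpoint: clean up
            }
        }
        toRecommit.sort(null); // natural ordering, so re-commits happen oldest first
        orphans.sort(null);
        return new RecoveryPlan(toRecommit, orphans);
    }
}
```

The sketch only decides what to do with each file; actually re-committing is safe only if the commit is idempotent, as the original message notes.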
