Hi Nicholas Jiang,

Thank you for your questions — I'll try to address them one by one.

*> 1. Why do we need a custom OperationLog interface in Celeborn:*
In Celeborn, there are many components that are engine-agnostic and still
need to store and restore their internal states. For example, the
LifecycleManager needs to track which shuffle slots have been allocated.
These components are not directly dependent on Flink, so we cannot rely on
Flink’s built-in interfaces like JobEvent. Therefore, it is necessary for
us to define our own interface specific to Celeborn for this purpose.
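
To make the idea concrete, here is a minimal sketch of what an
engine-agnostic operation log could look like. All names other than
OperationLog (SlotsAllocated, SlotTracker, applyTo, restore) are
hypothetical and chosen only for illustration; the actual interface is the
one defined in the CIP and may look quite different.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical shape; the real OperationLog is defined in the CIP.
interface OperationLog {
    // Re-applies this operation to the state that is being rebuilt.
    void applyTo(Map<Integer, Integer> slotsByShuffle);
}

// Hypothetical log entry: slots were allocated for one shuffle.
final class SlotsAllocated implements OperationLog {
    private final int shuffleId;
    private final int numSlots;

    SlotsAllocated(int shuffleId, int numSlots) {
        this.shuffleId = shuffleId;
        this.numSlots = numSlots;
    }

    @Override
    public void applyTo(Map<Integer, Integer> slotsByShuffle) {
        slotsByShuffle.merge(shuffleId, numSlots, Integer::sum);
    }
}

// Stand-in for slot bookkeeping; note that no Flink types are involved.
final class SlotTracker {
    private final Map<Integer, Integer> slotsByShuffle = new HashMap<>();
    private final List<OperationLog> journal = new ArrayList<>();

    void allocate(int shuffleId, int numSlots) {
        apply(new SlotsAllocated(shuffleId, numSlots));
    }

    // Updates the in-memory state and remembers the operation.
    private void apply(OperationLog op) {
        op.applyTo(slotsByShuffle);
        journal.add(op);
    }

    // Copy of the recorded operations, in the order they happened.
    List<OperationLog> journal() {
        return new ArrayList<>(journal);
    }

    int slotsFor(int shuffleId) {
        return slotsByShuffle.getOrDefault(shuffleId, 0);
    }

    // Rebuilds the state by replaying the operations sequentially.
    static SlotTracker restore(List<OperationLog> logs) {
        SlotTracker tracker = new SlotTracker();
        for (OperationLog op : logs) {
            tracker.apply(op);
        }
        return tracker;
    }
}
```

Replaying the journal into a fresh tracker reproduces the same slot counts,
which is the property a component needs in order to recover without
depending on any engine-specific event type.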

*> 2. Why we do not use Flink’s RecoverableStore in this CIP:*
In this CIP, we do not directly use Flink's RecoverableStore because the
ShuffleMaster-related interfaces defined in FLIP-383 had certain
limitations in Flink 1.20 that prevented Celeborn from using Flink to store
its own state. These issues were addressed in Flink 2.0.
- If we want to support Flink 1.20, Celeborn must implement its own
mechanism for snapshotting and restoring state.
- If we plan to target Flink 2.0 or later, then we can fully leverage
Flink’s APIs to manage Celeborn’s state without introducing a separate
RecoverableStore.
Whether or not we should support Flink 1.20 is something we can discuss
further.

To be more specific, here are the key issues with Flink 1.20:
- Lack of snapshot and restore interfaces for ShuffleService cluster state.
The Celeborn client needs to store the CelebornAppId identifier to
communicate with the Celeborn cluster. This ID is part of the cluster
state, but no corresponding mechanism existed in Flink 1.20.
- The ShuffleService had no way to determine whether it was running for the
first time or recovering from a failure. This made it difficult for the
Celeborn client to choose an appropriate initialization point.
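
The two issues above can be illustrated with a small sketch. It assumes
some restore hook hands the client the previously snapshotted CelebornAppId
when one exists. ClusterState, ClientBootstrap and initialize are
hypothetical names, and the sketch deliberately does not use any real Flink
or Celeborn API.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical holder for the cluster-level state the client must keep.
final class ClusterState {
    final String celebornAppId;
    final boolean recovered;

    ClusterState(String celebornAppId, boolean recovered) {
        this.celebornAppId = celebornAppId;
        this.recovered = recovered;
    }
}

// Hypothetical bootstrap logic for the Celeborn client.
final class ClientBootstrap {
    // snapshotAppId is empty on a first start and present after a failover.
    static ClusterState initialize(Optional<String> snapshotAppId) {
        if (snapshotAppId.isPresent()) {
            // Recovery: reuse the old ID so existing shuffle data stays reachable.
            return new ClusterState(snapshotAppId.get(), true);
        }
        // First start: generate a fresh ID (the ID format here is made up).
        return new ClusterState("app-" + UUID.randomUUID(), false);
    }
}
```

Without a snapshot/restore hook for cluster state, the Optional above would
always be empty, so the client could not tell the two cases apart.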

*> 3. RemoteShuffleMaster implementation:*
The RemoteShuffleMaster does need to implement the interfaces defined in
FLIP-383. However, this point wasn't clearly stated in the CIP. I will make
sure to clarify and add it in the next revision.

*> 4. Impact of Celeborn upgrades:*
I believe this feature should not be affected by future upgrades to
Celeborn. It is designed to attempt job recovery only in the case of a
JobManager failure. If, during such a recovery, the shuffle data becomes
unavailable due to an upgrade, that would naturally trigger task failure
and re-execution, just as it would in any other scenario.

Finally, thank you again for raising these questions — they helped me
realize that there are several details in the CIP that are not yet clearly
explained. I will work on improving the document accordingly.

Best regards,
Xu Huang


Nicholas Jiang <nicholasji...@apache.org> wrote on Mon, Jul 28, 2025 at 17:03:
>
> Hi Xu Huang,
>
> Thanks for driving CIP-21 to support Flink job recovery mechanism of JM.
> I have some questions about this proposal:
>
> 1. The OperationLog interface is introduced to represent various
> operations within Celeborn. Could we reuse the JobEvent interface instead
> of introducing OperationLog interface?
>
> 2. RecoverableStore is an interface that supports asynchronous writing
> and sequential reading of OperationLog to reliable storage. Could we reuse
> JobEventStore introduced in FLIP-383? IMO, it does not need to introduce
> RecoverableStore in celeborn.
>
> 3. There are some interfaces introduced in FLIP-383 including
> ShuffleMaster#getAllPartitionWithMetrics. Should RemoteShuffleMaster
> implement introduced interfaces to support? The proposed changes does not
> mention this point?
>
> 4. How does this support guarantee status compatibility during the
> upgrade process?
>
> Regards,
> Nicholas Jiang
>
> On 2025/07/28 07:25:00 Xu Huang wrote:
> > Hi community,
> >
> > I’d like to initiate a discussion regarding CIP-21: Support Flink job
> > recovery from JobManager failure for Apache Celeborn [1].
> >
> > This proposal aims to enable Celeborn to support Flink’s batch job
> > recovery feature [2]. With this enhancement, Flink batch jobs using
> > Celeborn will be able to recover from previously completed stages after
> > a JobManager failure, eliminating the need to restart the entire job
> > from scratch.
> >
> > Your feedback and questions are welcome — please feel free to share any
> > thoughts you may have.
> >
> > Best regards,
> > Xu Huang
> >
> > [1] CIP-21: Support flink jobs recovery from JobManager failure for
> > Apache Celeborn. https://cwiki.apache.org/confluence/x/kw9JFg
> > [2] FLIP-383: Support Job Recovery from JobMaster Failures for Batch
> > Jobs. https://cwiki.apache.org/confluence/x/QwqZE
> >
