[ 
https://issues.apache.org/jira/browse/FLINK-36873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuhuang updated FLINK-36873:
----------------------------
    Description: 
I've identified several issues while attempting to enable Apache Celeborn to 
support Flink batch job recovery.

*1. RestoreState Invocation*
 * The method _*{{ShuffleMaster#restoreState}}*_ should be triggered regardless 
of whether the Flink job requires recovery.
 * This method signifies that a Flink job needs to restore its state, but it is 
currently called only after {_}*{{ShuffleMaster#registerJob}}*{_}.

 * Consequently, it might not be invoked if the Flink job does not require 
recovery.

 * For Celeborn, this creates uncertainty regarding when to initialize certain 
components; if the initialization occurs during {*}_{{registerJob}}_{*}, it may 
lack essential information from the stored snapshot, whereas if it takes place 
during {*}_{{restoreState}}_{*}, there is a risk that it may not be invoked at 
all.
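
To make the dilemma in item 1 concrete, here is a minimal, self-contained sketch. It does not use Flink's actual classes: {{SketchShuffleMaster}}, the string job ID, and the {{Optional<String>}} snapshot are hypothetical stand-ins, and the {{alwaysRestore}} flag only models the behaviour proposed above (invoke {{restoreState}} after {{registerJob}} even when there is nothing to recover).

```java
import java.util.Optional;

/** Hypothetical, simplified stand-in for a shuffle master; NOT Flink's actual interface. */
class SketchShuffleMaster {
    private boolean registered;
    private boolean initialized;
    private String restoredInfo = "";

    void registerJob(String jobId) {
        // Initialization cannot happen here: the snapshot is not yet available.
        registered = true;
    }

    /** Assumed contract: an empty Optional means "nothing to recover". */
    void restoreState(Optional<String> snapshot) {
        if (!registered) {
            throw new IllegalStateException("restoreState called before registerJob");
        }
        restoredInfo = snapshot.orElse("fresh");
        initialized = true; // single, well-defined initialization point
    }

    boolean isInitialized() {
        return initialized;
    }

    String restoredInfo() {
        return restoredInfo;
    }
}

class RestoreStateSketch {
    /** Simulates job start-up; alwaysRestore=false models the behaviour described in item 1. */
    static SketchShuffleMaster startJob(boolean alwaysRestore, Optional<String> snapshot) {
        SketchShuffleMaster master = new SketchShuffleMaster();
        master.registerJob("job-1");
        if (alwaysRestore || snapshot.isPresent()) {
            master.restoreState(snapshot);
        }
        return master;
    }

    public static void main(String[] args) {
        // restoreState is skipped when there is no recovery, so the plugin never initializes.
        System.out.println(startJob(false, Optional.empty()).isInitialized()); // false
        // With an unconditional call, initialization always happens exactly once.
        System.out.println(startJob(true, Optional.empty()).isInitialized()); // true
    }
}
```

With an unconditional call, a plugin such as Celeborn could defer all snapshot-dependent initialization to {{restoreState}} without risking that it never runs.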

*2. JobID Information Requirement* 
 * Several methods in _*{{ShuffleMaster}}*_ should include _*JobID*_ 
information: {*}_{{ShuffleMaster#supportsBatchSnapshot}}_{*}, 
{_}*{{ShuffleMaster#snapshotState}}*{_}, and 
{_}*{{ShuffleMaster#restoreState}}*{_}.
 * These methods are intended for job-granularity state storage and 
restoration, but they currently do not incorporate JobID.

 * Consequently, Celeborn is unable to determine which job triggered these 
calls.
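
As a rough illustration of item 2, the sketch below shows what job-scoped variants of these methods could look like. Everything here is an assumption made for illustration: {{JobScopedShuffleMaster}} is a hypothetical class, a plain {{String}} stands in for Flink's JobID, and an {{int}} stands in for the snapshot type. It is not the signature Flink currently exposes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical shuffle master whose snapshot methods take a job ID; NOT Flink's API. */
class JobScopedShuffleMaster {
    // Per-job state: here simply the number of shuffles registered by each job.
    private final Map<String, Integer> shuffleCountByJob = new HashMap<>();

    void registerJob(String jobId) {
        shuffleCountByJob.putIfAbsent(jobId, 0);
    }

    void registerShuffle(String jobId) {
        shuffleCountByJob.merge(jobId, 1, Integer::sum);
    }

    /** The job ID lets the plugin answer per job instead of globally. */
    boolean supportsBatchSnapshot(String jobId) {
        return shuffleCountByJob.containsKey(jobId);
    }

    /** Returns only the state that belongs to the given job. */
    int snapshotState(String jobId) {
        Integer count = shuffleCountByJob.get(jobId);
        if (count == null) {
            throw new IllegalArgumentException("Unknown job: " + jobId);
        }
        return count;
    }

    /** Restores the state of exactly one job. */
    void restoreState(String jobId, int snapshot) {
        shuffleCountByJob.put(jobId, snapshot);
    }
}

class JobScopedSketch {
    public static void main(String[] args) {
        JobScopedShuffleMaster master = new JobScopedShuffleMaster();
        master.registerJob("job-a");
        master.registerJob("job-b");
        master.registerShuffle("job-a");
        master.registerShuffle("job-a");
        master.registerShuffle("job-b");

        System.out.println(master.snapshotState("job-a")); // 2
        System.out.println(master.snapshotState("job-b")); // 1
    }
}
```

Without the job ID argument, a shuffle master serving both jobs would have no way to tell whether a snapshot request refers to {{job-a}} or {{job-b}}.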

*3. Hybrid shuffle support for batch job recovery* 
 * Hybrid shuffle should support batch job recovery; however, as it stands, 
Flink's hybrid shuffle does not.
 * The Celeborn integration with Flink's hybrid shuffle relies on 
_*{{Flink#NettyShuffleService}}*_, which does not support batch job recovery 
for hybrid shuffle, so the integration inherits this limitation.

{*}4. Cluster granularity store/restore state{*}:
 * Presently, _*{{ShuffleMaster}}*_ only offers job-granularity interfaces for 
storing and restoring state, as the _*{{NettyShuffleService}}*_ is stateless in 
terms of cluster granularity.
 * However, _*{{Celeborn#ShuffleMaster}}*_ needs to communicate with the 
Celeborn Master, necessitating the storage of certain cluster-level states, 
such as {_}*{{CelebornAppId}}*{_}.

 * In my opinion, the cluster-granularity store-state interface could be 
executed after _*{{ShuffleMaster#start}}*_, and _*{{ShuffleMaster#start}}*_ 
could take an additional snapshot parameter to restore the cluster state.
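
One possible shape for the cluster-level interface in item 4 is sketched below. This is only an illustration under stated assumptions: {{ClusterStatefulShuffleMaster}}, {{snapshotClusterState}}, and the {{Optional<String>}} parameter of {{start}} are hypothetical names, and a generated string stands in for {{CelebornAppId}}.

```java
import java.util.Optional;
import java.util.UUID;

/** Hypothetical shuffle master with cluster-level state; NOT Flink's or Celeborn's API. */
class ClusterStatefulShuffleMaster {
    private String appId; // stand-in for a cluster-level value such as CelebornAppId

    /** Assumed variant of start(): reuse the snapshotted ID if present, else create one. */
    void start(Optional<String> clusterSnapshot) {
        appId = clusterSnapshot.orElseGet(() -> "app-" + UUID.randomUUID());
    }

    /** Assumed to be invoked after start() so the cluster-level state can be persisted. */
    String snapshotClusterState() {
        if (appId == null) {
            throw new IllegalStateException("start must be called before snapshotting");
        }
        return appId;
    }

    String appId() {
        return appId;
    }
}

class ClusterStateSketch {
    public static void main(String[] args) {
        // First start: no snapshot yet, so a new application ID is generated.
        ClusterStatefulShuffleMaster first = new ClusterStatefulShuffleMaster();
        first.start(Optional.empty());
        String snapshot = first.snapshotClusterState();

        // After a failover: the restarted master restores the same application ID.
        ClusterStatefulShuffleMaster second = new ClusterStatefulShuffleMaster();
        second.start(Optional.of(snapshot));
        System.out.println(first.appId().equals(second.appId())); // true
    }
}
```

Keeping the same application ID across a restart is what would let the restarted shuffle master keep talking to the Celeborn Master about shuffle data written before the failure.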

  was:
I've identified several issues while attempting to enable Apache Celeborn to 
support Flink job recovery.
 # *Restore State Invocation*

 ** The method _*{{ShuffleMaster#restoreState}}*_ should be triggered 
regardless of whether the Flink job requires recovery.

 ** This method signifies that a Flink job needs to restore its state, but it 
is currently called only after {_}*{{ShuffleMaster#registerJob}}*{_}.

 ** Consequently, it might not be invoked if the Flink job does not require 
recovery.

 ** For Celeborn, this creates uncertainty regarding when to initialize certain 
components; if the initialization occurs during {*}_{{registerJob}}_{*}, it may 
lack essential information from the stored snapshot, whereas if it takes place 
during {*}_{{restoreState}}_{*}, there is a risk that it may not be invoked at 
all.

 # *JobID Information Requirement* 

 ** Several methods in _*{{ShuffleMaster}}*_ should include _*JobID*_ 
information: {*}_{{ShuffleMaster#supportsBatchSnapshot}}_{*}, 
{_}*{{ShuffleMaster#snapshotState}}*{_}, and 
{_}*{{ShuffleMaster#restoreState}}*{_}.

 ** These methods are intended for job-granularity state storage and 
restoration, but they currently do not incorporate JobID.

 ** Consequently, Celeborn is unable to determine which job triggered these 
calls.

 # *Hybrid shuffle support for batch job recovery* 

 ** Hybrid shuffle should support batch job recovery; however, as it stands, 
Flink's hybrid shuffle does not.

 ** The Celeborn integration with Flink's hybrid shuffle relies on 
_*{{Flink#NettyShuffleService}}*_, which does not support batch job recovery 
for hybrid shuffle, so the integration inherits this limitation.

 # {*}Cluster Granularity Store/Restore State{*}:

 ** Presently, _*{{ShuffleMaster}}*_ only offers job-granularity interfaces for 
storing and restoring state, as the _*{{NettyShuffleService}}*_ is stateless in 
terms of cluster granularity.

 ** However, _*{{Celeborn#ShuffleMaster}}*_ needs to communicate with the 
Celeborn Master, necessitating the storage of certain cluster-level states, 
such as {_}*{{CelebornAppId}}*{_}.

 ** In my opinion, the cluster-granularity store-state interface could be 
executed after _*{{ShuffleMaster#start}}*_, and _*{{ShuffleMaster#start}}*_ 
could take an additional snapshot parameter to restore the cluster state.


> Adapting batch job progress recovery to Apache Celeborn
> -------------------------------------------------------
>
>                 Key: FLINK-36873
>                 URL: https://issues.apache.org/jira/browse/FLINK-36873
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>            Reporter: xuhuang
>            Priority: Major
>
> I've identified several issues while attempting to enable Apache Celeborn to 
> support Flink batch job recovery.
> *1. RestoreState Invocation*
>  * The method _*{{ShuffleMaster#restoreState}}*_ should be triggered 
> regardless of whether the Flink job requires recovery.
>  * This method signifies that a Flink job needs to restore its state, but it 
> is currently called only after {_}*{{ShuffleMaster#registerJob}}*{_}.
>  * Consequently, it might not be invoked if the Flink job does not require 
> recovery.
>  * For Celeborn, this creates uncertainty regarding when to initialize 
> certain components; if the initialization occurs during 
> {*}_{{registerJob}}_{*}, it may lack essential information from the stored 
> snapshot, whereas if it takes place during {*}_{{restoreState}}_{*}, there is 
> a risk that it may not be invoked at all.
> *2. JobID Information Requirement* 
>  * Several methods in _*{{ShuffleMaster}}*_ should include _*JobID*_ 
> information: {*}_{{ShuffleMaster#supportsBatchSnapshot}}_{*}, 
> {_}*{{ShuffleMaster#snapshotState}}*{_}, and 
> {_}*{{ShuffleMaster#restoreState}}*{_}.
>  * These methods are intended for job-granularity state storage and 
> restoration, but they currently do not incorporate JobID.
>  * Consequently, Celeborn is unable to determine which job triggered these 
> calls.
> *3. Hybrid shuffle support for batch job recovery* 
>  * Hybrid shuffle should support batch job recovery; however, as it stands, 
> Flink's hybrid shuffle does not.
>  * The Celeborn integration with Flink's hybrid shuffle relies on 
> _*{{Flink#NettyShuffleService}}*_, which does not support batch job 
> recovery for hybrid shuffle, so the integration inherits this limitation.
> {*}4. Cluster granularity store/restore state{*}:
>  * Presently, _*{{ShuffleMaster}}*_ only offers job-granularity interfaces 
> for storing and restoring state, as the _*{{NettyShuffleService}}*_ is 
> stateless in terms of cluster granularity.
>  * However, _*{{Celeborn#ShuffleMaster}}*_ needs to communicate with the 
> Celeborn Master, necessitating the storage of certain cluster-level states, 
> such as {_}*{{CelebornAppId}}*{_}.
>  * In my opinion, the cluster-granularity store-state interface could be 
> executed after _*{{ShuffleMaster#start}}*_, and 
> _*{{ShuffleMaster#start}}*_ could take an additional snapshot parameter to 
> restore the cluster state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
