[
https://issues.apache.org/jira/browse/FLINK-36873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
xuhuang updated FLINK-36873:
----------------------------
Description:
I've identified several issues while attempting to enable Apache Celeborn to
support Flink batch job recovery.
*1. RestoreState Invocation*
* {{ShuffleMaster#restoreState}} should be invoked regardless of whether the
Flink job requires recovery.
* This method signals that a Flink job needs to restore its state, but it is
currently called only after {{ShuffleMaster#registerJob}}.
* Consequently, it might not be invoked if the Flink job does not require
recovery.
* For Celeborn, this makes it unclear when to initialize certain components:
if initialization happens in {{registerJob}}, it may lack essential
information from the stored snapshot; if it happens in {{restoreState}}, the
method may never be invoked at all.
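To illustrate the request in item 1, here is a minimal sketch of a caller that always invokes {{restoreState}} after {{registerJob}}, passing an empty snapshot list when there is nothing to recover. All types here ({{JobLifecycle}}, {{DeferredInitPlugin}}, {{startJob}}) are hypothetical stand-ins with string snapshots, not Flink's actual {{ShuffleMaster}} interfaces.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: simplified stand-ins, not Flink's real ShuffleMaster API. */
final class RestoreAlwaysSketch {

    /** Hypothetical, reduced view of the two per-job callbacks discussed above. */
    interface JobLifecycle {
        void registerJob(String jobId);

        void restoreState(List<String> snapshots);
    }

    /** Hypothetical plugin that defers component initialization to restoreState. */
    static final class DeferredInitPlugin implements JobLifecycle {
        final List<String> log = new ArrayList<>();

        @Override
        public void registerJob(String jobId) {
            // Only bookkeeping here; snapshot data is not available yet.
            log.add("registered " + jobId);
        }

        @Override
        public void restoreState(List<String> snapshots) {
            // An empty list means "no recovery needed", so initialize from scratch.
            if (snapshots.isEmpty()) {
                log.add("fresh init");
            } else {
                log.add("restored from " + snapshots.size() + " snapshot(s)");
            }
        }
    }

    /** Hypothetical caller: restoreState follows registerJob unconditionally. */
    static void startJob(JobLifecycle plugin, String jobId, List<String> recovered) {
        plugin.registerJob(jobId);
        plugin.restoreState(recovered);
    }

    public static void main(String[] args) {
        DeferredInitPlugin plugin = new DeferredInitPlugin();
        startJob(plugin, "job-a", List.of());             // no recovery
        startJob(plugin, "job-b", List.of("snapshot-1")); // recovery
        plugin.log.forEach(System.out::println);
    }
}
```

With this ordering, a plugin can place all initialization in {{restoreState}} and still be certain that it runs exactly once per registered job.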
*2. JobID Information Requirement*
* Several methods in {{ShuffleMaster}} should include {{JobID}} information:
{{ShuffleMaster#supportsBatchSnapshot}}, {{ShuffleMaster#snapshotState}}, and
{{ShuffleMaster#restoreState}}.
* These methods are intended for job-granularity state storage and
restoration, but they currently do not incorporate JobID.
* Consequently, Celeborn is unable to determine which job triggered these
calls.
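As a sketch of what item 2 asks for, the following shows snapshot callbacks that receive a job identifier, so that a shuffle master serving several jobs can keep their state apart. The interface {{JobAwareSnapshots}}, the class {{PerJobState}}, and the use of plain strings for job IDs and snapshots are assumptions made for illustration and do not reflect Flink's real signatures.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: hypothetical job-aware snapshot callbacks, not Flink's API. */
final class JobScopedSnapshotSketch {

    /** Hypothetical variants of the three methods, each taking a job identifier. */
    interface JobAwareSnapshots {
        boolean supportsBatchSnapshot(String jobId);

        String snapshotState(String jobId);

        void restoreState(String jobId, List<String> snapshots);
    }

    /** Keeps one counter per job; the counter stands in for real shuffle metadata. */
    static final class PerJobState implements JobAwareSnapshots {
        private final Map<String, Integer> shuffleCountByJob = new HashMap<>();

        void recordShuffle(String jobId) {
            shuffleCountByJob.merge(jobId, 1, Integer::sum);
        }

        int shuffleCount(String jobId) {
            return shuffleCountByJob.getOrDefault(jobId, 0);
        }

        @Override
        public boolean supportsBatchSnapshot(String jobId) {
            // A real implementation could decide per job; this sketch always agrees.
            return true;
        }

        @Override
        public String snapshotState(String jobId) {
            // Only the state of the requested job is serialized.
            return String.valueOf(shuffleCount(jobId));
        }

        @Override
        public void restoreState(String jobId, List<String> snapshots) {
            // Restore from the most recent snapshot, if any, for this job only.
            if (!snapshots.isEmpty()) {
                String latest = snapshots.get(snapshots.size() - 1);
                shuffleCountByJob.put(jobId, Integer.parseInt(latest));
            }
        }
    }

    public static void main(String[] args) {
        PerJobState before = new PerJobState();
        before.recordShuffle("job-a");
        before.recordShuffle("job-a");
        before.recordShuffle("job-b");
        String snapshotA = before.snapshotState("job-a");

        PerJobState after = new PerJobState();
        after.restoreState("job-a", List.of(snapshotA));
        System.out.println("job-a shuffles after restore: " + after.shuffleCount("job-a"));
        System.out.println("job-b shuffles after restore: " + after.shuffleCount("job-b"));
    }
}
```

Without the job identifier, {{snapshotState}} and {{restoreState}} in this sketch would have no way to select the right map entry, which is the ambiguity described above.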
*3. Hybrid Shuffle Support for Batch Job Recovery*
* Hybrid shuffle should support batch job recovery; however, as it stands,
Flink's hybrid shuffle does not.
* The Celeborn integration with Flink's hybrid shuffle relies on Flink's
{{NettyShuffleService}}. Because that service does not support batch job
recovery for hybrid shuffle, the integration cannot support it either.
*4. Cluster-Granularity Store/Restore State*
* Presently, {{ShuffleMaster}} only offers job-granularity interfaces for
storing and restoring state, as the {{NettyShuffleService}} is stateless at
cluster granularity.
* However, Celeborn's {{ShuffleMaster}} implementation needs to communicate
with the Celeborn Master, which requires storing certain cluster-level state,
such as {{CelebornAppId}}.
* In my opinion, the cluster-granularity state store interface could be
executed after {{ShuffleMaster#start}}, and {{ShuffleMaster#start}} could take
a snapshot parameter to restore the cluster state.
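One possible shape for the suggestion in item 4 is sketched below: {{start}} accepts an optional cluster-level snapshot, and a separate method produces that snapshot once the master is running. The names {{ClusterAwareMaster}}, {{AppIdHolder}}, and {{snapshotClusterState}}, as well as the string-encoded application ID, are hypothetical and only illustrate the idea.

```java
import java.util.Optional;
import java.util.UUID;

/** Sketch only: hypothetical cluster-level snapshot hooks, not Flink's API. */
final class ClusterSnapshotSketch {

    /** Hypothetical lifecycle: start may receive previously stored cluster state. */
    interface ClusterAwareMaster {
        void start(Optional<String> clusterSnapshot);

        String snapshotClusterState();
    }

    /** Holds an application ID that must survive a restart of the master. */
    static final class AppIdHolder implements ClusterAwareMaster {
        private String appId;

        @Override
        public void start(Optional<String> clusterSnapshot) {
            // Reuse the stored ID when present; otherwise create a fresh one.
            appId = clusterSnapshot.orElseGet(() -> "app-" + UUID.randomUUID());
        }

        @Override
        public String snapshotClusterState() {
            if (appId == null) {
                throw new IllegalStateException("start() has not been called");
            }
            return appId;
        }
    }

    public static void main(String[] args) {
        AppIdHolder first = new AppIdHolder();
        first.start(Optional.empty());
        String stored = first.snapshotClusterState();

        // Simulate a restart: a new instance is started with the stored state.
        AppIdHolder second = new AppIdHolder();
        second.start(Optional.of(stored));
        System.out.println("same app id after restart: "
                + stored.equals(second.snapshotClusterState()));
    }
}
```

Taking the snapshot only after {{start}} has completed matches the ordering proposed above, since the application ID does not exist before that point.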
was:
I've identified several issues while attempting to enable Apache Celeborn to
support Flink job recovery.
# *Restore State Invocation*
** The method _*{{ShuffleMaster#restoreState}}*_ should be triggered
regardless of whether the Flink job requires recovery.
** This method signifies that a Flink job needs to restore its state, but it
is currently called only after {_}*{{ShuffleMaster#registerJob}}*{_}.
** Consequently, it might not be invoked if the Flink job does not require
recovery.
** For Celeborn, this creates uncertainty regarding when to initialize certain
components; if the initialization occurs during {*}_{{registerJob}}_{*}, it may
lack essential information from the stored snapshot, whereas if it takes place
during {*}_{{restoreState}}_{*}, there is a risk that it may not be invoked at
all.
# *JobID Information Requirement*
** Several methods in _*{{ShuffleMaster}}*_ should include _*JobID*_
information: {*}_{{ShuffleMaster#supportsBatchSnapshot}}_{*},
{_}*{{ShuffleMaster#snapshotState}}*{_}, and
{_}*{{ShuffleMaster#restoreState}}*{_}.
** These methods are intended for job-granularity state storage and
restoration, but they currently do not incorporate JobID.
** Consequently, Celeborn is unable to determine which job triggered these
calls.
# *Hybrid Shuffle support batch job recovery*
** The hybrid shuffle implementation supports batch job recovery; however, as
it stands, Flink's hybrid shuffle does not facilitate this.
** Because the Celeborn integration with Flink's hybrid shuffle utilizes
{{{}*_Flink#NettyShuffleService_*{}}}, and since that service does not support
batch job recovery for hybrid shuffle, the integration faces limitations.
# {*}Cluster Granularity Store/Restore State{*}:
** Presently, _*{{ShuffleMaster}}*_ only offers job-granularity interfaces for
storing and restoring state, as the _*{{NettyShuffleService}}*_ is stateless in
terms of cluster granularity.
** However, _*{{Celeborn#ShuffleMaster}}*_ needs to communicate with the
Celeborn Master, necessitating the storage of certain cluster-level states,
such as {_}*{{CelebornAppId}}*{_}.
** In my opinion, the cluster-granularity store state interface can be execute
after {_}*{{ShuffleMaster#start}}*{_}, and _*{{ShuffleMaster#start}}*_ adding a
snapshot parameter to restore the cluster state.
> Adapting batch job progress recovery to Apache Celeborn
> -------------------------------------------------------
>
> Key: FLINK-36873
> URL: https://issues.apache.org/jira/browse/FLINK-36873
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Reporter: xuhuang
> Priority: Major