Hi, Matthias.
Thank you very much for the comments.

> - Why do we mark the rescale operation as IGNORED if a (repeatable) 
> exception causes a restart?

Here we mainly discuss two scenarios:
- If a recoverable exception occurs while there is an ongoing rescale, the 
current rescale will be forcibly terminated and saved. Then, a new rescale will 
be created to record the subsequent rescale information.
- If there is no ongoing rescale, a new rescale will be created directly to 
record the information.
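
As a rough illustration of the two scenarios above, here is a minimal Java sketch. All names in it (RescaleTimelineSketch, RescaleRecord, TerminalState, startNewRescale, the trigger-cause strings) are hypothetical and chosen only for this example; they are not the FLIP's actual interfaces. The terminal state given to the forcibly terminated rescale is left to the caller on purpose.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical terminal states; the names follow the ones mentioned in this thread. */
enum TerminalState { COMPLETED, FAILED, IGNORED }

/** Hypothetical record of one rescale; not the FLIP's actual class. */
final class RescaleRecord {
    final long id;
    final String triggerCause;
    TerminalState terminalState; // null while the rescale is still ongoing

    RescaleRecord(long id, String triggerCause) {
        this.id = id;
        this.triggerCause = triggerCause;
    }
}

/** Hypothetical timeline: at most one ongoing rescale plus the saved history. */
final class RescaleTimelineSketch {
    private final List<RescaleRecord> history = new ArrayList<>();
    private RescaleRecord ongoing;
    private long nextId = 1;

    /**
     * Scenario 1: an ongoing rescale is forcibly terminated and saved first.
     * Scenario 2: without an ongoing rescale, the new one is created directly.
     */
    RescaleRecord startNewRescale(String triggerCause, TerminalState stateForOngoing) {
        if (ongoing != null) {
            ongoing.terminalState = stateForOngoing;
            history.add(ongoing);
        }
        ongoing = new RescaleRecord(nextId++, triggerCause);
        return ongoing;
    }

    List<RescaleRecord> getHistory() {
        return Collections.unmodifiableList(history);
    }

    RescaleRecord getOngoing() {
        return ongoing;
    }
}
```

Calling startNewRescale twice in a row leaves exactly one saved record in the history and one ongoing record, which is the behaviour described for a recoverable exception that arrives during an ongoing rescale.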

> That's because the failure will create a new rescale instance that will be 
> saved if the resources changed as part of the failure handling/job restart 
> process?

If a new resource-requirement change or a change in available resources occurs 
during failure handling and job recovery (i.e., while a rescale is ongoing), 
and that change can trigger a new rescale, the ongoing rescale will be forcibly 
terminated and saved.
The system will then create a new rescale to record the subsequent process.

> - Thinking about the GlobalRescaleId: Should we still keep it considering 
> that we're not preserving the value on JM failovers (because of the missing 
> HA functionality) in this FLIP? We could remove it and add it as part of a 
> HA-support follow-up. Providing it despite the missing HA support might be 
> misleading in case a JM failover actually happens. WDYT?

Nice proposal! That makes sense to me~

> - For the job vertices, you introduced a 5th resource type (previous, 
> acquired, required, min bound, max bound). What's the difference? I guess, 
> min bound corresponds to "sufficient resources" and max bound corresponds to 
> "desired resources". What are required resources here in that case? It might 
> be good to stick to the naming that was introduced in FLIP-472 [1].

I’ve already made the corresponding changes.
BTW, the original intention here was to record information that mirrors the 
#updateJobResourcesRequirements requests as closely as possible. I now 
believe it is better to drop the previous design and use a unified 
description instead, since this makes things more consistent and easier to 
understand. Many thanks for your suggestions.

> - "Job Vertices Rescaled Information" and "Slots / Resources Rescaled 
> Information" seem to have redundant information now: "parallelism of job 
> vertex" and "slot number of the slot sharing group" are more or less the 
> same. The parallelism is defined per job vertex (along the 
> SlotSharingGroupId). The SlotSharingGroup holds the ResourceProfile (but no 
> parallelism).

Please let me explain the reasoning:
- The parallelism of job vertices is kept so that users can clearly see the 
parallelism of each job vertex.
- Adding information about the number of slots in a SlotSharingGroup is 
intended to give users a more intuitive understanding of how many resources 
this type of group consumes. Of course, even if we remove the slot number in 
the SlotSharingGroup, users could still calculate it by themselves based on the 
job vertices' parallelisms, but that would require them to be familiar with the 
AdaptiveScheduler’s logic for computing slot numbers.
- In addition, if we only keep the information about the number of slots in the 
SlotSharingGroup and drop the parallelism information at the job-vertex level, 
then users would lose the fine-grained parallelism details of job vertices 
during a rescale. They would only be able to see SlotSharingGroup-level 
information, which is somewhat like parallelism but less precise.
- If necessary, I’m willing to remove the slot-number-related field from the 
SlotSharingGroup description.
Please let me know your thoughts.
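
To show what "calculating it by themselves" would involve, here is a small Java sketch. It assumes the usual slot-sharing rule that a slot sharing group needs as many slots as the highest parallelism among its job vertices; the class and method names (VertexParallelism, SlotCountSketch, slotsPerGroup) are hypothetical and not part of the FLIP.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-vertex view: vertex name, slot sharing group and parallelism. */
final class VertexParallelism {
    final String vertexName;
    final String slotSharingGroup;
    final int parallelism;

    VertexParallelism(String vertexName, String slotSharingGroup, int parallelism) {
        this.vertexName = vertexName;
        this.slotSharingGroup = slotSharingGroup;
        this.parallelism = parallelism;
    }
}

final class SlotCountSketch {
    /**
     * Assumption: each group needs as many slots as the maximum parallelism
     * of the vertices assigned to it.
     */
    static Map<String, Integer> slotsPerGroup(List<VertexParallelism> vertices) {
        Map<String, Integer> slots = new LinkedHashMap<>();
        for (VertexParallelism v : vertices) {
            // Keep the larger of the stored value and this vertex's parallelism.
            slots.merge(v.slotSharingGroup, v.parallelism, Integer::max);
        }
        return slots;
    }
}
```

The reverse direction is not possible: from a slot count of 4 for a group, one cannot tell which vertex runs with parallelism 4 and which with 2, which is the loss of detail mentioned in the third point.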

> - "Other information" section contains a comment message for the reason why a 
> rescale event terminates. Reason might be a better term than comment 
> (especially because we're relying on an enum here rather than plain text).

Good idea — I’ve already made the corresponding changes.

> - Based on the current proposal of the FLIP, the intermediate states of the 
> rescale event actually match the AdaptiveScheduler's states. Only the final 
> states are exclusive to the scale event?  Would it make sense to rely on the 
> AdaptiveScheduler state entirely for the intermediate state and a dedicated 
> field "terminalState"?

Yes, your understanding is correct. This approach works well for me, and I’ve 
already made the corresponding changes in the wiki document.
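
A minimal Java sketch of this split, under the assumption that the intermediate state is just the name of the current AdaptiveScheduler state and the terminal state is a separate enum field. RescaleStatusSketch, RescaleTerminalState and the method names are hypothetical; only the three terminal state names come from this thread.

```java
import java.util.Optional;

/** Terminal states as named in this thread. */
enum RescaleTerminalState { COMPLETED, FAILED, IGNORED }

/** Hypothetical status holder; class and method names are illustrative only. */
final class RescaleStatusSketch {
    // Intermediate state: simply the name of the current AdaptiveScheduler state.
    private String schedulerState;
    // Dedicated terminal state: stays null until the rescale ends.
    private RescaleTerminalState terminalState;

    RescaleStatusSketch(String initialSchedulerState) {
        this.schedulerState = initialSchedulerState;
    }

    void onSchedulerStateChange(String newSchedulerState) {
        if (terminalState != null) {
            throw new IllegalStateException("Rescale already terminated as " + terminalState);
        }
        this.schedulerState = newSchedulerState;
    }

    void terminate(RescaleTerminalState state) {
        if (terminalState != null) {
            throw new IllegalStateException("Rescale already terminated as " + terminalState);
        }
        this.terminalState = state;
    }

    String getSchedulerState() {
        return schedulerState;
    }

    Optional<RescaleTerminalState> getTerminalState() {
        return Optional.ofNullable(terminalState);
    }
}
```

With this shape no separate enum of intermediate rescale states is needed; a rescale is "in progress" exactly while getTerminalState() is empty.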

> - Why does StateTransitions need to be extended with getRescaleTimeline()?

- This is because we want to quickly obtain a reference to the RescaleTimeline, 
and maintain rescale information through it.
- It was introduced into the StateTransitions class so that all states can call 
#getRescaleTimeline via this interface.
- After re-examining the scope affected by rescale maintenance operations, it 
might be more precise to introduce this method into 
StateWithExecutionGraph.Context / StateWithoutExecutionGraph.Context [1] 
instead.

> Shouldn't the RescaleTimeline be located in the AdaptiveScheduler?

Yes. Whether #getRescaleTimeline is introduced in StateTransitions or in 
StateWithExecutionGraph.Context / StateWithoutExecutionGraph.Context, the 
RescaleTimeline should still be part of the AdaptiveScheduler.

> Instead, the State.Context (essentially, the AdaptiveScheduler) should 
> provide methods for updating the current rescale process object that lives in 
> the AdaptiveScheduler. WDYT?

Sounds good to me. Once #getRescaleTimeline is introduced in State.Context, each 
state can directly obtain a reference to the RescaleTimeline through its 
context, allowing quick operations on the current rescale.
What do you think?
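
A rough Java sketch of the ownership discussed here: the timeline lives in the scheduler, and states reach it only through their context. Every type below (TimelineSketch, ContextSketch, SchedulerSketch, ExecutingStateSketch) is a hypothetical stand-in, not the real AdaptiveScheduler or State.Context code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical minimal timeline that only collects event descriptions. */
final class TimelineSketch {
    private final List<String> events = new ArrayList<>();

    void record(String event) {
        events.add(event);
    }

    List<String> getEvents() {
        return Collections.unmodifiableList(events);
    }
}

/** Hypothetical stand-in for State.Context. */
interface ContextSketch {
    TimelineSketch getRescaleTimeline();
}

/** Hypothetical stand-in for the AdaptiveScheduler: owns the timeline and acts as the context. */
final class SchedulerSketch implements ContextSketch {
    private final TimelineSketch rescaleTimeline = new TimelineSketch();

    @Override
    public TimelineSketch getRescaleTimeline() {
        return rescaleTimeline;
    }
}

/** Hypothetical state that sees the scheduler only through its context. */
final class ExecutingStateSketch {
    private final ContextSketch context;

    ExecutingStateSketch(ContextSketch context) {
        this.context = context;
    }

    void onResourcesChanged() {
        context.getRescaleTimeline().record("resources changed while Executing");
    }
}
```

The alternative raised in the quoted comment would replace getRescaleTimeline() with dedicated update methods on the context, so that states never hold the timeline reference at all.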


> - Section "How to process rescales storage ?": ExecutionGraphInfo could be 
> modified to include the Rescale history (along the exception history). The 
> data can be then saved in the ExecutionGraphInfoStore. All you have to do is 
> to store the rescale history in AdaptiveScheduler and an object that holds 
> the infos of the current rescale process. If this current rescale process 
> terminates, an immutable rescale snapshot of this event is created that is 
> saved in the rescale history.

Sorry, my previous wording was ambiguous and inconsistent with the 
documentation. Thanks a lot for pointing it out. This is actually what I 
intended to do, and I’ve already corrected the corresponding description in the 
document.
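
The flow described in the quoted comment can be sketched in Java as follows. This is only an illustration under assumptions: CurrentRescale, RescaleSnapshot and GraphInfoSketch are hypothetical names, and GraphInfoSketch merely stands in for an ExecutionGraphInfo that carries a rescale history next to the exception history.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical immutable snapshot of a terminated rescale. */
final class RescaleSnapshot {
    final long id;
    final String terminalState;
    final List<String> stateTransitions;

    RescaleSnapshot(long id, String terminalState, List<String> stateTransitions) {
        this.id = id;
        this.terminalState = terminalState;
        // Defensive copy so later changes to the source list do not leak in.
        this.stateTransitions = Collections.unmodifiableList(new ArrayList<>(stateTransitions));
    }
}

/** Hypothetical mutable object describing the current rescale process. */
final class CurrentRescale {
    final long id;
    final List<String> stateTransitions = new ArrayList<>();

    CurrentRescale(long id) {
        this.id = id;
    }

    RescaleSnapshot toSnapshot(String terminalState) {
        return new RescaleSnapshot(id, terminalState, stateTransitions);
    }
}

/** Hypothetical stand-in for an ExecutionGraphInfo extended with a rescale history. */
final class GraphInfoSketch {
    final List<String> exceptionHistory;
    final List<RescaleSnapshot> rescaleHistory;

    GraphInfoSketch(List<String> exceptionHistory, List<RescaleSnapshot> rescaleHistory) {
        this.exceptionHistory = Collections.unmodifiableList(new ArrayList<>(exceptionHistory));
        this.rescaleHistory = Collections.unmodifiableList(new ArrayList<>(rescaleHistory));
    }
}
```

Because the snapshot copies its data on creation, the scheduler can keep mutating or discard the CurrentRescale object without affecting what was already saved in the history.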

[1] 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=334760525#FLIP495:SupportAdaptiveSchedulerrecordandquerytherescalehistory-Howtohold/maintaintherescaleshistory?

Best,
Yuepeng Pan

---- Replied Message ----
From: Matthias Pohl <map...@apache.org>
Date: 08/21/2025 18:54
To: dev@flink.apache.org
Cc: Yuepeng Pan <panyuep...@apache.org>
Subject: Re: [DISCUSS] FLIP-495: Support AdaptiveScheduler record and query the rescale history

Thanks for the update, Yuepeng. Here are my remarks:

- Why do we mark the rescale operation as IGNORED if a (repeatable)
exception causes a restart? That's because the failure will create a new
rescale instance that will be saved if the resources changed as part of the
failure handling/job restart process?

- Thinking about the GlobalRescaleId: Should we still keep it considering
that we're not preserving the value on JM failovers (because of the missing
HA functionality) in this FLIP? We could remove it and add it as part of a
HA-support follow-up. Providing it despite the missing HA support might be
misleading in case a JM failover actually happens. WDYT?

- For the job vertices, you introduced a 5th resource type (previous,
acquired, required, min bound, max bound). What's the difference? I guess,
min bound corresponds to "sufficient resources" and max bound corresponds
to "desired resources". What are required resources here in that case? It
might be good to stick to the naming that was introduced in FLIP-472 [1].

- "Job Vertices Rescaled Information" and "Slots / Resources Rescaled
Information" seem to have redundant information now: "parallelism of job
vertex" and "slot number of the slot sharing group" are more or less the
same. The parallelism is defined per job vertex (along the
SlotSharingGroupId). The SlotSharingGroup holds the ResourceProfile (but no
parallelism).

- "Other information" section contains a comment message for the reason why
a rescale event terminates. Reason might be a better term than comment
(especially because we're relying on an enum here rather than plain text).

- Based on the current proposal of the FLIP, the intermediate states of the
rescale event actually match the AdaptiveScheduler's states. Only the final
states are exclusive to the scale event? Would it make sense to rely on the
AdaptiveScheduler state entirely for the intermediate state and a dedicated
field "terminalState"?

- Why does StateTransitions need to be extended with getRescaleTimeline()?
Shouldn't the RescaleTimeline be located in the AdaptiveScheduler? Instead,
the State.Context (essentially, the AdaptiveScheduler) should provide
methods for updating the current rescale process object that lives in the
AdaptiveScheduler. WDYT?

- Section "How to process rescales storage ?": ExecutionGraphInfo could be
modified to include the Rescale history (along the exception history). The
data can be then saved in the ExecutionGraphInfoStore. All you have to do
is to store the rescale history in AdaptiveScheduler and an object that
holds the infos of the current rescale process. If this current rescale
process terminates, an immutable rescale snapshot of this event is created
that is saved in the rescale history.

Matthias

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states

On Tue, Aug 19, 2025 at 11:14 AM Yuepeng Pan <panyuep...@apache.org> wrote:

Bumping this thread kindly. Thanks!

Best,
Yuepeng Pan


At 2025-08-13 14:52:26, "Yuepeng Pan" <panyuep...@apache.org> wrote:

Hi, Matthias,
Thank you very much for your comments!
I have carefully read your reply and made some changes to address it.
Please take a look.

For your comments:

1. You mention a few options for when it comes to storing the data which is
good. The FLIP doesn't point out, though, what option you're going to go
for as part of this FLIP (as far as I can see). It would be good to only
outline the option to go for in the FLIP and list the other options as
rejected alternatives (with the pros and cons). I think it makes sense to
go for option 3 (i.e. following what's done for the ExecutionGraphInfoStore
for now). The other options can be considered as a follow-up.

That makes a lot of sense. Based on this comment, I have kept option 3 in its 
original place and moved the other candidate options to [1].

2. About the terminal states of a rescaling (i.e. IGNORED, FAILED,
COMPLETED): Can we clarify in the FLIP under what conditions the
rescaling transitions into each of the three terminal states?

Yes, this is a reasonable request; it helps explain the logic of transitions to 
the terminal states.
A new subsection [2] has been added to address this.

3. The section "The information to record in a rescale event" could be
restructured in four sections (to remove redundancy):
a) The IDs (Rescale
ID, resourceRequirementsEpochID, subRescaleIdOfResourceRequirementsEpochID):
What about making these names easier to read: GlobalRescaleID, RescaleUUID,
RescaleAttemptId)
b) Per-vertex data which includes: JobVertexID, JobVertexName,
SlotSharingGroupId, the different parallelisms (pre-rescale, sufficient,
desired, post-rescale)
c) The SlotSharingGroup information: SlotSharingGroupId, name,
ResourceProfile
d) Other information: Timestamps of state transitions, etc. as laid out in
the FLIP already

That makes sense to me. Please check [3] for the latest updates in this part.
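
For orientation, the four-part grouping from item 3 could look roughly like this in Java. This is a sketch only: the class names are hypothetical, the field names follow the names proposed in item 3 at this point in the thread, and the ResourceProfile is represented as plain text to keep the example self-contained.

```java
import java.util.List;
import java.util.Map;

/** a) The IDs, using the readable names proposed in item 3. */
final class RescaleIdsSketch {
    final long globalRescaleId;
    final String rescaleUuid;
    final int rescaleAttemptId;

    RescaleIdsSketch(long globalRescaleId, String rescaleUuid, int rescaleAttemptId) {
        this.globalRescaleId = globalRescaleId;
        this.rescaleUuid = rescaleUuid;
        this.rescaleAttemptId = rescaleAttemptId;
    }
}

/** b) Per-vertex data, including the four parallelisms. */
final class VertexRescaleSketch {
    final String jobVertexId, jobVertexName, slotSharingGroupId;
    final int preRescale, sufficient, desired, postRescale;

    VertexRescaleSketch(String jobVertexId, String jobVertexName, String slotSharingGroupId,
                        int preRescale, int sufficient, int desired, int postRescale) {
        this.jobVertexId = jobVertexId;
        this.jobVertexName = jobVertexName;
        this.slotSharingGroupId = slotSharingGroupId;
        this.preRescale = preRescale;
        this.sufficient = sufficient;
        this.desired = desired;
        this.postRescale = postRescale;
    }
}

/** c) Slot sharing group information. */
final class SlotSharingGroupSketch {
    final String id, name, resourceProfile;

    SlotSharingGroupSketch(String id, String name, String resourceProfile) {
        this.id = id;
        this.name = name;
        this.resourceProfile = resourceProfile;
    }
}

/** d) Other information (here: state transition timestamps) plus the three parts above. */
final class RescaleEventSketch {
    final RescaleIdsSketch ids;
    final List<VertexRescaleSketch> vertices;
    final List<SlotSharingGroupSketch> groups;
    final Map<String, Long> stateTransitionTimestamps;

    RescaleEventSketch(RescaleIdsSketch ids, List<VertexRescaleSketch> vertices,
                       List<SlotSharingGroupSketch> groups,
                       Map<String, Long> stateTransitionTimestamps) {
        this.ids = ids;
        this.vertices = vertices;
        this.groups = groups;
        this.stateTransitionTimestamps = stateTransitionTimestamps;
    }
}
```

Each vertex refers to its group only by SlotSharingGroupId, so the ResourceProfile is stored once per group rather than once per vertex, which is the redundancy the restructuring is meant to remove.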

4. The FLIP doesn't explain how the data is passed through the
AdaptiveScheduler states. We should be handling some kind of
RescaleSnapshot that is passed through the different states and updated and
its final state is stored somewhere within AdaptiveScheduler in the end, I
guess. Can we clarify that in the FLIP?

Indeed — this was missing in the original FLIP. To address this, I have added 
[4], which focuses on describing how a Rescale is represented,
and how we can quickly pass and maintain the Rescale history.

5. You mention the config parameters for the cache in the public interface
section. But there's no mentioning of any caching and how that is used
within the FLIP.

Sorry for the rough description in the previous version.
Since this part belongs to the REST API acceleration mechanism for rescaling, 
and Option 6 seems reasonable to me,
I plan to add it to FLIP-487 once the design of FLIP-495 has reached consensus.
Of course, if needed, I'd be happy to clarify the usage and purpose of this 
parameter in the current email thread.

6. The REST endpoint is probably better suited in FLIP-487. FLIP-495 should
be about the actual implementation details and how the data is stored
internally whereas FLIP-487 is about exposing the information to the
outside through the REST API and the Flink UI. That would be a way to
decrease the scope of FLIP-495. WDYT?

That sounds nice to me. Therefore, I have moved all REST API–related changes to 
FLIP-487.
BTW, to avoid repetitive changes in FLIP-487, I'll start organizing FLIP-487 
after FLIP-495 has been finalized.

Looking forward to your next review!

[1]https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=334760525#FLIP495:SupportAdaptiveSchedulerrecordandquerytherescalehistory-Aboutrescaleeventsstorage.1
[2]https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=334760525#FLIP495:SupportAdaptiveSchedulerrecordandquerytherescalehistory-ThemainscenarioswhereRescalestatusswitchestoterminated
[3]https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=334760525#FLIP495:SupportAdaptiveSchedulerrecordandquerytherescalehistory-Theinformationtorecordinarescaleevent
[4]https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=334760525#FLIP495:SupportAdaptiveSchedulerrecordandquerytherescalehistory-InternalInterfaces


Best regards,
Yuepeng Pan





At 2025-08-10 23:54:37, "Matthias Pohl" <map...@apache.org> wrote:
Hi Yuepeng,
thanks for reminding me of this FLIP. I went over it and have a few items
which we might need to address before we can actually finalize the vote:

1. You mention a few options for when it comes to storing the data which is
good. The FLIP doesn't point out, though, what option you're going to go
for as part of this FLIP (as far as I can see). It would be good to only
outline the option to go for in the FLIP and list the other options as
rejected alternatives (with the pros and cons). I think it makes sense to
go for option 3 (i.e. following what's done for the ExecutionGraphInfoStore
for now). The other options can be considered as a follow-up.
2. About the terminal states of a rescaling (i.e. IGNORED, FAILED,
COMPLETED): Can we clarify in the FLIP under what conditions the
rescaling transitions into each of the three terminal states?
3. The section "The information to record in a rescale event" could be
restructured in four sections (to remove redundancy):
a) The IDs (Rescale
ID, resourceRequirementsEpochID, subRescaleIdOfResourceRequirementsEpochID):
What about making these names easier to read: GlobalRescaleID, RescaleUUID,
RescaleAttemptId)
b) Per-vertex data which includes: JobVertexID, JobVertexName,
SlotSharingGroupId, the different parallelisms (pre-rescale, sufficient,
desired, post-rescale)
c) The SlotSharingGroup information: SlotSharingGroupId, name,
ResourceProfile
d) Other information: Timestamps of state transitions, etc. as laid out in
the FLIP already
4. The FLIP doesn't explain how the data is passed through the
AdaptiveScheduler states. We should be handling some kind of
RescaleSnapshot that is passed through the different states and updated and
its final state is stored somewhere within AdaptiveScheduler in the end, I
guess. Can we clarify that in the FLIP?
5. You mention the config parameters for the cache in the public interface
section. But there's no mentioning of any caching and how that is used
within the FLIP.
6. The REST endpoint is probably better suited in FLIP-487. FLIP-495 should
be about the actual implementation details and how the data is stored
internally whereas FLIP-487 is about exposing the information to the
outside through the REST API and the Flink UI. That would be a way to
decrease the scope of FLIP-495. WDYT?

Best,
Matthias


On Mon, Mar 24, 2025 at 11:37 AM Yuepeng Pan <panyuep...@apache.org> wrote:

Hi, Community,

There haven’t been any further responses to this email over the past few
days.
I'd like to initiate a vote on the current proposal[1] in the next few
days.
Please rest assured that I’m proceeding cautiously and not rushing the
process.
If there are any concerns about this FLIP-495[1],
I will gladly pause and make the adjustments.

Best regards,
Yuepeng Pan

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-495%3A+Support+AdaptiveScheduler+record+and+query+the+rescale+history


On 2024/12/17 15:18:45 Yuepeng Pan wrote:
Hi community,

We discussed several aspects of FLIP-487[1] 'Show history of rescales in
Web UI for AdaptiveScheduler' and received a lot of valuable feedback.
Based on the suggestions from the email thread[2], we plan to split the
original proposal for FLIP-487[1].

The current email thread and the FLIP-495[3] wiki will be used to
discuss 'Support AdaptiveScheduler in recording and querying the rescale
history', while FLIP-487[1] will primarily focus on display-related design
content.

Looking forward to any feedback and opinions on FLIP-495[3].

[1]
https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-487%3A+Show+history+of+rescales+in+Web+UI+for+AdaptiveScheduler

[2] https://lists.apache.org/thread/f4md4btkf006mxcxf66bng1kfz0rsn8c

[3]
https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-495%3A+Support+AdaptiveScheduler+record+and+query+the+rescale+history

Thank you very much.

Best regards,
Yuepeng Pan


