zentol commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1326042689
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
@Override
public void onNewResourcesAvailable() {
- maybeRescale();
+ rescaleWhenCooldownPeriodIsOver();
}
@Override
public void onNewResourceRequirements() {
- maybeRescale();
+ rescaleWhenCooldownPeriodIsOver();
}
private void maybeRescale() {
- if (context.shouldRescale(getExecutionGraph())) {
- getLogger().info("Can change the parallelism of job. Restarting job.");
+ final Duration timeSinceLastRescale = timeSinceLastRescale();
+ rescaleScheduled = false;
+ final boolean shouldForceRescale =
+ (scalingIntervalMax != null)
+ && (timeSinceLastRescale.compareTo(scalingIntervalMax) > 0)
+ && (lastRescale != Instant.EPOCH); // initial rescale is not forced
+ if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+ if (shouldForceRescale) {
+ getLogger()
+ .info(
+ "Time since last rescale ({}) > {} ({}). Force-changing the parallelism of the job. Restarting the job.",
+ timeSinceLastRescale,
+ JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+ scalingIntervalMax);
+ } else {
+ getLogger().info("Can change the parallelism of the job. Restarting the job.");
+ }
+ lastRescale = Instant.now();
context.goToRestarting(
getExecutionGraph(),
Review Comment:
> with this scenario we will restart every 5 min (as the timeout is
> exceeded min-increase check is overridden)
This depends on what your starting point for the timeout is. It shouldn't be
fixed to the start of the job, but be reset after a job was rescaled.
So, a slot arrives at the 24h mark, the job is rescaled, and the timeout is
reset; for the next hour we only scale up if min-parallel-increase is
satisfied. If a slot arrives after that, we'd immediately rescale.
I still think this'd be problematic though as described above.
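To make that timeline concrete, here is a minimal sketch of a timeout that is measured from the last rescale rather than from job start. It is not the code in this PR: the class `RescaleTimeoutSketch`, the method `onNewResources` and the `parallelismGain` parameter are hypothetical names, and the 1h timeout and min increase of 2 are assumed values taken from the example above.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of the "reset after rescale" timeout; not Flink's Executing state. */
final class RescaleTimeoutSketch {
    private final Duration scalingIntervalMax;  // assumed timeout, e.g. 1h
    private final int minParallelismIncrease;   // assumed min-parallel-increase threshold
    private Instant lastRescale;                // starting point of the timeout

    RescaleTimeoutSketch(Duration scalingIntervalMax, int minParallelismIncrease, Instant start) {
        this.scalingIntervalMax = scalingIntervalMax;
        this.minParallelismIncrease = minParallelismIncrease;
        this.lastRescale = start;
    }

    /** Returns true if a rescale is triggered at {@code now} for the given parallelism gain. */
    boolean onNewResources(Instant now, int parallelismGain) {
        boolean timeoutExceeded =
                Duration.between(lastRescale, now).compareTo(scalingIntervalMax) > 0;
        boolean enoughGain = parallelismGain >= minParallelismIncrease;
        if (timeoutExceeded || enoughGain) {
            // The timeout restarts here, so the next forced rescale is at least
            // one full interval away instead of happening on every new slot.
            lastRescale = now;
            return true;
        }
        return false;
    }
}
```

With a 1h timeout and a min increase of 2, a single slot at 24h triggers a rescale, a single slot at 24h30m does not, and a single slot at 25h30m triggers one again.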
I think a good way to think about it is like this: We probably all had some
situation in our lives where we thought "oh, if only I had done that <some time>
earlier." Can be anything, really, let's say it's washing the dishes. You can
still do that _now_, but had you done it earlier you may _now_ have been able
to use your time better.
That's ultimately what we're doing here.
We look back, see that we had this slot the whole time, and go "ah, if only
I had rescaled sooner, but better late than never".
But on the flip-side, if you lost that slot in the meantime and look back
you'd think "good job me, that was a good call to ignore that slot!".
This is all about stability; "I could have rescaled and it wouldn't have
been a problem". But you can only conclude that after having had that slot for
some period of time; otherwise you're just guessing.
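One way to read the stability argument is as a gate that only allows a rescale once an extra slot has been held continuously for some time. The sketch below is an illustration under that assumption, not something proposed in the PR: `SlotStabilityGate`, `requiredHoldTime` and the callback names are all hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch: rescale only after an extra slot has proven stable. */
final class SlotStabilityGate {
    private final Duration requiredHoldTime; // assumed stabilization period
    private Instant slotAvailableSince;      // null while no extra slot is held

    SlotStabilityGate(Duration requiredHoldTime) {
        this.requiredHoldTime = requiredHoldTime;
    }

    /** Remember when the extra slot first showed up. */
    void onSlotAvailable(Instant now) {
        if (slotAvailableSince == null) {
            slotAvailableSince = now;
        }
    }

    /** Losing the slot discards the history; ignoring it was the right call. */
    void onSlotLost() {
        slotAvailableSince = null;
    }

    /** True only if the slot has been held for the whole required period. */
    boolean shouldRescale(Instant now) {
        return slotAvailableSince != null
                && Duration.between(slotAvailableSince, now).compareTo(requiredHoldTime) >= 0;
    }
}
```

The point of the design is that the decision is made by looking back at how long the slot was actually held, so a slot that disappears early never triggers a restart.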