1996fanrui commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1356383787
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -124,23 +157,74 @@ private void handleDeploymentFailure(ExecutionVertex
executionVertex, JobExcepti
@Override
public void onNewResourcesAvailable() {
- maybeRescale();
+ rescaleWhenCooldownPeriodIsOver();
}
@Override
public void onNewResourceRequirements() {
- maybeRescale();
+ rescaleWhenCooldownPeriodIsOver();
+ }
+
+ /** Force rescaling as long as the target parallelism is different from
the current one. */
+ private void forceRescale() {
+ if (context.shouldRescale(getExecutionGraph(), true)) {
+ getLogger()
+ .info(
+ "Added resources are still there after {}
time({}), force a rescale.",
+
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+ scalingIntervalMax);
+ context.goToRestarting(
+ getExecutionGraph(),
+ getExecutionGraphHandler(),
+ getOperatorCoordinatorHandler(),
+ Duration.ofMillis(0L),
+ getFailures());
+ }
}
+ /**
+ * Rescale the job if added resource meets {@link
JobManagerOptions#MIN_PARALLELISM_INCREASE}.
+ * Otherwise, force a rescale after {@link
JobManagerOptions#SCHEDULER_SCALING_INTERVAL_MAX} if
+ * the resource is still there.
+ */
private void maybeRescale() {
- if (context.shouldRescale(getExecutionGraph())) {
- getLogger().info("Can change the parallelism of job. Restarting
job.");
+ rescaleScheduled = false;
+ if (context.shouldRescale(
+ getExecutionGraph(), false)) { //
JobManagerOptions#MIN_PARALLELISM_INCREASE met
+ getLogger().info("Can change the parallelism of the job.
Restarting the job.");
context.goToRestarting(
getExecutionGraph(),
getExecutionGraphHandler(),
getOperatorCoordinatorHandler(),
Duration.ofMillis(0L),
getFailures());
+ } else if (scalingIntervalMax
+ != null) { // JobManagerOptions#MIN_PARALLELISM_INCREASE not
met
Review Comment:
In the Flink repo, comments are not placed on the same line as the code. Please
move these comments to the line before the code, and reword them in terms of
`forceRescale`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -124,23 +157,74 @@ private void handleDeploymentFailure(ExecutionVertex
executionVertex, JobExcepti
@Override
public void onNewResourcesAvailable() {
- maybeRescale();
+ rescaleWhenCooldownPeriodIsOver();
}
@Override
public void onNewResourceRequirements() {
- maybeRescale();
+ rescaleWhenCooldownPeriodIsOver();
+ }
+
+ /** Force rescaling as long as the target parallelism is different from
the current one. */
+ private void forceRescale() {
+ if (context.shouldRescale(getExecutionGraph(), true)) {
+ getLogger()
+ .info(
+ "Added resources are still there after {}
time({}), force a rescale.",
+
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+ scalingIntervalMax);
+ context.goToRestarting(
+ getExecutionGraph(),
+ getExecutionGraphHandler(),
+ getOperatorCoordinatorHandler(),
+ Duration.ofMillis(0L),
+ getFailures());
+ }
}
+ /**
+ * Rescale the job if added resource meets {@link
JobManagerOptions#MIN_PARALLELISM_INCREASE}.
+ * Otherwise, force a rescale after {@link
JobManagerOptions#SCHEDULER_SCALING_INTERVAL_MAX} if
+ * the resource is still there.
+ */
private void maybeRescale() {
- if (context.shouldRescale(getExecutionGraph())) {
- getLogger().info("Can change the parallelism of job. Restarting
job.");
+ rescaleScheduled = false;
+ if (context.shouldRescale(
+ getExecutionGraph(), false)) { //
JobManagerOptions#MIN_PARALLELISM_INCREASE met
Review Comment:
`JobManagerOptions#MIN_PARALLELISM_INCREASE` is an implementation detail inside
`context.shouldRescale`, and we should be interface-oriented rather than
implementation-oriented.
So the comment here shouldn't mention `MIN_PARALLELISM_INCREASE`; we only care
whether `forceRescale` is true or false here. Does that make sense?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -67,13 +77,36 @@ class Executing extends StateWithExecutionGraph implements
ResourceListener {
this.context = context;
Preconditions.checkState(
executionGraph.getState() == JobStatus.RUNNING, "Assuming
running execution graph");
+ this.scalingIntervalMin = scalingIntervalMin;
+ this.scalingIntervalMax = scalingIntervalMax;
+ this.lastRescale =
+ Instant.now(); // Executing is recreated with each restart
(when we rescale)
+ // we consider the first execution of the pipeline as a rescale event
+ Preconditions.checkState(
+ !scalingIntervalMin.isNegative(),
+ "%s must be positive integer or 0",
+ JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key());
+ if (scalingIntervalMax != null) {
+ Preconditions.checkState(
+ scalingIntervalMax.compareTo(scalingIntervalMin) > 0,
+ "%s(%d) must be greater than %s(%d)",
+ JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+ scalingIntervalMax,
+ JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(),
+ scalingIntervalMin);
+ }
deploy();
// check if new resources have come available in the meantime
context.runIfState(this, this::maybeRescale, Duration.ZERO);
}
+ @VisibleForTesting
+ void setLastRescale(Instant lastRescale) {
Review Comment:
Can `lastRescale` be passed in through the constructor?
If so, `lastRescale` can be final, which is safer. From what I can see, that
works; could you please check? Thanks~
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]