1996fanrui commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1299169121
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -144,6 +188,17 @@ private void maybeRescale() {
}
}
+ private Duration timeSinceLastRescale() {
+ return Duration.between(lastRescale, Instant.now());
+ }
+
+ private void rescaleWhenCooldownPeriodIsOver() {
+ context.runIfConditionOrSchedule(
+ timeSinceLastRescale().compareTo(scalingIntervalMin) > 0,
+ this::maybeRescale,
+ scalingIntervalMin); // reset cooldown period
Review Comment:
Schedule the `maybeRescale` after `scalingIntervalMin` might not make sense.
For example, `scalingIntervalMin` is 30 seconds, `lastRescale` is
`08:01:05`, and now is `08:01:10`.
From your code, the `maybeRescale` is called at `08:01:40`, however it
should be called at `08:01:35`, right?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -144,6 +188,17 @@ private void maybeRescale() {
}
}
+ private Duration timeSinceLastRescale() {
+ return Duration.between(lastRescale, Instant.now());
+ }
+
+ private void rescaleWhenCooldownPeriodIsOver() {
Review Comment:
`rescaleWhenCooldownPeriodIsOver` should consider one case that it's called
in multiple times.
For example, scalingIntervalMin is 30 seconds, `lastRescale` is 08:01:05.
- The `rescaleWhenCooldownPeriodIsOver` is called at 08:01:10, it will
schedule `maybeRescale` at `08:01:35`.
- The `rescaleWhenCooldownPeriodIsOver` maybe called at 08:01:15 again, it
will schedule `maybeRescale` at `08:01:35` again.
The second or repeated schedule should be ignored.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -210,6 +265,15 @@ interface Context
* @return a ScheduledFuture representing pending completion of the
task
*/
ScheduledFuture<?> runIfState(State expectedState, Runnable action,
Duration delay);
+
+ /**
+ * Runs the given action immediately or after a delay depending on the
given condition.
+ *
+ * @param condition if met, the action is executed immediately or
scheduled otherwise
+ * @param action action to run
+ * @param delay delay after which to run the action if the condition
is not met
+ */
+ void runIfConditionOrSchedule(boolean condition, Runnable action,
Duration delay);
Review Comment:
This interface isn't necessary, if
`timeSinceLastRescale().compareTo(scalingIntervalMin)` is true, we can call
`maybeRescale` directly.
If it's false, it's better to call `runIfState` to schedule.
BTW, if the state isn't this(Executing), the `maybeRescale` shouldn't be
called. That's why the `runIfState` name includes `IfState`. The
`runIfConditionOrSchedule` doesn't consider the `IfState`, it might have bug.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]