1996fanrui commented on code in PR #922:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/922#discussion_r1881470489
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java:
##########
@@ -30,23 +30,78 @@
import java.time.Instant;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
+import static org.apache.flink.util.Preconditions.checkState;
+
/** All delayed scale down requests. */
public class DelayedScaleDown {
+ @Data
+ private static class RecommendedParallelism {
+ @Nonnull private final Instant triggerTime;
+ private final int parallelism;
+
+ @JsonCreator
+ public RecommendedParallelism(
+ @Nonnull @JsonProperty("triggerTime") Instant triggerTime,
+ @JsonProperty("parallelism") int parallelism) {
+ this.triggerTime = triggerTime;
+ this.parallelism = parallelism;
+ }
+ }
+
/** The delayed scale down info for vertex. */
@Data
public static class VertexDelayedScaleDownInfo {
private final Instant firstTriggerTime;
- private int maxRecommendedParallelism;
+ // TODO : add the comment to explain how to calculate the max
parallelism within the sliding
+ // window.
+ private final LinkedList<RecommendedParallelism>
recommendedParallelisms;
+
+ public VertexDelayedScaleDownInfo(Instant firstTriggerTime) {
+ this.firstTriggerTime = firstTriggerTime;
+ this.recommendedParallelisms = new LinkedList<>();
+ }
@JsonCreator
public VertexDelayedScaleDownInfo(
@JsonProperty("firstTriggerTime") Instant firstTriggerTime,
- @JsonProperty("maxRecommendedParallelism") int
maxRecommendedParallelism) {
+ @JsonProperty("recommendedParallelisms")
+ LinkedList<RecommendedParallelism>
recommendedParallelisms) {
this.firstTriggerTime = firstTriggerTime;
- this.maxRecommendedParallelism = maxRecommendedParallelism;
+ this.recommendedParallelisms = recommendedParallelisms;
+ }
+
+ /** Record current recommended parallelism. */
+ public void recordRecommendedParallelism(Instant triggerTime, int
parallelism) {
+
+ // Remove all recommended parallelisms that are lower than the
latest parallelism.
+ while (!recommendedParallelisms.isEmpty()
+ && recommendedParallelisms.peekLast().getParallelism() <=
parallelism) {
+ recommendedParallelisms.pollLast();
+ }
Review Comment:
Thanks @gyfora for the comment in advance, I'm still adding more tests and
comments for this PR.
I have some other works to do this week, so I expect this PR will be ready
next week.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]