mxm commented on code in PR #502:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/502#discussion_r1065794519
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -102,6 +103,7 @@ public CollectedMetrics updateMetrics(
var stableTime = currentJobUpdateTs.plus(stabilizationDuration);
if (now.isBefore(stableTime)) {
// As long as we are stabilizing, collect no metrics at all
+ LOG.info("Skipping metric collection during stabilization period
until {}", stableTime);
Review Comment:
+1
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the
scaling metrics. */
+public class JobVertexScaler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JobVertexScaler.class);
+
+ private Clock clock = Clock.system(ZoneId.systemDefault());
+
+ public int computeScaleTargetParallelism(
+ Configuration conf,
+ JobVertexID vertex,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history) {
+
+ var currentParallelism = (int)
evaluatedMetrics.get(PARALLELISM).getCurrent();
+ double averageTrueProcessingRate =
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+ if (Double.isNaN(averageTrueProcessingRate)) {
+ LOG.info(
+ "True processing rate is not available for {}, cannot
compute new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ double targetCapacity =
+ AutoScalerUtils.getTargetProcessingCapacity(
+ evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION),
true);
+ if (Double.isNaN(targetCapacity)) {
+ LOG.info(
+ "Target data rate is not available for {}, cannot compute
new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ LOG.info("Target processing capacity for {} is {}", vertex,
targetCapacity);
+ double scaleFactor = targetCapacity / averageTrueProcessingRate;
+ double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+ if (scaleFactor < minScaleFactor) {
+ LOG.info(
+ "Computed scale factor of {} for {} is capped by maximum
scale down factor to {}",
+ scaleFactor,
+ vertex,
+ minScaleFactor);
+ scaleFactor = minScaleFactor;
+ }
+
+ int newParallelism =
+ scale(
+ currentParallelism,
+ (int)
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+ scaleFactor,
+ conf.getInteger(VERTEX_MIN_PARALLELISM),
+ conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+ if (newParallelism == currentParallelism
+ || blockScalingBasedOnPastActions(
+ vertex,
+ conf,
+ evaluatedMetrics,
+ history,
+ currentParallelism,
+ newParallelism)) {
+ return currentParallelism;
+ }
+
+ // We record our expectations for this scaling operation
+ evaluatedMetrics.put(
+ ScalingMetric.EXPECTED_PROCESSING_RATE,
EvaluatedScalingMetric.of(targetCapacity));
+ return newParallelism;
+ }
+
+ private boolean blockScalingBasedOnPastActions(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ int currentParallelism,
+ int newParallelism) {
+
+ // If we don't have past scaling actions for this vertex, there is
nothing to do
+ if (history.isEmpty()) {
+ return false;
+ }
+
+ boolean scaleUp = currentParallelism < newParallelism;
+
+ return detectImmediateScaleDownAfterScaleUp(vertex, conf, history,
scaleUp)
+ || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics,
history, scaleUp);
+ }
+
+ private boolean detectImmediateScaleDownAfterScaleUp(
+ JobVertexID vertex,
+ Configuration conf,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
+
+ // Does not apply to scale up operations
+ if (scaleUp) {
+ return false;
+ }
+
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
+
+ boolean lastScaleUp = lastSummary.getNewParallelism() >
lastSummary.getCurrentParallelism();
+ if (!lastScaleUp) {
+ return false;
+ }
+
+ var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+ if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
+ LOG.info(
+ "Skipping immediate scale down after scale up within grace
period for {}",
+ vertex);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private boolean detectIneffectiveScaleup(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
+
+ // Does not apply to scale down operations
+ if (!scaleUp) {
+ return false;
+ }
+
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
Review Comment:
Same caveat here with regard to the history not yet including the vertex.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the
scaling metrics. */
+public class JobVertexScaler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JobVertexScaler.class);
+
+ private Clock clock = Clock.system(ZoneId.systemDefault());
+
+ public int computeScaleTargetParallelism(
+ Configuration conf,
+ JobVertexID vertex,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history) {
+
+ var currentParallelism = (int)
evaluatedMetrics.get(PARALLELISM).getCurrent();
+ double averageTrueProcessingRate =
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+ if (Double.isNaN(averageTrueProcessingRate)) {
+ LOG.info(
+ "True processing rate is not available for {}, cannot
compute new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ double targetCapacity =
+ AutoScalerUtils.getTargetProcessingCapacity(
+ evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION),
true);
+ if (Double.isNaN(targetCapacity)) {
+ LOG.info(
+ "Target data rate is not available for {}, cannot compute
new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ LOG.info("Target processing capacity for {} is {}", vertex,
targetCapacity);
+ double scaleFactor = targetCapacity / averageTrueProcessingRate;
+ double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+ if (scaleFactor < minScaleFactor) {
+ LOG.info(
+ "Computed scale factor of {} for {} is capped by maximum
scale down factor to {}",
+ scaleFactor,
+ vertex,
+ minScaleFactor);
+ scaleFactor = minScaleFactor;
+ }
+
+ int newParallelism =
+ scale(
+ currentParallelism,
+ (int)
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+ scaleFactor,
+ conf.getInteger(VERTEX_MIN_PARALLELISM),
+ conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+ if (newParallelism == currentParallelism
+ || blockScalingBasedOnPastActions(
+ vertex,
+ conf,
+ evaluatedMetrics,
+ history,
+ currentParallelism,
+ newParallelism)) {
+ return currentParallelism;
+ }
+
+ // We record our expectations for this scaling operation
+ evaluatedMetrics.put(
+ ScalingMetric.EXPECTED_PROCESSING_RATE,
EvaluatedScalingMetric.of(targetCapacity));
+ return newParallelism;
+ }
+
+ private boolean blockScalingBasedOnPastActions(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ int currentParallelism,
+ int newParallelism) {
+
+ // If we don't have past scaling actions for this vertex, there is
nothing to do
+ if (history.isEmpty()) {
+ return false;
+ }
+
+ boolean scaleUp = currentParallelism < newParallelism;
+
+ return detectImmediateScaleDownAfterScaleUp(vertex, conf, history,
scaleUp)
+ || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics,
history, scaleUp);
+ }
+
+ private boolean detectImmediateScaleDownAfterScaleUp(
+ JobVertexID vertex,
+ Configuration conf,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
Review Comment:
This parameter is odd; I'd rather handle this in the caller, i.e. not call
this method on scale down or when the parallelism does not change.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -120,7 +120,7 @@ public CollectedMetrics updateMetrics(
if (h == null) {
h = scalingInformation.getMetricHistory();
}
- return h.tailMap(now.minus(metricsWindowSize));
+ return new
TreeMap<>(h.tailMap(now.minus(metricsWindowSize)));
Review Comment:
No objections, but did returning the `tailMap` view (rather than a copy) cause issues?
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the
scaling metrics. */
+public class JobVertexScaler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JobVertexScaler.class);
+
+ private Clock clock = Clock.system(ZoneId.systemDefault());
+
+ public int computeScaleTargetParallelism(
+ Configuration conf,
+ JobVertexID vertex,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history) {
+
+ var currentParallelism = (int)
evaluatedMetrics.get(PARALLELISM).getCurrent();
+ double averageTrueProcessingRate =
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+ if (Double.isNaN(averageTrueProcessingRate)) {
+ LOG.info(
+ "True processing rate is not available for {}, cannot
compute new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ double targetCapacity =
+ AutoScalerUtils.getTargetProcessingCapacity(
+ evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION),
true);
+ if (Double.isNaN(targetCapacity)) {
+ LOG.info(
+ "Target data rate is not available for {}, cannot compute
new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ LOG.info("Target processing capacity for {} is {}", vertex,
targetCapacity);
+ double scaleFactor = targetCapacity / averageTrueProcessingRate;
+ double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+ if (scaleFactor < minScaleFactor) {
+ LOG.info(
+ "Computed scale factor of {} for {} is capped by maximum
scale down factor to {}",
+ scaleFactor,
+ vertex,
+ minScaleFactor);
+ scaleFactor = minScaleFactor;
+ }
+
+ int newParallelism =
+ scale(
+ currentParallelism,
+ (int)
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+ scaleFactor,
+ conf.getInteger(VERTEX_MIN_PARALLELISM),
+ conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+ if (newParallelism == currentParallelism
+ || blockScalingBasedOnPastActions(
+ vertex,
+ conf,
+ evaluatedMetrics,
+ history,
+ currentParallelism,
+ newParallelism)) {
+ return currentParallelism;
+ }
+
+ // We record our expectations for this scaling operation
+ evaluatedMetrics.put(
+ ScalingMetric.EXPECTED_PROCESSING_RATE,
EvaluatedScalingMetric.of(targetCapacity));
+ return newParallelism;
+ }
+
+ private boolean blockScalingBasedOnPastActions(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ int currentParallelism,
+ int newParallelism) {
+
+ // If we don't have past scaling actions for this vertex, there is
nothing to do
+ if (history.isEmpty()) {
+ return false;
+ }
+
+ boolean scaleUp = currentParallelism < newParallelism;
+
+ return detectImmediateScaleDownAfterScaleUp(vertex, conf, history,
scaleUp)
+ || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics,
history, scaleUp);
+ }
+
+ private boolean detectImmediateScaleDownAfterScaleUp(
+ JobVertexID vertex,
+ Configuration conf,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
+
+ // Does not apply to scale up operations
+ if (scaleUp) {
+ return false;
+ }
+
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
+
+ boolean lastScaleUp = lastSummary.getNewParallelism() >
lastSummary.getCurrentParallelism();
+ if (!lastScaleUp) {
+ return false;
+ }
+
+ var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+ if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
+ LOG.info(
+ "Skipping immediate scale down after scale up within grace
period for {}",
+ vertex);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private boolean detectIneffectiveScaleup(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
+
+ // Does not apply to scale down operations
+ if (!scaleUp) {
+ return false;
+ }
+
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
+
+ boolean lastScaleUp = lastSummary.getNewParallelism() >
lastSummary.getCurrentParallelism();
+ if (!lastScaleUp) {
+ return false;
+ }
+
+ double lastProcRate =
lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+ double lastExpectedProcRate =
+
lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent();
+ var currentProcRate =
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+ double expectedIncrease = lastExpectedProcRate / lastProcRate - 1;
+ double actualIncrease = currentProcRate / lastProcRate - 1;
+
+ boolean withinEffectiveThreshold =
+ (expectedIncrease - actualIncrease) / expectedIncrease
+ < (1 -
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD));
Review Comment:
Maybe adding a comment here would help explain what this formula
computes.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the
scaling metrics. */
+public class JobVertexScaler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JobVertexScaler.class);
+
+ private Clock clock = Clock.system(ZoneId.systemDefault());
+
+ public int computeScaleTargetParallelism(
+ Configuration conf,
+ JobVertexID vertex,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history) {
+
+ var currentParallelism = (int)
evaluatedMetrics.get(PARALLELISM).getCurrent();
+ double averageTrueProcessingRate =
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+ if (Double.isNaN(averageTrueProcessingRate)) {
+ LOG.info(
+ "True processing rate is not available for {}, cannot
compute new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ double targetCapacity =
+ AutoScalerUtils.getTargetProcessingCapacity(
+ evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION),
true);
+ if (Double.isNaN(targetCapacity)) {
+ LOG.info(
+ "Target data rate is not available for {}, cannot compute
new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ LOG.info("Target processing capacity for {} is {}", vertex,
targetCapacity);
+ double scaleFactor = targetCapacity / averageTrueProcessingRate;
+ double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+ if (scaleFactor < minScaleFactor) {
+ LOG.info(
+ "Computed scale factor of {} for {} is capped by maximum
scale down factor to {}",
+ scaleFactor,
+ vertex,
+ minScaleFactor);
+ scaleFactor = minScaleFactor;
+ }
+
+ int newParallelism =
+ scale(
+ currentParallelism,
+ (int)
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+ scaleFactor,
+ conf.getInteger(VERTEX_MIN_PARALLELISM),
+ conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+ if (newParallelism == currentParallelism
+ || blockScalingBasedOnPastActions(
+ vertex,
+ conf,
+ evaluatedMetrics,
+ history,
+ currentParallelism,
+ newParallelism)) {
+ return currentParallelism;
+ }
+
+ // We record our expectations for this scaling operation
+ evaluatedMetrics.put(
+ ScalingMetric.EXPECTED_PROCESSING_RATE,
EvaluatedScalingMetric.of(targetCapacity));
+ return newParallelism;
+ }
+
+ private boolean blockScalingBasedOnPastActions(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ int currentParallelism,
+ int newParallelism) {
+
+ // If we don't have past scaling actions for this vertex, there is
nothing to do
+ if (history.isEmpty()) {
+ return false;
+ }
+
+ boolean scaleUp = currentParallelism < newParallelism;
+
+ return detectImmediateScaleDownAfterScaleUp(vertex, conf, history,
scaleUp)
+ || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics,
history, scaleUp);
+ }
+
+ private boolean detectImmediateScaleDownAfterScaleUp(
+ JobVertexID vertex,
+ Configuration conf,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
+
+ // Does not apply to scale up operations
+ if (scaleUp) {
+ return false;
+ }
+
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
+
+ boolean lastScaleUp = lastSummary.getNewParallelism() >
lastSummary.getCurrentParallelism();
+ if (!lastScaleUp) {
+ return false;
+ }
+
+ var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+ if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
+ LOG.info(
+ "Skipping immediate scale down after scale up within grace
period for {}",
+ vertex);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private boolean detectIneffectiveScaleup(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
+
+ // Does not apply to scale down operations
+ if (!scaleUp) {
+ return false;
+ }
+
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
+
+ boolean lastScaleUp = lastSummary.getNewParallelism() >
lastSummary.getCurrentParallelism();
+ if (!lastScaleUp) {
+ return false;
+ }
+
+ double lastProcRate =
lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+ double lastExpectedProcRate =
+
lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent();
+ var currentProcRate =
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+ double expectedIncrease = lastExpectedProcRate / lastProcRate - 1;
+ double actualIncrease = currentProcRate / lastProcRate - 1;
+
+ boolean withinEffectiveThreshold =
+ (expectedIncrease - actualIncrease) / expectedIncrease
+ < (1 -
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD));
+
+ if (withinEffectiveThreshold) {
+ return false;
+ }
+
+ // TODO: Trigger kube event
+
+ if
(conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
+ LOG.info(
+ "Skipping further scale up after ineffective previous
scale up for {}", vertex);
+ return true;
Review Comment:
This condition (the `SCALING_EFFECTIVENESS_DETECTION_ENABLED` check) should come earlier in the routine.
##########
examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java:
##########
@@ -29,10 +30,19 @@ public static void main(String[] args) throws Exception {
stream =
stream.shuffle()
.map(
- i -> {
- // Add sleep to artificially slow down
processing
- // Thread.sleep(sleep);
- return i;
+ new RichMapFunction<Long, Long>() {
+ @Override
+ public Long map(Long i) throws Exception {
+ var start = System.nanoTime();
+ var end = System.nanoTime();
+ while (end
+ < start
+ + getRuntimeContext()
+
.getNumberOfParallelSubtasks()) {
+ end = System.nanoTime();
Review Comment:
Why this change? Is this here to produce load by calling System.nanoTime()?
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the
scaling metrics. */
+public class JobVertexScaler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JobVertexScaler.class);
+
+ private Clock clock = Clock.system(ZoneId.systemDefault());
+
+ public int computeScaleTargetParallelism(
+ Configuration conf,
+ JobVertexID vertex,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history) {
+
+ var currentParallelism = (int)
evaluatedMetrics.get(PARALLELISM).getCurrent();
+ double averageTrueProcessingRate =
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+ if (Double.isNaN(averageTrueProcessingRate)) {
+ LOG.info(
+ "True processing rate is not available for {}, cannot
compute new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ double targetCapacity =
+ AutoScalerUtils.getTargetProcessingCapacity(
+ evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION),
true);
+ if (Double.isNaN(targetCapacity)) {
+ LOG.info(
+ "Target data rate is not available for {}, cannot compute
new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ LOG.info("Target processing capacity for {} is {}", vertex,
targetCapacity);
+ double scaleFactor = targetCapacity / averageTrueProcessingRate;
+ double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+ if (scaleFactor < minScaleFactor) {
+ LOG.info(
+ "Computed scale factor of {} for {} is capped by maximum
scale down factor to {}",
+ scaleFactor,
+ vertex,
+ minScaleFactor);
+ scaleFactor = minScaleFactor;
+ }
+
+ int newParallelism =
+ scale(
+ currentParallelism,
+ (int)
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+ scaleFactor,
+ conf.getInteger(VERTEX_MIN_PARALLELISM),
+ conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+ if (newParallelism == currentParallelism
+ || blockScalingBasedOnPastActions(
+ vertex,
+ conf,
+ evaluatedMetrics,
+ history,
+ currentParallelism,
+ newParallelism)) {
+ return currentParallelism;
+ }
+
+ // We record our expectations for this scaling operation
+ evaluatedMetrics.put(
+ ScalingMetric.EXPECTED_PROCESSING_RATE,
EvaluatedScalingMetric.of(targetCapacity));
+ return newParallelism;
+ }
+
+ private boolean blockScalingBasedOnPastActions(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ int currentParallelism,
+ int newParallelism) {
+
+ // If we don't have past scaling actions for this vertex, there is
nothing to do
+ if (history.isEmpty()) {
+ return false;
+ }
+
+ boolean scaleUp = currentParallelism < newParallelism;
+
+ return detectImmediateScaleDownAfterScaleUp(vertex, conf, history,
scaleUp)
+ || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics,
history, scaleUp);
+ }
+
+ private boolean detectImmediateScaleDownAfterScaleUp(
+ JobVertexID vertex,
+ Configuration conf,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
+
+ // Does not apply to scale up operations
+ if (scaleUp) {
+ return false;
+ }
+
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
Review Comment:
I think we need to handle the case where the history doesn't contain the
vertex. We currently only yield scaling summaries for changed vertices. Also,
if this is the first time scaling, the history won't show the vertex.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the
scaling metrics. */
+public class JobVertexScaler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JobVertexScaler.class);
+
+ private Clock clock = Clock.system(ZoneId.systemDefault());
+
+ public int computeScaleTargetParallelism(
+ Configuration conf,
+ JobVertexID vertex,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history) {
+
+ var currentParallelism = (int)
evaluatedMetrics.get(PARALLELISM).getCurrent();
+ double averageTrueProcessingRate =
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+ if (Double.isNaN(averageTrueProcessingRate)) {
+ LOG.info(
+ "True processing rate is not available for {}, cannot
compute new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ double targetCapacity =
+ AutoScalerUtils.getTargetProcessingCapacity(
+ evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION),
true);
+ if (Double.isNaN(targetCapacity)) {
+ LOG.info(
+ "Target data rate is not available for {}, cannot compute
new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ LOG.info("Target processing capacity for {} is {}", vertex,
targetCapacity);
+ double scaleFactor = targetCapacity / averageTrueProcessingRate;
+ double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+ if (scaleFactor < minScaleFactor) {
+ LOG.info(
+ "Computed scale factor of {} for {} is capped by maximum
scale down factor to {}",
+ scaleFactor,
+ vertex,
+ minScaleFactor);
+ scaleFactor = minScaleFactor;
+ }
+
+ int newParallelism =
+ scale(
+ currentParallelism,
+ (int)
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+ scaleFactor,
+ conf.getInteger(VERTEX_MIN_PARALLELISM),
+ conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+ if (newParallelism == currentParallelism
+ || blockScalingBasedOnPastActions(
+ vertex,
+ conf,
+ evaluatedMetrics,
+ history,
+ currentParallelism,
+ newParallelism)) {
+ return currentParallelism;
+ }
+
+ // We record our expectations for this scaling operation
+ evaluatedMetrics.put(
+ ScalingMetric.EXPECTED_PROCESSING_RATE,
EvaluatedScalingMetric.of(targetCapacity));
+ return newParallelism;
+ }
+
+ private boolean blockScalingBasedOnPastActions(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ int currentParallelism,
+ int newParallelism) {
+
+ // If we don't have past scaling actions for this vertex, there is
nothing to do
+ if (history.isEmpty()) {
+ return false;
+ }
+
+ boolean scaleUp = currentParallelism < newParallelism;
+
+ return detectImmediateScaleDownAfterScaleUp(vertex, conf, history,
scaleUp)
+ || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics,
history, scaleUp);
+ }
+
+ private boolean detectImmediateScaleDownAfterScaleUp(
+ JobVertexID vertex,
+ Configuration conf,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
+
+ // Does not apply to scale up operations
+ if (scaleUp) {
+ return false;
+ }
+
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
+
+ boolean lastScaleUp = lastSummary.getNewParallelism() >
lastSummary.getCurrentParallelism();
+ if (!lastScaleUp) {
Review Comment:
```suggestion
if (!lastScaledUp) {
```
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -141,21 +141,30 @@ public CollectedMetrics updateMetrics(
if
(now.isBefore(stableTime.plus(conf.get(AutoScalerOptions.METRICS_WINDOW)))) {
// As long as we haven't had time to collect a full window,
// collect metrics but do not return any metrics
+ LOG.info("Waiting until initial metric window is full before
starting scaling");
Review Comment:
+1
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the
scaling metrics. */
+public class JobVertexScaler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JobVertexScaler.class);
+
+ private Clock clock = Clock.system(ZoneId.systemDefault());
+
+ public int computeScaleTargetParallelism(
+ Configuration conf,
+ JobVertexID vertex,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history) {
+
+ var currentParallelism = (int)
evaluatedMetrics.get(PARALLELISM).getCurrent();
+ double averageTrueProcessingRate =
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+ if (Double.isNaN(averageTrueProcessingRate)) {
+ LOG.info(
+ "True processing rate is not available for {}, cannot
compute new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ double targetCapacity =
+ AutoScalerUtils.getTargetProcessingCapacity(
+ evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION),
true);
+ if (Double.isNaN(targetCapacity)) {
+ LOG.info(
+ "Target data rate is not available for {}, cannot compute
new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ LOG.info("Target processing capacity for {} is {}", vertex,
targetCapacity);
+ double scaleFactor = targetCapacity / averageTrueProcessingRate;
+ double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+ if (scaleFactor < minScaleFactor) {
+ LOG.info(
+ "Computed scale factor of {} for {} is capped by maximum
scale down factor to {}",
+ scaleFactor,
+ vertex,
+ minScaleFactor);
+ scaleFactor = minScaleFactor;
+ }
+
+ int newParallelism =
+ scale(
+ currentParallelism,
+ (int)
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+ scaleFactor,
+ conf.getInteger(VERTEX_MIN_PARALLELISM),
+ conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+ if (newParallelism == currentParallelism
+ || blockScalingBasedOnPastActions(
+ vertex,
+ conf,
+ evaluatedMetrics,
+ history,
+ currentParallelism,
+ newParallelism)) {
+ return currentParallelism;
+ }
+
+ // We record our expectations for this scaling operation
+ evaluatedMetrics.put(
+ ScalingMetric.EXPECTED_PROCESSING_RATE,
EvaluatedScalingMetric.of(targetCapacity));
+ return newParallelism;
+ }
+
+ private boolean blockScalingBasedOnPastActions(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ int currentParallelism,
+ int newParallelism) {
+
+ // If we don't have past scaling actions for this vertex, there is
nothing to do
+ if (history.isEmpty()) {
+ return false;
+ }
+
+ boolean scaleUp = currentParallelism < newParallelism;
+
+ return detectImmediateScaleDownAfterScaleUp(vertex, conf, history,
scaleUp)
+ || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics,
history, scaleUp);
+ }
+
+ private boolean detectImmediateScaleDownAfterScaleUp(
+ JobVertexID vertex,
+ Configuration conf,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
+
+ // Does not apply to scale up operations
+ if (scaleUp) {
+ return false;
+ }
+
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
+
+ boolean lastScaleUp = lastSummary.getNewParallelism() >
lastSummary.getCurrentParallelism();
+ if (!lastScaleUp) {
+ return false;
+ }
+
+ var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+ if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
+ LOG.info(
+ "Skipping immediate scale down after scale up within grace
period for {}",
+ vertex);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private boolean detectIneffectiveScaleup(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
Review Comment:
Same here, I would remove this parameter and handle this in the caller.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the
scaling metrics. */
+public class JobVertexScaler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JobVertexScaler.class);
+
+ private Clock clock = Clock.system(ZoneId.systemDefault());
+
+ public int computeScaleTargetParallelism(
+ Configuration conf,
+ JobVertexID vertex,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history) {
+
+ var currentParallelism = (int)
evaluatedMetrics.get(PARALLELISM).getCurrent();
+ double averageTrueProcessingRate =
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+ if (Double.isNaN(averageTrueProcessingRate)) {
+ LOG.info(
+ "True processing rate is not available for {}, cannot
compute new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ double targetCapacity =
+ AutoScalerUtils.getTargetProcessingCapacity(
+ evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION),
true);
+ if (Double.isNaN(targetCapacity)) {
+ LOG.info(
+ "Target data rate is not available for {}, cannot compute
new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ LOG.info("Target processing capacity for {} is {}", vertex,
targetCapacity);
+ double scaleFactor = targetCapacity / averageTrueProcessingRate;
+ double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+ if (scaleFactor < minScaleFactor) {
+ LOG.info(
+ "Computed scale factor of {} for {} is capped by maximum
scale down factor to {}",
+ scaleFactor,
+ vertex,
+ minScaleFactor);
+ scaleFactor = minScaleFactor;
+ }
+
+ int newParallelism =
+ scale(
+ currentParallelism,
+ (int)
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+ scaleFactor,
+ conf.getInteger(VERTEX_MIN_PARALLELISM),
+ conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+ if (newParallelism == currentParallelism
+ || blockScalingBasedOnPastActions(
+ vertex,
+ conf,
+ evaluatedMetrics,
+ history,
+ currentParallelism,
+ newParallelism)) {
+ return currentParallelism;
+ }
+
+ // We record our expectations for this scaling operation
+ evaluatedMetrics.put(
+ ScalingMetric.EXPECTED_PROCESSING_RATE,
EvaluatedScalingMetric.of(targetCapacity));
+ return newParallelism;
+ }
+
+ private boolean blockScalingBasedOnPastActions(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ int currentParallelism,
+ int newParallelism) {
+
+ // If we don't have past scaling actions for this vertex, there is
nothing to do
+ if (history.isEmpty()) {
+ return false;
+ }
+
+ boolean scaleUp = currentParallelism < newParallelism;
+
+ return detectImmediateScaleDownAfterScaleUp(vertex, conf, history,
scaleUp)
+ || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics,
history, scaleUp);
+ }
+
+ private boolean detectImmediateScaleDownAfterScaleUp(
+ JobVertexID vertex,
+ Configuration conf,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
+
+ // Does not apply to scale up operations
+ if (scaleUp) {
+ return false;
+ }
+
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
+
+ boolean lastScaleUp = lastSummary.getNewParallelism() >
lastSummary.getCurrentParallelism();
+ if (!lastScaleUp) {
+ return false;
+ }
+
+ var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+ if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
+ LOG.info(
+ "Skipping immediate scale down after scale up within grace
period for {}",
+ vertex);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private boolean detectIneffectiveScaleup(
+ JobVertexID vertex,
+ Configuration conf,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history,
+ boolean scaleUp) {
+
+ // Does not apply to scale down operations
+ if (!scaleUp) {
+ return false;
+ }
+
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
+
+ boolean lastScaleUp = lastSummary.getNewParallelism() >
lastSummary.getCurrentParallelism();
+ if (!lastScaleUp) {
+ return false;
+ }
+
+ double lastProcRate =
lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+ double lastExpectedProcRate =
+
lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent();
+ var currentProcRate =
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+ double expectedIncrease = lastExpectedProcRate / lastProcRate - 1;
+ double actualIncrease = currentProcRate / lastProcRate - 1;
+
+ boolean withinEffectiveThreshold =
+ (expectedIncrease - actualIncrease) / expectedIncrease
+ < (1 -
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD));
Review Comment:
I don't understand this formula. If you have a config option which says you
want at least an x % increase, I'd imagine this to work as follows:
```
percentage_change = actual_rate / expected_rate - 1
if (percentage_change <
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD)) {
return false;
} else {
// ineffective scaling detected
}
```
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -387,19 +389,18 @@ protected Map<String, FlinkMetric>
getFilteredVertexMetricNames(
}
requiredMetrics.forEach(
- flinkMetric -> {
- filteredMetrics.put(
- flinkMetric
- .findAny(allMetricNames)
- .orElseThrow(
- () ->
- new RuntimeException(
- "Could not find
required metric "
- +
flinkMetric.name()
- + " for "
- +
jobVertexID)),
- flinkMetric);
- });
+ flinkMetric ->
+ filteredMetrics.put(
+ flinkMetric
+ .findAny(allMetricNames)
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Could not
find required metric "
+ +
flinkMetric.name()
+ + "
for "
+ +
jobVertexID)),
+ flinkMetric));
Review Comment:
What is the change here? Indentation?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]