mxm commented on code in PR #502:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/502#discussion_r1065794519


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -102,6 +103,7 @@ public CollectedMetrics updateMetrics(
         var stableTime = currentJobUpdateTs.plus(stabilizationDuration);
         if (now.isBefore(stableTime)) {
             // As long as we are stabilizing, collect no metrics at all
+            LOG.info("Skipping metric collection during stabilization period 
until {}", stableTime);

Review Comment:
   +1



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the 
scaling metrics. */
+public class JobVertexScaler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobVertexScaler.class);
+
+    private Clock clock = Clock.system(ZoneId.systemDefault());
+
+    public int computeScaleTargetParallelism(
+            Configuration conf,
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history) {
+
+        var currentParallelism = (int) 
evaluatedMetrics.get(PARALLELISM).getCurrent();
+        double averageTrueProcessingRate = 
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate is not available for {}, cannot 
compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        double targetCapacity =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), 
true);
+        if (Double.isNaN(targetCapacity)) {
+            LOG.info(
+                    "Target data rate is not available for {}, cannot compute 
new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        LOG.info("Target processing capacity for {} is {}", vertex, 
targetCapacity);
+        double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        if (scaleFactor < minScaleFactor) {
+            LOG.info(
+                    "Computed scale factor of {} for {} is capped by maximum 
scale down factor to {}",
+                    scaleFactor,
+                    vertex,
+                    minScaleFactor);
+            scaleFactor = minScaleFactor;
+        }
+
+        int newParallelism =
+                scale(
+                        currentParallelism,
+                        (int) 
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+                        scaleFactor,
+                        conf.getInteger(VERTEX_MIN_PARALLELISM),
+                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+        if (newParallelism == currentParallelism
+                || blockScalingBasedOnPastActions(
+                        vertex,
+                        conf,
+                        evaluatedMetrics,
+                        history,
+                        currentParallelism,
+                        newParallelism)) {
+            return currentParallelism;
+        }
+
+        // We record our expectations for this scaling operation
+        evaluatedMetrics.put(
+                ScalingMetric.EXPECTED_PROCESSING_RATE, 
EvaluatedScalingMetric.of(targetCapacity));
+        return newParallelism;
+    }
+
+    private boolean blockScalingBasedOnPastActions(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            int currentParallelism,
+            int newParallelism) {
+
+        // If we don't have past scaling actions for this vertex, there is 
nothing to do
+        if (history.isEmpty()) {
+            return false;
+        }
+
+        boolean scaleUp = currentParallelism < newParallelism;
+
+        return detectImmediateScaleDownAfterScaleUp(vertex, conf, history, 
scaleUp)
+                || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics, 
history, scaleUp);
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            JobVertexID vertex,
+            Configuration conf,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {
+
+        // Does not apply to scale up operations
+        if (scaleUp) {
+            return false;
+        }
+
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean lastScaleUp = lastSummary.getNewParallelism() > 
lastSummary.getCurrentParallelism();
+        if (!lastScaleUp) {
+            return false;
+        }
+
+        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+        if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
+            LOG.info(
+                    "Skipping immediate scale down after scale up within grace 
period for {}",
+                    vertex);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private boolean detectIneffectiveScaleup(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {
+
+        // Does not apply to scale down operations
+        if (!scaleUp) {
+            return false;
+        }
+
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);

Review Comment:
   Same caveat here with regard to the history not yet including the vertex.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the 
scaling metrics. */
+public class JobVertexScaler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobVertexScaler.class);
+
+    private Clock clock = Clock.system(ZoneId.systemDefault());
+
+    public int computeScaleTargetParallelism(
+            Configuration conf,
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history) {
+
+        var currentParallelism = (int) 
evaluatedMetrics.get(PARALLELISM).getCurrent();
+        double averageTrueProcessingRate = 
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate is not available for {}, cannot 
compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        double targetCapacity =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), 
true);
+        if (Double.isNaN(targetCapacity)) {
+            LOG.info(
+                    "Target data rate is not available for {}, cannot compute 
new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        LOG.info("Target processing capacity for {} is {}", vertex, 
targetCapacity);
+        double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        if (scaleFactor < minScaleFactor) {
+            LOG.info(
+                    "Computed scale factor of {} for {} is capped by maximum 
scale down factor to {}",
+                    scaleFactor,
+                    vertex,
+                    minScaleFactor);
+            scaleFactor = minScaleFactor;
+        }
+
+        int newParallelism =
+                scale(
+                        currentParallelism,
+                        (int) 
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+                        scaleFactor,
+                        conf.getInteger(VERTEX_MIN_PARALLELISM),
+                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+        if (newParallelism == currentParallelism
+                || blockScalingBasedOnPastActions(
+                        vertex,
+                        conf,
+                        evaluatedMetrics,
+                        history,
+                        currentParallelism,
+                        newParallelism)) {
+            return currentParallelism;
+        }
+
+        // We record our expectations for this scaling operation
+        evaluatedMetrics.put(
+                ScalingMetric.EXPECTED_PROCESSING_RATE, 
EvaluatedScalingMetric.of(targetCapacity));
+        return newParallelism;
+    }
+
+    private boolean blockScalingBasedOnPastActions(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            int currentParallelism,
+            int newParallelism) {
+
+        // If we don't have past scaling actions for this vertex, there is 
nothing to do
+        if (history.isEmpty()) {
+            return false;
+        }
+
+        boolean scaleUp = currentParallelism < newParallelism;
+
+        return detectImmediateScaleDownAfterScaleUp(vertex, conf, history, 
scaleUp)
+                || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics, 
history, scaleUp);
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            JobVertexID vertex,
+            Configuration conf,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {

Review Comment:
   This parameter is odd; I'd rather handle this in the caller, i.e. not call 
this method on a scale down or when there is no parallelism change.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -120,7 +120,7 @@ public CollectedMetrics updateMetrics(
                             if (h == null) {
                                 h = scalingInformation.getMetricHistory();
                             }
-                            return h.tailMap(now.minus(metricsWindowSize));
+                            return new 
TreeMap<>(h.tailMap(now.minus(metricsWindowSize)));

Review Comment:
   No objections, but did returning a subset cause issues?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the 
scaling metrics. */
+public class JobVertexScaler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobVertexScaler.class);
+
+    private Clock clock = Clock.system(ZoneId.systemDefault());
+
+    public int computeScaleTargetParallelism(
+            Configuration conf,
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history) {
+
+        var currentParallelism = (int) 
evaluatedMetrics.get(PARALLELISM).getCurrent();
+        double averageTrueProcessingRate = 
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate is not available for {}, cannot 
compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        double targetCapacity =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), 
true);
+        if (Double.isNaN(targetCapacity)) {
+            LOG.info(
+                    "Target data rate is not available for {}, cannot compute 
new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        LOG.info("Target processing capacity for {} is {}", vertex, 
targetCapacity);
+        double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        if (scaleFactor < minScaleFactor) {
+            LOG.info(
+                    "Computed scale factor of {} for {} is capped by maximum 
scale down factor to {}",
+                    scaleFactor,
+                    vertex,
+                    minScaleFactor);
+            scaleFactor = minScaleFactor;
+        }
+
+        int newParallelism =
+                scale(
+                        currentParallelism,
+                        (int) 
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+                        scaleFactor,
+                        conf.getInteger(VERTEX_MIN_PARALLELISM),
+                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+        if (newParallelism == currentParallelism
+                || blockScalingBasedOnPastActions(
+                        vertex,
+                        conf,
+                        evaluatedMetrics,
+                        history,
+                        currentParallelism,
+                        newParallelism)) {
+            return currentParallelism;
+        }
+
+        // We record our expectations for this scaling operation
+        evaluatedMetrics.put(
+                ScalingMetric.EXPECTED_PROCESSING_RATE, 
EvaluatedScalingMetric.of(targetCapacity));
+        return newParallelism;
+    }
+
+    private boolean blockScalingBasedOnPastActions(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            int currentParallelism,
+            int newParallelism) {
+
+        // If we don't have past scaling actions for this vertex, there is 
nothing to do
+        if (history.isEmpty()) {
+            return false;
+        }
+
+        boolean scaleUp = currentParallelism < newParallelism;
+
+        return detectImmediateScaleDownAfterScaleUp(vertex, conf, history, 
scaleUp)
+                || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics, 
history, scaleUp);
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            JobVertexID vertex,
+            Configuration conf,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {
+
+        // Does not apply to scale up operations
+        if (scaleUp) {
+            return false;
+        }
+
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean lastScaleUp = lastSummary.getNewParallelism() > 
lastSummary.getCurrentParallelism();
+        if (!lastScaleUp) {
+            return false;
+        }
+
+        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+        if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
+            LOG.info(
+                    "Skipping immediate scale down after scale up within grace 
period for {}",
+                    vertex);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private boolean detectIneffectiveScaleup(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {
+
+        // Does not apply to scale down operations
+        if (!scaleUp) {
+            return false;
+        }
+
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean lastScaleUp = lastSummary.getNewParallelism() > 
lastSummary.getCurrentParallelism();
+        if (!lastScaleUp) {
+            return false;
+        }
+
+        double lastProcRate = 
lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+        double lastExpectedProcRate =
+                
lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent();
+        var currentProcRate = 
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+        double expectedIncrease = lastExpectedProcRate / lastProcRate - 1;
+        double actualIncrease = currentProcRate / lastProcRate - 1;
+
+        boolean withinEffectiveThreshold =
+                (expectedIncrease - actualIncrease) / expectedIncrease
+                        < (1 - 
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD));

Review Comment:
   Maybe adding comments would help to explain what this formula does, beyond 
what the code itself conveys.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the 
scaling metrics. */
+public class JobVertexScaler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobVertexScaler.class);
+
+    private Clock clock = Clock.system(ZoneId.systemDefault());
+
+    public int computeScaleTargetParallelism(
+            Configuration conf,
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history) {
+
+        var currentParallelism = (int) 
evaluatedMetrics.get(PARALLELISM).getCurrent();
+        double averageTrueProcessingRate = 
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate is not available for {}, cannot 
compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        double targetCapacity =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), 
true);
+        if (Double.isNaN(targetCapacity)) {
+            LOG.info(
+                    "Target data rate is not available for {}, cannot compute 
new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        LOG.info("Target processing capacity for {} is {}", vertex, 
targetCapacity);
+        double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        if (scaleFactor < minScaleFactor) {
+            LOG.info(
+                    "Computed scale factor of {} for {} is capped by maximum 
scale down factor to {}",
+                    scaleFactor,
+                    vertex,
+                    minScaleFactor);
+            scaleFactor = minScaleFactor;
+        }
+
+        int newParallelism =
+                scale(
+                        currentParallelism,
+                        (int) 
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+                        scaleFactor,
+                        conf.getInteger(VERTEX_MIN_PARALLELISM),
+                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+        if (newParallelism == currentParallelism
+                || blockScalingBasedOnPastActions(
+                        vertex,
+                        conf,
+                        evaluatedMetrics,
+                        history,
+                        currentParallelism,
+                        newParallelism)) {
+            return currentParallelism;
+        }
+
+        // We record our expectations for this scaling operation
+        evaluatedMetrics.put(
+                ScalingMetric.EXPECTED_PROCESSING_RATE, 
EvaluatedScalingMetric.of(targetCapacity));
+        return newParallelism;
+    }
+
+    private boolean blockScalingBasedOnPastActions(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            int currentParallelism,
+            int newParallelism) {
+
+        // If we don't have past scaling actions for this vertex, there is 
nothing to do
+        if (history.isEmpty()) {
+            return false;
+        }
+
+        boolean scaleUp = currentParallelism < newParallelism;
+
+        return detectImmediateScaleDownAfterScaleUp(vertex, conf, history, 
scaleUp)
+                || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics, 
history, scaleUp);
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            JobVertexID vertex,
+            Configuration conf,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {
+
+        // Does not apply to scale up operations
+        if (scaleUp) {
+            return false;
+        }
+
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean lastScaleUp = lastSummary.getNewParallelism() > 
lastSummary.getCurrentParallelism();
+        if (!lastScaleUp) {
+            return false;
+        }
+
+        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+        if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
+            LOG.info(
+                    "Skipping immediate scale down after scale up within grace 
period for {}",
+                    vertex);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private boolean detectIneffectiveScaleup(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {
+
+        // Does not apply to scale down operations
+        if (!scaleUp) {
+            return false;
+        }
+
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean lastScaleUp = lastSummary.getNewParallelism() > 
lastSummary.getCurrentParallelism();
+        if (!lastScaleUp) {
+            return false;
+        }
+
+        double lastProcRate = 
lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+        double lastExpectedProcRate =
+                
lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent();
+        var currentProcRate = 
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+        double expectedIncrease = lastExpectedProcRate / lastProcRate - 1;
+        double actualIncrease = currentProcRate / lastProcRate - 1;
+
+        boolean withinEffectiveThreshold =
+                (expectedIncrease - actualIncrease) / expectedIncrease
+                        < (1 - 
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD));
+
+        if (withinEffectiveThreshold) {
+            return false;
+        }
+
+        // TODO: Trigger kube event
+
+        if 
(conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
+            LOG.info(
+                    "Skipping further scale up after ineffective previous 
scale up for {}", vertex);
+            return true;

Review Comment:
   This condition should come earlier in the routine.



##########
examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java:
##########
@@ -29,10 +30,19 @@ public static void main(String[] args) throws Exception {
         stream =
                 stream.shuffle()
                         .map(
-                                i -> {
-                                    // Add sleep to artificially slow down 
processing
-                                    // Thread.sleep(sleep);
-                                    return i;
+                                new RichMapFunction<Long, Long>() {
+                                    @Override
+                                    public Long map(Long i) throws Exception {
+                                        var start = System.nanoTime();
+                                        var end = System.nanoTime();
+                                        while (end
+                                                < start
+                                                        + getRuntimeContext()
+                                                                
.getNumberOfParallelSubtasks()) {
+                                            end = System.nanoTime();

Review Comment:
   Why this change — is this here to produce load by calling System.nanoTime()?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the 
scaling metrics. */
+public class JobVertexScaler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobVertexScaler.class);
+
+    private Clock clock = Clock.system(ZoneId.systemDefault());
+
+    public int computeScaleTargetParallelism(
+            Configuration conf,
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history) {
+
+        var currentParallelism = (int) 
evaluatedMetrics.get(PARALLELISM).getCurrent();
+        double averageTrueProcessingRate = 
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate is not available for {}, cannot 
compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        double targetCapacity =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), 
true);
+        if (Double.isNaN(targetCapacity)) {
+            LOG.info(
+                    "Target data rate is not available for {}, cannot compute 
new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        LOG.info("Target processing capacity for {} is {}", vertex, 
targetCapacity);
+        double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        if (scaleFactor < minScaleFactor) {
+            LOG.info(
+                    "Computed scale factor of {} for {} is capped by maximum 
scale down factor to {}",
+                    scaleFactor,
+                    vertex,
+                    minScaleFactor);
+            scaleFactor = minScaleFactor;
+        }
+
+        int newParallelism =
+                scale(
+                        currentParallelism,
+                        (int) 
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+                        scaleFactor,
+                        conf.getInteger(VERTEX_MIN_PARALLELISM),
+                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+        if (newParallelism == currentParallelism
+                || blockScalingBasedOnPastActions(
+                        vertex,
+                        conf,
+                        evaluatedMetrics,
+                        history,
+                        currentParallelism,
+                        newParallelism)) {
+            return currentParallelism;
+        }
+
+        // We record our expectations for this scaling operation
+        evaluatedMetrics.put(
+                ScalingMetric.EXPECTED_PROCESSING_RATE, 
EvaluatedScalingMetric.of(targetCapacity));
+        return newParallelism;
+    }
+
+    private boolean blockScalingBasedOnPastActions(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            int currentParallelism,
+            int newParallelism) {
+
+        // If we don't have past scaling actions for this vertex, there is 
nothing to do
+        if (history.isEmpty()) {
+            return false;
+        }
+
+        boolean scaleUp = currentParallelism < newParallelism;
+
+        return detectImmediateScaleDownAfterScaleUp(vertex, conf, history, 
scaleUp)
+                || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics, 
history, scaleUp);
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            JobVertexID vertex,
+            Configuration conf,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {
+
+        // Does not apply to scale up operations
+        if (scaleUp) {
+            return false;
+        }
+
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);

Review Comment:
   I think we need to handle if the history doesn't contain the vertex. We 
currently only yield scaling summaries for changed vertices. Also, if this is 
the first time scaling, the history won't show the vertex.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the 
scaling metrics. */
+public class JobVertexScaler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobVertexScaler.class);
+
+    private Clock clock = Clock.system(ZoneId.systemDefault());
+
+    public int computeScaleTargetParallelism(
+            Configuration conf,
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history) {
+
+        var currentParallelism = (int) 
evaluatedMetrics.get(PARALLELISM).getCurrent();
+        double averageTrueProcessingRate = 
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate is not available for {}, cannot 
compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        double targetCapacity =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), 
true);
+        if (Double.isNaN(targetCapacity)) {
+            LOG.info(
+                    "Target data rate is not available for {}, cannot compute 
new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        LOG.info("Target processing capacity for {} is {}", vertex, 
targetCapacity);
+        double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        if (scaleFactor < minScaleFactor) {
+            LOG.info(
+                    "Computed scale factor of {} for {} is capped by maximum 
scale down factor to {}",
+                    scaleFactor,
+                    vertex,
+                    minScaleFactor);
+            scaleFactor = minScaleFactor;
+        }
+
+        int newParallelism =
+                scale(
+                        currentParallelism,
+                        (int) 
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+                        scaleFactor,
+                        conf.getInteger(VERTEX_MIN_PARALLELISM),
+                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+        if (newParallelism == currentParallelism
+                || blockScalingBasedOnPastActions(
+                        vertex,
+                        conf,
+                        evaluatedMetrics,
+                        history,
+                        currentParallelism,
+                        newParallelism)) {
+            return currentParallelism;
+        }
+
+        // We record our expectations for this scaling operation
+        evaluatedMetrics.put(
+                ScalingMetric.EXPECTED_PROCESSING_RATE, 
EvaluatedScalingMetric.of(targetCapacity));
+        return newParallelism;
+    }
+
+    private boolean blockScalingBasedOnPastActions(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            int currentParallelism,
+            int newParallelism) {
+
+        // If we don't have past scaling actions for this vertex, there is 
nothing to do
+        if (history.isEmpty()) {
+            return false;
+        }
+
+        boolean scaleUp = currentParallelism < newParallelism;
+
+        return detectImmediateScaleDownAfterScaleUp(vertex, conf, history, 
scaleUp)
+                || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics, 
history, scaleUp);
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            JobVertexID vertex,
+            Configuration conf,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {
+
+        // Does not apply to scale up operations
+        if (scaleUp) {
+            return false;
+        }
+
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean lastScaleUp = lastSummary.getNewParallelism() > 
lastSummary.getCurrentParallelism();
+        if (!lastScaleUp) {

Review Comment:
   ```suggestion
           if (!lastScaledUp) {
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -141,21 +141,30 @@ public CollectedMetrics updateMetrics(
         if 
(now.isBefore(stableTime.plus(conf.get(AutoScalerOptions.METRICS_WINDOW)))) {
             // As long as we haven't had time to collect a full window,
             // collect metrics but do not return any metrics
+            LOG.info("Waiting until initial metric window is full before 
starting scaling");

Review Comment:
   
   
   +1
   



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the 
scaling metrics. */
+public class JobVertexScaler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobVertexScaler.class);
+
+    private Clock clock = Clock.system(ZoneId.systemDefault());
+
+    public int computeScaleTargetParallelism(
+            Configuration conf,
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history) {
+
+        var currentParallelism = (int) 
evaluatedMetrics.get(PARALLELISM).getCurrent();
+        double averageTrueProcessingRate = 
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate is not available for {}, cannot 
compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        double targetCapacity =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), 
true);
+        if (Double.isNaN(targetCapacity)) {
+            LOG.info(
+                    "Target data rate is not available for {}, cannot compute 
new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        LOG.info("Target processing capacity for {} is {}", vertex, 
targetCapacity);
+        double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        if (scaleFactor < minScaleFactor) {
+            LOG.info(
+                    "Computed scale factor of {} for {} is capped by maximum 
scale down factor to {}",
+                    scaleFactor,
+                    vertex,
+                    minScaleFactor);
+            scaleFactor = minScaleFactor;
+        }
+
+        int newParallelism =
+                scale(
+                        currentParallelism,
+                        (int) 
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+                        scaleFactor,
+                        conf.getInteger(VERTEX_MIN_PARALLELISM),
+                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+        if (newParallelism == currentParallelism
+                || blockScalingBasedOnPastActions(
+                        vertex,
+                        conf,
+                        evaluatedMetrics,
+                        history,
+                        currentParallelism,
+                        newParallelism)) {
+            return currentParallelism;
+        }
+
+        // We record our expectations for this scaling operation
+        evaluatedMetrics.put(
+                ScalingMetric.EXPECTED_PROCESSING_RATE, 
EvaluatedScalingMetric.of(targetCapacity));
+        return newParallelism;
+    }
+
+    private boolean blockScalingBasedOnPastActions(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            int currentParallelism,
+            int newParallelism) {
+
+        // If we don't have past scaling actions for this vertex, there is 
nothing to do
+        if (history.isEmpty()) {
+            return false;
+        }
+
+        boolean scaleUp = currentParallelism < newParallelism;
+
+        return detectImmediateScaleDownAfterScaleUp(vertex, conf, history, 
scaleUp)
+                || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics, 
history, scaleUp);
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            JobVertexID vertex,
+            Configuration conf,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {
+
+        // Does not apply to scale up operations
+        if (scaleUp) {
+            return false;
+        }
+
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean lastScaleUp = lastSummary.getNewParallelism() > 
lastSummary.getCurrentParallelism();
+        if (!lastScaleUp) {
+            return false;
+        }
+
+        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+        if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
+            LOG.info(
+                    "Skipping immediate scale down after scale up within grace 
period for {}",
+                    vertex);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private boolean detectIneffectiveScaleup(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {

Review Comment:
   Same here, would remove this parameter and handle this by the caller.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the 
scaling metrics. */
+public class JobVertexScaler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobVertexScaler.class);
+
+    private Clock clock = Clock.system(ZoneId.systemDefault());
+
+    public int computeScaleTargetParallelism(
+            Configuration conf,
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history) {
+
+        var currentParallelism = (int) 
evaluatedMetrics.get(PARALLELISM).getCurrent();
+        double averageTrueProcessingRate = 
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate is not available for {}, cannot 
compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        double targetCapacity =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), 
true);
+        if (Double.isNaN(targetCapacity)) {
+            LOG.info(
+                    "Target data rate is not available for {}, cannot compute 
new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        LOG.info("Target processing capacity for {} is {}", vertex, 
targetCapacity);
+        double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        if (scaleFactor < minScaleFactor) {
+            LOG.info(
+                    "Computed scale factor of {} for {} is capped by maximum 
scale down factor to {}",
+                    scaleFactor,
+                    vertex,
+                    minScaleFactor);
+            scaleFactor = minScaleFactor;
+        }
+
+        int newParallelism =
+                scale(
+                        currentParallelism,
+                        (int) 
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+                        scaleFactor,
+                        conf.getInteger(VERTEX_MIN_PARALLELISM),
+                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+        if (newParallelism == currentParallelism
+                || blockScalingBasedOnPastActions(
+                        vertex,
+                        conf,
+                        evaluatedMetrics,
+                        history,
+                        currentParallelism,
+                        newParallelism)) {
+            return currentParallelism;
+        }
+
+        // We record our expectations for this scaling operation
+        evaluatedMetrics.put(
+                ScalingMetric.EXPECTED_PROCESSING_RATE, 
EvaluatedScalingMetric.of(targetCapacity));
+        return newParallelism;
+    }
+
+    private boolean blockScalingBasedOnPastActions(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            int currentParallelism,
+            int newParallelism) {
+
+        // If we don't have past scaling actions for this vertex, there is 
nothing to do
+        if (history.isEmpty()) {
+            return false;
+        }
+
+        boolean scaleUp = currentParallelism < newParallelism;
+
+        return detectImmediateScaleDownAfterScaleUp(vertex, conf, history, 
scaleUp)
+                || detectIneffectiveScaleup(vertex, conf, evaluatedMetrics, 
history, scaleUp);
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            JobVertexID vertex,
+            Configuration conf,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {
+
+        // Does not apply to scale up operations
+        if (scaleUp) {
+            return false;
+        }
+
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean lastScaleUp = lastSummary.getNewParallelism() > 
lastSummary.getCurrentParallelism();
+        if (!lastScaleUp) {
+            return false;
+        }
+
+        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+        if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
+            LOG.info(
+                    "Skipping immediate scale down after scale up within grace 
period for {}",
+                    vertex);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private boolean detectIneffectiveScaleup(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history,
+            boolean scaleUp) {
+
+        // Does not apply to scale down operations
+        if (!scaleUp) {
+            return false;
+        }
+
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean lastScaleUp = lastSummary.getNewParallelism() > 
lastSummary.getCurrentParallelism();
+        if (!lastScaleUp) {
+            return false;
+        }
+
+        double lastProcRate = 
lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+        double lastExpectedProcRate =
+                
lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent();
+        var currentProcRate = 
evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+        double expectedIncrease = lastExpectedProcRate / lastProcRate - 1;
+        double actualIncrease = currentProcRate / lastProcRate - 1;
+
+        boolean withinEffectiveThreshold =
+                (expectedIncrease - actualIncrease) / expectedIncrease
+                        < (1 - 
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD));

Review Comment:
   I don't understand this formula. If you have a config option which says you 
want at least x% increase, I'd imagine this to work as follows:
   
   ```
   percentage_change = actual_rate / expected_rate - 1
   if (percentage_change < 
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD)) {
     return false;
   } else {
      // ineffective scaling detected
   }
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -387,19 +389,18 @@ protected Map<String, FlinkMetric> 
getFilteredVertexMetricNames(
         }
 
         requiredMetrics.forEach(
-                flinkMetric -> {
-                    filteredMetrics.put(
-                            flinkMetric
-                                    .findAny(allMetricNames)
-                                    .orElseThrow(
-                                            () ->
-                                                    new RuntimeException(
-                                                            "Could not find 
required metric "
-                                                                    + 
flinkMetric.name()
-                                                                    + " for "
-                                                                    + 
jobVertexID)),
-                            flinkMetric);
-                });
+                flinkMetric ->
+                        filteredMetrics.put(
+                                flinkMetric
+                                        .findAny(allMetricNames)
+                                        .orElseThrow(
+                                                () ->
+                                                        new RuntimeException(
+                                                                "Could not 
find required metric "
+                                                                        + 
flinkMetric.name()
+                                                                        + " 
for "
+                                                                        + 
jobVertexID)),
+                                flinkMetric));

Review Comment:
   What is the change here? Indentation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to