This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 27daa5ac [FLINK-30571] Estimate scalability coefficient from past 
scaling history using linear regression (#966)
27daa5ac is described below

commit 27daa5ac23f65fd46f6345dadd8118235af7a508
Author: pchoudhury22 <[email protected]>
AuthorDate: Tue May 6 21:23:04 2025 +0530

    [FLINK-30571] Estimate scalability coefficient from past scaling history 
using linear regression (#966)
    
    Currently, target parallelism computation assumes perfect linear scaling. 
However, real-time workloads often exhibit nonlinear scalability due to factors 
like network overhead and coordination costs.
    
    This change introduces an observed scalability coefficient, estimated using 
linear regression on past (parallelism, processing rate) data, to improve the 
accuracy of scaling decisions.
---
 .../generated/auto_scaler_configuration.html       |  18 ++
 .../apache/flink/autoscaler/JobVertexScaler.java   |  90 +++++++++
 .../flink/autoscaler/config/AutoScalerOptions.java |  36 ++++
 .../flink/autoscaler/utils/AutoScalerUtils.java    |  90 +++++++++
 .../flink/autoscaler/JobVertexScalerTest.java      | 211 +++++++++++++++++++++
 .../operator/validation/DefaultValidator.java      |   2 +
 .../operator/validation/DefaultValidatorTest.java  |  15 ++
 7 files changed, 462 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html 
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index ab2bbcb2..3c12ee10 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -98,6 +98,24 @@
             <td>Duration</td>
             <td>Scaling metrics aggregation window size.</td>
         </tr>
+        <tr>
+            
<td><h5>job.autoscaler.observed-scalability.coefficient-min</h5></td>
+            <td style="word-wrap: break-word;">0.5</td>
+            <td>Double</td>
+            <td>Minimum allowed value for the observed scalability 
coefficient. Prevents aggressive scaling by clamping low coefficient estimates. 
If the estimated coefficient falls below this value, it is capped at the 
configured minimum.</td>
+        </tr>
+        <tr>
+            <td><h5>job.autoscaler.observed-scalability.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enables the use of an observed scalability coefficient when 
computing target parallelism. If enabled, the system will estimate the 
scalability coefficient based on historical scaling data instead of assuming 
perfect linear scaling. This helps account for real-world inefficiencies such 
as network overhead and coordination costs.</td>
+        </tr>
+        <tr>
+            
<td><h5>job.autoscaler.observed-scalability.min-observations</h5></td>
+            <td style="word-wrap: break-word;">3</td>
+            <td>Integer</td>
+            <td>Defines the minimum number of historical scaling observations 
required to estimate the scalability coefficient. If the number of available 
observations is below this threshold, the system falls back to assuming linear 
scaling. Note: To effectively use a higher minimum observation count, you need 
to increase job.autoscaler.history.max.count. Avoid setting 
job.autoscaler.history.max.count to a very high value, as the number of 
retained data points is limited by the size of the state store—particularly 
when using Kubernetes-based state store.</td>
+        </tr>
         <tr>
             
<td><h5>job.autoscaler.observed-true-processing-rate.lag-threshold</h5></td>
             <td style="word-wrap: break-word;">30 s</td>
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 4c185f89..492615f4 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -34,11 +34,15 @@ import lombok.Getter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.SortedMap;
@@ -46,6 +50,8 @@ import java.util.SortedMap;
 import static 
org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_DOWN_INTERVAL;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE;
@@ -178,6 +184,13 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
 
         LOG.debug("Target processing capacity for {} is {}", vertex, 
targetCapacity);
         double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        if (conf.get(OBSERVED_SCALABILITY_ENABLED)) {
+
+            double scalingCoefficient =
+                    
JobVertexScaler.calculateObservedScalingCoefficient(history, conf);
+
+            scaleFactor = scaleFactor / scalingCoefficient;
+        }
         double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
         double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
         if (scaleFactor < minScaleFactor) {
@@ -236,6 +249,83 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                 delayedScaleDown);
     }
 
+    /**
+     * Calculates the scaling coefficient based on historical scaling data.
+     *
+     * <p>The scaling coefficient is computed using the least squares 
approach. If there are not
+     * enough observations, or if the computed coefficient is invalid, a 
default value of {@code
+     * 1.0} is returned, assuming linear scaling.
+     *
+     * @param history A {@code SortedMap} of {@code Instant} timestamps to 
{@code ScalingSummary}
+     * @param conf Deployment configuration.
+     * @return The computed scaling coefficient.
+     */
+    @VisibleForTesting
+    protected static double calculateObservedScalingCoefficient(
+            SortedMap<Instant, ScalingSummary> history, Configuration conf) {
+        /*
+         * The scaling coefficient is computed using the least squares approach
+         * to fit a linear model:
+         *
+         *      R_i = β * P_i * α
+         *
+         * where:
+         * - R_i = observed processing rate
+         * - P_i = parallelism
+         * - β   = baseline processing rate
+         * - α   = scaling coefficient to optimize
+         *
+         * The optimization minimizes the **sum of squared errors**:
+         *
+         *      Loss = ∑ (R_i - β * α * P_i)^2
+         *
+         * Differentiating w.r.t. α and solving for α:
+         *
+         *      α = ∑ (P_i * R_i) / (∑ (P_i^2) * β)
+         *
+         * To stay conservative when the observations suggest 
better-than-linear returns, the computed α is clamped to an upper bound of 1.0.
+         */
+
+        var minObservations = conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS);
+
+        // not enough data to compute scaling coefficient; we assume linear 
scaling.
+        if (history.isEmpty() || history.size() < minObservations) {
+            return 1.0;
+        }
+
+        var baselineProcessingRate = 
AutoScalerUtils.computeBaselineProcessingRate(history);
+
+        if (Double.isNaN(baselineProcessingRate)) {
+            return 1.0;
+        }
+
+        List<Double> parallelismList = new ArrayList<>();
+        List<Double> processingRateList = new ArrayList<>();
+
+        for (Map.Entry<Instant, ScalingSummary> entry : history.entrySet()) {
+            ScalingSummary summary = entry.getValue();
+            double parallelism = summary.getCurrentParallelism();
+            double processingRate = 
summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+
+            if (Double.isNaN(processingRate)) {
+                LOG.warn(
+                        "True processing rate is not available in scaling 
history. Cannot compute scaling coefficient.");
+                return 1.0;
+            }
+
+            parallelismList.add(parallelism);
+            processingRateList.add(processingRate);
+        }
+
+        double lowerBound = 
conf.get(AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN);
+
+        var coefficient =
+                AutoScalerUtils.optimizeLinearScalingCoefficient(
+                        parallelismList, processingRateList, 
baselineProcessingRate, 1, lowerBound);
+
+        return BigDecimal.valueOf(coefficient).setScale(2, 
RoundingMode.CEILING).doubleValue();
+    }
+
     private ParallelismChange detectBlockScaling(
             Context context,
             JobVertexID vertex,
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 980db2f4..a67bfd50 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -382,4 +382,40 @@ public class AutoScalerOptions {
                                             
"scaling.key-group.partitions.adjust.mode"))
                             .withDescription(
                                     "How to adjust the parallelism of Source 
vertex or upstream shuffle is keyBy");
+
+    public static final ConfigOption<Boolean> OBSERVED_SCALABILITY_ENABLED =
+            autoScalerConfig("observed-scalability.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    
.withFallbackKeys(oldOperatorConfigKey("observed-scalability.enabled"))
+                    .withDescription(
+                            "Enables the use of an observed scalability 
coefficient when computing target parallelism. "
+                                    + "If enabled, the system will estimate 
the scalability coefficient based on historical scaling data "
+                                    + "instead of assuming perfect linear 
scaling. "
+                                    + "This helps account for real-world 
inefficiencies such as network overhead and coordination costs.");
+
+    public static final ConfigOption<Integer> 
OBSERVED_SCALABILITY_MIN_OBSERVATIONS =
+            autoScalerConfig("observed-scalability.min-observations")
+                    .intType()
+                    .defaultValue(3)
+                    
.withFallbackKeys(oldOperatorConfigKey("observed-scalability.min-observations"))
+                    .withDescription(
+                            "Defines the minimum number of historical scaling 
observations required to estimate the scalability coefficient. "
+                                    + "If the number of available observations 
is below this threshold, the system falls back to assuming linear scaling. "
+                                    + "Note: To effectively use a higher 
minimum observation count, you need to increase "
+                                    + VERTEX_SCALING_HISTORY_COUNT.key()
+                                    + ". Avoid setting "
+                                    + VERTEX_SCALING_HISTORY_COUNT.key()
+                                    + " to a very high value, as the number of 
retained data points is limited by the size of the state store—"
+                                    + "particularly when using 
Kubernetes-based state store.");
+
+    public static final ConfigOption<Double> 
OBSERVED_SCALABILITY_COEFFICIENT_MIN =
+            autoScalerConfig("observed-scalability.coefficient-min")
+                    .doubleType()
+                    .defaultValue(0.5)
+                    
.withFallbackKeys(oldOperatorConfigKey("observed-scalability.coefficient-min"))
+                    .withDescription(
+                            "Minimum allowed value for the observed 
scalability coefficient. "
+                                    + "Prevents aggressive scaling by clamping 
low coefficient estimates. "
+                                    + "If the estimated coefficient falls 
below this value, it is capped at the configured minimum.");
 }
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
index 411ab9b2..837d429b 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.autoscaler.utils;
 
+import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
@@ -24,15 +25,19 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Set;
+import java.util.SortedMap;
 
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** AutoScaler utilities. */
 public class AutoScalerUtils {
@@ -94,4 +99,89 @@ public class AutoScalerUtils {
         conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new 
ArrayList<>(excludedIds));
         return anyAdded;
     }
+
+    /**
+     * Computes the optimized linear scaling coefficient (α) by minimizing the 
least squares error.
+     *
+     * <p>This method estimates the scaling coefficient in a linear scaling 
model by fitting
+     * observed processing rates and parallelism levels.
+     *
+     * <p>The computed coefficient is clamped within the specified lower and 
upper bounds to ensure
+     * stability and prevent extreme scaling adjustments.
+     *
+     * @param parallelismLevels List of parallelism levels.
+     * @param processingRates List of observed processing rates.
+     * @param baselineProcessingRate Baseline processing rate.
+     * @param upperBound Maximum allowable value for the scaling coefficient.
+     * @param lowerBound Minimum allowable value for the scaling coefficient.
+     * @return The optimized scaling coefficient (α), constrained within 
{@code [lowerBound,
+     *     upperBound]}.
+     */
+    public static double optimizeLinearScalingCoefficient(
+            List<Double> parallelismLevels,
+            List<Double> processingRates,
+            double baselineProcessingRate,
+            double upperBound,
+            double lowerBound) {
+
+        double sum = 0.0;
+        double squaredSum = 0.0;
+
+        for (int i = 0; i < parallelismLevels.size(); i++) {
+            double parallelism = parallelismLevels.get(i);
+            double processingRate = processingRates.get(i);
+
+            sum += parallelism * processingRate;
+            squaredSum += parallelism * parallelism;
+        }
+
+        if (squaredSum == 0.0) {
+            return 1.0; // Fallback to linear scaling if denominator is zero
+        }
+
+        double alpha = sum / (squaredSum * baselineProcessingRate);
+
+        return Math.max(lowerBound, Math.min(upperBound, alpha));
+    }
+
+    /**
+     * Computes the baseline processing rate from historical scaling data.
+     *
+     * <p>The baseline processing rate represents the **processing rate per 
unit of parallelism**.
+     * It is determined using the smallest observed parallelism in the history.
+     *
+     * @param history A {@code SortedMap} where keys are timestamps ({@code 
Instant}), and values
+     *     are {@code ScalingSummary} objects.
+     * @return The computed baseline processing rate (processing rate per unit 
of parallelism).
+     */
+    public static double computeBaselineProcessingRate(SortedMap<Instant, 
ScalingSummary> history) {
+        ScalingSummary latestSmallestParallelismSummary = null;
+
+        for (Map.Entry<Instant, ScalingSummary> entry :
+                ((NavigableMap<Instant, ScalingSummary>) 
history).descendingMap().entrySet()) {
+            ScalingSummary summary = entry.getValue();
+            double parallelism = summary.getCurrentParallelism();
+
+            if (parallelism == 1) {
+                return 
summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+            }
+
+            if (latestSmallestParallelismSummary == null
+                    || parallelism < 
latestSmallestParallelismSummary.getCurrentParallelism()) {
+                latestSmallestParallelismSummary = entry.getValue();
+            }
+        }
+
+        if (latestSmallestParallelismSummary == null) {
+            return Double.NaN;
+        }
+
+        double parallelism = 
latestSmallestParallelismSummary.getCurrentParallelism();
+        double processingRate =
+                latestSmallestParallelismSummary
+                        .getMetrics()
+                        .get(TRUE_PROCESSING_RATE)
+                        .getAverage();
+        return processingRate / parallelism;
+    }
 }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 9cdc7159..3d085e17 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -49,6 +49,8 @@ import static 
org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_MESSAGE_FO
 import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING;
 import static 
org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT;
 import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_MIN_OBSERVATIONS;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -1156,4 +1158,213 @@ public class JobVertexScalerTest {
         ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf, 
false, restartTime);
         return metrics;
     }
+
+    @Test
+    public void testCalculateScalingCoefficient() {
+        var currentTime = Instant.now();
+
+        var linearScalingHistory = new TreeMap<Instant, ScalingSummary>();
+        var linearScalingEvaluatedData1 = evaluated(4, 100, 200);
+        var linearScalingEvaluatedData2 = evaluated(2, 400, 100);
+        var linearScalingEvaluatedData3 = evaluated(8, 800, 400);
+
+        linearScalingHistory.put(
+                currentTime.minusSeconds(20),
+                new ScalingSummary(4, 2, linearScalingEvaluatedData1));
+        linearScalingHistory.put(
+                currentTime.minusSeconds(10),
+                new ScalingSummary(2, 8, linearScalingEvaluatedData2));
+        linearScalingHistory.put(
+                currentTime, new ScalingSummary(8, 16, 
linearScalingEvaluatedData3));
+
+        double linearScalingScalingCoefficient =
+                
JobVertexScaler.calculateObservedScalingCoefficient(linearScalingHistory, conf);
+
+        assertEquals(1.0, linearScalingScalingCoefficient);
+
+        var slightDiminishingReturnsScalingHistory = new TreeMap<Instant, 
ScalingSummary>();
+        var slightDiminishingReturnsEvaluatedData1 = evaluated(4, 98, 196);
+        var slightDiminishingReturnsEvaluatedData2 = evaluated(2, 396, 99);
+        var slightDiminishingReturnsEvaluatedData3 = evaluated(8, 780, 390);
+
+        slightDiminishingReturnsScalingHistory.put(
+                currentTime.minusSeconds(20),
+                new ScalingSummary(4, 2, 
slightDiminishingReturnsEvaluatedData1));
+        slightDiminishingReturnsScalingHistory.put(
+                currentTime.minusSeconds(10),
+                new ScalingSummary(2, 8, 
slightDiminishingReturnsEvaluatedData2));
+        slightDiminishingReturnsScalingHistory.put(
+                currentTime, new ScalingSummary(8, 16, 
slightDiminishingReturnsEvaluatedData3));
+
+        double slightDiminishingReturnsScalingCoefficient =
+                JobVertexScaler.calculateObservedScalingCoefficient(
+                        slightDiminishingReturnsScalingHistory, conf);
+
+        assertTrue(
+                slightDiminishingReturnsScalingCoefficient > 0.9
+                        && slightDiminishingReturnsScalingCoefficient < 1);
+
+        var sharpDiminishingReturnsScalingHistory = new TreeMap<Instant, 
ScalingSummary>();
+        var sharpDiminishingReturnsEvaluatedData1 = evaluated(4, 80, 160);
+        var sharpDiminishingReturnsEvaluatedData2 = evaluated(2, 384, 96);
+        var sharpDiminishingReturnsEvaluatedData3 = evaluated(8, 480, 240);
+
+        sharpDiminishingReturnsScalingHistory.put(
+                currentTime.minusSeconds(20),
+                new ScalingSummary(4, 2, 
sharpDiminishingReturnsEvaluatedData1));
+        sharpDiminishingReturnsScalingHistory.put(
+                currentTime.minusSeconds(10),
+                new ScalingSummary(2, 8, 
sharpDiminishingReturnsEvaluatedData2));
+        sharpDiminishingReturnsScalingHistory.put(
+                currentTime, new ScalingSummary(8, 16, 
sharpDiminishingReturnsEvaluatedData3));
+
+        double sharpDiminishingReturnsScalingCoefficient =
+                JobVertexScaler.calculateObservedScalingCoefficient(
+                        sharpDiminishingReturnsScalingHistory, conf);
+
+        assertTrue(
+                sharpDiminishingReturnsScalingCoefficient < 0.9
+                        && sharpDiminishingReturnsScalingCoefficient > 0.4);
+
+        var sharpDiminishingReturnsWithOneParallelismScalingHistory =
+                new TreeMap<Instant, ScalingSummary>();
+        var sharpDiminishingReturnsWithOneParallelismEvaluatedData1 = 
evaluated(1, 100, 50);
+        var sharpDiminishingReturnsWithOneParallelismEvaluatedData2 = 
evaluated(2, 160, 80);
+        var sharpDiminishingReturnsWithOneParallelismEvaluatedData3 = 
evaluated(4, 200, 100);
+
+        sharpDiminishingReturnsWithOneParallelismScalingHistory.put(
+                currentTime.minusSeconds(20),
+                new ScalingSummary(1, 2, 
sharpDiminishingReturnsWithOneParallelismEvaluatedData1));
+        sharpDiminishingReturnsWithOneParallelismScalingHistory.put(
+                currentTime.minusSeconds(10),
+                new ScalingSummary(2, 4, 
sharpDiminishingReturnsWithOneParallelismEvaluatedData2));
+        sharpDiminishingReturnsWithOneParallelismScalingHistory.put(
+                currentTime,
+                new ScalingSummary(4, 8, 
sharpDiminishingReturnsWithOneParallelismEvaluatedData3));
+
+        double sharpDiminishingReturnsWithOneParallelismScalingCoefficient =
+                JobVertexScaler.calculateObservedScalingCoefficient(
+                        
sharpDiminishingReturnsWithOneParallelismScalingHistory, conf);
+
+        assertTrue(
+                sharpDiminishingReturnsWithOneParallelismScalingCoefficient < 
0.9
+                        && 
sharpDiminishingReturnsWithOneParallelismScalingCoefficient > 0.4);
+
+        conf.set(OBSERVED_SCALABILITY_MIN_OBSERVATIONS, 1);
+
+        var withOneScalingHistoryRecord = new TreeMap<Instant, 
ScalingSummary>();
+
+        var withOneScalingHistoryRecordEvaluatedData1 = evaluated(4, 200, 100);
+
+        withOneScalingHistoryRecord.put(
+                currentTime, new ScalingSummary(4, 8, 
withOneScalingHistoryRecordEvaluatedData1));
+
+        double withOneScalingHistoryRecordScalingCoefficient =
+                JobVertexScaler.calculateObservedScalingCoefficient(
+                        withOneScalingHistoryRecord, conf);
+
+        assertEquals(1, withOneScalingHistoryRecordScalingCoefficient);
+
+        var diminishingReturnWithTwoScalingHistoryRecord = new 
TreeMap<Instant, ScalingSummary>();
+
+        var diminishingReturnWithTwoScalingHistoryRecordEvaluatedData1 = 
evaluated(2, 160, 80);
+        var diminishingReturnWithTwoScalingHistoryRecordEvaluatedData2 = 
evaluated(4, 200, 100);
+
+        diminishingReturnWithTwoScalingHistoryRecord.put(
+                currentTime.minusSeconds(10),
+                new ScalingSummary(
+                        2, 4, 
diminishingReturnWithTwoScalingHistoryRecordEvaluatedData1));
+        diminishingReturnWithTwoScalingHistoryRecord.put(
+                currentTime,
+                new ScalingSummary(
+                        4, 8, 
diminishingReturnWithTwoScalingHistoryRecordEvaluatedData2));
+
+        double diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient =
+                JobVertexScaler.calculateObservedScalingCoefficient(
+                        diminishingReturnWithTwoScalingHistoryRecord, conf);
+
+        assertTrue(
+                diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient 
< 0.9
+                        && 
diminishingReturnWithTwoScalingHistoryRecordScalingCoefficient > 0.4);
+
+        var linearReturnWithTwoScalingHistoryRecord = new TreeMap<Instant, 
ScalingSummary>();
+
+        var linearReturnWithTwoScalingHistoryRecordEvaluatedData1 = 
evaluated(2, 160, 80);
+        var linearReturnWithTwoScalingHistoryRecordEvaluatedData2 = 
evaluated(4, 320, 160);
+
+        linearReturnWithTwoScalingHistoryRecord.put(
+                currentTime.minusSeconds(10),
+                new ScalingSummary(2, 4, 
linearReturnWithTwoScalingHistoryRecordEvaluatedData1));
+        linearReturnWithTwoScalingHistoryRecord.put(
+                currentTime,
+                new ScalingSummary(4, 8, 
linearReturnWithTwoScalingHistoryRecordEvaluatedData2));
+
+        double linearReturnWithTwoScalingHistoryRecordScalingCoefficient =
+                JobVertexScaler.calculateObservedScalingCoefficient(
+                        linearReturnWithTwoScalingHistoryRecord, conf);
+
+        assertEquals(1, 
linearReturnWithTwoScalingHistoryRecordScalingCoefficient);
+    }
+
+    @ParameterizedTest
+    @MethodSource("adjustmentInputsProvider")
+    public void testParallelismScalingWithObservedScalingCoefficient(
+            Collection<ShipStrategy> inputShipStrategies) {
+        var op = new JobVertexID();
+        var delayedScaleDown = new DelayedScaleDown();
+        var currentTime = Instant.now();
+
+        conf.set(UTILIZATION_TARGET, 0.5);
+        conf.set(OBSERVED_SCALABILITY_ENABLED, true);
+
+        var linearScalingHistory = new TreeMap<Instant, ScalingSummary>();
+        var linearScalingEvaluatedData1 = evaluated(4, 100, 200);
+        var linearScalingEvaluatedData2 = evaluated(2, 400, 100);
+        var linearScalingEvaluatedData3 = evaluated(8, 800, 400);
+
+        linearScalingHistory.put(
+                currentTime.minusSeconds(20),
+                new ScalingSummary(4, 2, linearScalingEvaluatedData1));
+        linearScalingHistory.put(
+                currentTime.minusSeconds(10),
+                new ScalingSummary(2, 8, linearScalingEvaluatedData2));
+        linearScalingHistory.put(
+                currentTime, new ScalingSummary(8, 16, 
linearScalingEvaluatedData3));
+
+        assertEquals(
+                ParallelismChange.build(10, true),
+                vertexScaler.computeScaleTargetParallelism(
+                        context,
+                        op,
+                        inputShipStrategies,
+                        evaluated(2, 100, 40),
+                        linearScalingHistory,
+                        restartTime,
+                        delayedScaleDown));
+
+        var diminishingReturnsScalingHistory = new TreeMap<Instant, 
ScalingSummary>();
+        var diminishingReturnsEvaluatedData1 = evaluated(4, 80, 160);
+        var diminishingReturnsEvaluatedData2 = evaluated(2, 384, 96);
+        var diminishingReturnsEvaluatedData3 = evaluated(8, 480, 240);
+
+        diminishingReturnsScalingHistory.put(
+                currentTime.minusSeconds(20),
+                new ScalingSummary(4, 2, diminishingReturnsEvaluatedData1));
+        diminishingReturnsScalingHistory.put(
+                currentTime.minusSeconds(10),
+                new ScalingSummary(2, 8, diminishingReturnsEvaluatedData2));
+        diminishingReturnsScalingHistory.put(
+                currentTime, new ScalingSummary(8, 16, 
diminishingReturnsEvaluatedData3));
+
+        assertEquals(
+                ParallelismChange.build(15, true),
+                vertexScaler.computeScaleTargetParallelism(
+                        context,
+                        op,
+                        inputShipStrategies,
+                        evaluated(2, 100, 40),
+                        diminishingReturnsScalingHistory,
+                        restartTime,
+                        delayedScaleDown));
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index e2b5db85..42fd56e7 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -65,6 +65,7 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
@@ -622,6 +623,7 @@ public class DefaultValidator implements 
FlinkResourceValidator {
                         UTILIZATION_MIN,
                         0.0d,
                         flinkConfiguration.get(UTILIZATION_TARGET)),
+                validateNumber(flinkConfiguration, 
OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
                 CalendarUtils.validateExcludedPeriods(flinkConfiguration));
     }
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 8caa24eb..08388b79 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -842,6 +842,21 @@ public class DefaultValidatorTest {
         assertTrue(result.isPresent());
     }
 
+    @Test
+    public void testAutoScalerDeploymentWithInvalidScalingCoefficientMin() {
+        var result =
+                testAutoScalerConfiguration(
+                        flinkConf ->
+                                flinkConf.put(
+                                        
AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN
+                                                .key(),
+                                        "1.2"));
+        assertErrorContains(
+                result,
+                getFormattedErrorMessage(
+                        
AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d));
+    }
+
     @Test
     public void testNonEnabledAutoScalerDeploymentJob() {
         var result =

Reply via email to