This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new d526174d [FLINK-34152] Add an option to scale memory when downscaling (#786)
d526174d is described below

commit d526174d9ab4c3ccf92548c821d9f44acbd3f247
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Mar 7 16:56:05 2024 +0100

    [FLINK-34152] Add an option to scale memory when downscaling (#786)
    
    This adds an option to increase heap memory when removing
    TaskManagers. The scaling is applied after adjusting the memory pools (heap,
    metaspace, network, managed) and only affects heap memory.
    
    The reason for adding this functionality is that the likelihood of running into
    memory constrained scenarios when downscaling is increased after applying memory
    tuning. As a precaution, we temporarily increase the memory up to the maximum
    allowed TaskManager memory.
---
 .../generated/auto_scaler_configuration.html       |   6 +
 .../apache/flink/autoscaler/ScalingExecutor.java   |   4 +-
 .../flink/autoscaler/config/AutoScalerOptions.java |   9 ++
 .../flink/autoscaler/tuning/ConfigChanges.java     |   2 +-
 .../flink/autoscaler/tuning/MemoryScaling.java     | 114 ++++++++++++++++++
 .../flink/autoscaler/tuning/MemoryTuning.java      |  26 +++--
 .../flink/autoscaler/ScalingExecutorTest.java      |   4 +-
 .../flink/autoscaler/tuning/MemoryScalingTest.java | 129 +++++++++++++++++++++
 .../flink/autoscaler/tuning/MemoryTuningTest.java  |  41 +++++--
 9 files changed, 314 insertions(+), 21 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 65fb594d..5dc38861 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -80,6 +80,12 @@
             <td>Double</td>
             <td>Overhead to add to tuning decisions (0-1). This ensures spare capacity and allows the memory to grow beyond the dynamically computed limits, but never beyond the original memory limits.</td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.memory.tuning.scale-down-compensation.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>If this option is enabled and memory tuning is enabled, TaskManager memory will be increased when scaling down. This ensures that after applying memory tuning there is sufficient memory when running with fewer TaskManagers.</td>
+        </tr>
         <tr>
             <td><h5>job.autoscaler.metrics.busy-time.aggregator</h5></td>
             <td style="word-wrap: break-word;">MAX</td>
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 0e714507..7acb482d 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -122,7 +122,7 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
         }
 
         var configOverrides =
-                MemoryTuning.tuneTaskManagerHeapMemory(
+                MemoryTuning.tuneTaskManagerMemory(
                         context,
                         evaluatedMetrics,
                         jobTopology,
@@ -130,7 +130,7 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
                         autoScalerEventHandler);
 
         if (scalingWouldExceedClusterResources(
-                configOverrides.applyOverrides(conf),
+                configOverrides.newConfigWithOverrides(conf),
                 evaluatedMetrics,
                 scalingSummaries,
                 context)) {
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 52c0e2a0..080472fd 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -258,6 +258,15 @@ public class AutoScalerOptions {
                     .withDescription(
                             "If enabled, the initial amount of memory 
specified for TaskManagers will be reduced/increased according to the observed 
needs.");
 
+    public static final ConfigOption<Boolean> MEMORY_SCALING_ENABLED =
+            autoScalerConfig("memory.tuning.scale-down-compensation.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withFallbackKeys(
+                            
oldOperatorConfigKey("memory.tuning.scale-down-compensation.enabled"))
+                    .withDescription(
+                            "If this option is enabled and memory tuning is 
enabled, TaskManager memory will be increased when scaling down. This ensures 
that after applying memory tuning there is sufficient memory when running with 
fewer TaskManagers.");
+
     public static final ConfigOption<Double> MEMORY_TUNING_OVERHEAD =
             autoScalerConfig("memory.tuning.overhead")
                     .doubleType()
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/ConfigChanges.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/ConfigChanges.java
index f76a22fe..f3e92e32 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/ConfigChanges.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/ConfigChanges.java
@@ -56,7 +56,7 @@ public class ConfigChanges {
         return this;
     }
 
-    public Configuration applyOverrides(Configuration existingConfig) {
+    public Configuration newConfigWithOverrides(Configuration existingConfig) {
         Configuration config = new Configuration(existingConfig);
         for (String key : removals) {
             config.removeKey(key);
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryScaling.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryScaling.java
new file mode 100644
index 00000000..2863bfb1
--- /dev/null
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryScaling.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.tuning;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Memory scaling ensures that memory is scaled alongside with the number of 
available TaskManagers.
+ *
+ * <p>When scaling down, TaskManagers are removed which can drastically limit 
the amount of
+ * available memory. To mitigate this issue, we keep the total cluster memory 
constant, until we can
+ * measure the actual needed memory usage.
+ *
+ * <p>When scaling up, i.e. adding more TaskManagers, we don't remove memory 
to ensure that we do
+ * not run into memory-constrained scenarios. However, MemoryTuning will still 
be applied which can
+ * result in a lower TaskManager memory baseline.
+ */
+public class MemoryScaling {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MemoryScaling.class);
+
+    /**
+     * Scales the amount of memory per TaskManager proportionally to the 
number of TaskManagers
+     * removed/added.
+     */
+    public static MemorySize applyMemoryScaling(
+            MemorySize currentMemorySize,
+            MemoryBudget memoryBudget,
+            JobAutoScalerContext<?> context,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            EvaluatedMetrics evaluatedMetrics) {
+
+        if 
(!context.getConfiguration().get(AutoScalerOptions.MEMORY_SCALING_ENABLED)) {
+            return currentMemorySize;
+        }
+
+        double memScalingFactor =
+                getMemoryScalingFactor(
+                        evaluatedMetrics, scalingSummaries, 
context.getConfiguration());
+
+        long additionalBytes =
+                memoryBudget.budget(
+                        (long) (memScalingFactor * 
currentMemorySize.getBytes())
+                                - currentMemorySize.getBytes());
+
+        MemorySize scaledTotalMemory =
+                new MemorySize(currentMemorySize.getBytes() + additionalBytes);
+
+        LOG.info(
+                "Scaling factor: {}, Adjusting memory from {} to {}.",
+                memScalingFactor,
+                currentMemorySize,
+                scaledTotalMemory);
+
+        return scaledTotalMemory;
+    }
+
+    /**
+     * Returns a factor for scaling the total amount of process memory when 
the number of
+     * TaskManagers change.
+     */
+    private static double getMemoryScalingFactor(
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            Configuration config) {
+        int numTaskSlotsUsed =
+                (int)
+                        evaluatedMetrics
+                                .getGlobalMetrics()
+                                .get(ScalingMetric.NUM_TASK_SLOTS_USED)
+                                .getCurrent();
+        int numTaskSlotsAfterRescale =
+                ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
+                        evaluatedMetrics.getVertexMetrics(), scalingSummaries, 
numTaskSlotsUsed);
+
+        int numTaskSlotsPerTM = config.get(TaskManagerOptions.NUM_TASK_SLOTS);
+
+        int numTMsBeforeRescale = (int) Math.ceil(numTaskSlotsUsed / (double) 
numTaskSlotsPerTM);
+        int numTMsAfterRescale =
+                (int) Math.ceil(numTaskSlotsAfterRescale / (double) 
numTaskSlotsPerTM);
+
+        // Only add memory, don't remove any
+        return Math.max(1, numTMsBeforeRescale / (double) numTMsAfterRescale);
+    }
+}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
index dd7eb759..ad13cd9b 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
@@ -68,11 +68,11 @@ public class MemoryTuning {
 
     /**
      * Emits a Configuration which contains overrides for the current 
configuration. We are not
-     * modifying the config directly, but we are emitting a new configuration 
which contains any
-     * overrides. This config is persisted separately and applied by the 
autoscaler. That way we can
-     * clear any applied overrides if auto-tuning is disabled.
+     * modifying the config directly, but we are emitting ConfigChanges which 
contain any overrides
+     * or removals. This config is persisted separately and applied by the 
autoscaler. That way we
+     * can clear any applied overrides if auto-tuning is disabled.
      */
-    public static ConfigChanges tuneTaskManagerHeapMemory(
+    public static ConfigChanges tuneTaskManagerMemory(
             JobAutoScalerContext<?> context,
             EvaluatedMetrics evaluatedMetrics,
             JobTopology jobTopology,
@@ -108,8 +108,9 @@ public class MemoryTuning {
             LOG.warn("Spec TaskManager memory size could not be determined.");
             return EMPTY_CONFIG;
         }
+
         MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
-        // Add these current settings from the budget
+        // Budget the original spec's memory settings which we do not modify
         
memBudget.budget(memSpecs.getFlinkMemory().getFrameworkOffHeap().getBytes());
         
memBudget.budget(memSpecs.getFlinkMemory().getTaskOffHeap().getBytes());
         memBudget.budget(memSpecs.getJvmOverheadSize().getBytes());
@@ -134,6 +135,10 @@ public class MemoryTuning {
                         specManagedSize,
                         config,
                         memBudget);
+        // Rescale heap according to scaling decision after distributing all 
memory pools
+        newHeapSize =
+                MemoryScaling.applyMemoryScaling(
+                        newHeapSize, memBudget, context, scalingSummaries, 
evaluatedMetrics);
         LOG.info(
                 "Optimized memory sizes: heap: {} managed: {}, network: {}, 
meta: {}",
                 newHeapSize.toHumanReadableString(),
@@ -165,6 +170,7 @@ public class MemoryTuning {
         // memory pools, there are no fractional variants for heap memory. 
Setting the absolute heap
         // memory options could cause invalid configuration states when users 
adapt the total amount
         // of memory. We also need to take care to remove any user-provided 
overrides for those.
+        tuningConfig.addRemoval(TaskManagerOptions.TOTAL_FLINK_MEMORY);
         tuningConfig.addRemoval(TaskManagerOptions.TASK_HEAP_MEMORY);
         // Set default to zero because we already account for heap via task 
heap.
         tuningConfig.addOverride(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, 
MemorySize.ZERO);
@@ -233,7 +239,8 @@ public class MemoryTuning {
             long maxManagedMemorySize = memBudget.budget(Long.MAX_VALUE);
             return new MemorySize(maxManagedMemorySize);
         } else {
-            return managedMemoryConfigured;
+            long managedMemorySize = 
memBudget.budget(managedMemoryConfigured.getBytes());
+            return new MemorySize(managedMemorySize);
         }
     }
 
@@ -322,9 +329,10 @@ public class MemoryTuning {
 
     private static MemorySize getUsage(
             ScalingMetric scalingMetric, Map<ScalingMetric, 
EvaluatedScalingMetric> globalMetrics) {
-        MemorySize heapUsed = new MemorySize((long) 
globalMetrics.get(scalingMetric).getAverage());
-        LOG.debug("{}: {}", scalingMetric, heapUsed);
-        return heapUsed;
+        MemorySize memoryUsed =
+                new MemorySize((long) 
globalMetrics.get(scalingMetric).getAverage());
+        LOG.debug("{}: {}", scalingMetric, memoryUsed);
+        return memoryUsed;
     }
 
     public static MemorySize getTotalMemory(Configuration config, 
JobAutoScalerContext<?> ctx) {
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index 764e312f..7c8ead4e 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -362,11 +362,11 @@ public class ScalingExecutorTest {
                                 TaskManagerOptions.JVM_METASPACE.key(),
                                 "360 mb",
                                 TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
-                                "0.134",
+                                "0.053",
                                 TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
                                 "0 bytes",
                                 TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
-                                "7864064 kb"));
+                                "20400832696 bytes"));
     }
 
     @ParameterizedTest
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/tuning/MemoryScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/tuning/MemoryScalingTest.java
new file mode 100644
index 00000000..1a07ea44
--- /dev/null
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/tuning/MemoryScalingTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.tuning;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.TestingAutoscalerUtils;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class MemoryScalingTest {
+
+    JobAutoScalerContext<JobID> context = 
TestingAutoscalerUtils.createResourceAwareContext();
+
+    @BeforeEach
+    void setup() {
+        
context.getConfiguration().set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
+    }
+
+    @Test
+    void testMemoryScalingDownscaling() {
+        int currentParallelism = 20;
+        int rescaleParallelism = 10;
+        MemorySize currentMemory = MemorySize.parse("10 gb");
+        MemoryBudget memoryBudget = new MemoryBudget(MemorySize.parse("30 
gb").getBytes());
+
+        assertThat(
+                        runMemoryScaling(
+                                currentParallelism,
+                                rescaleParallelism,
+                                context,
+                                currentMemory,
+                                memoryBudget))
+                .isEqualTo(MemorySize.parse("20 gb"));
+    }
+
+    @Test
+    void testMemoryScalingUpscaling() {
+        int currentParallelism = 10;
+        int rescaleParallelism = 20;
+        MemorySize currentMemory = MemorySize.parse("10 gb");
+        MemoryBudget memoryBudget = new MemoryBudget(MemorySize.parse("30 
gb").getBytes());
+
+        assertThat(
+                        runMemoryScaling(
+                                currentParallelism,
+                                rescaleParallelism,
+                                context,
+                                currentMemory,
+                                memoryBudget))
+                .isEqualTo(MemorySize.parse("10 gb"));
+    }
+
+    @Test
+    void testMemoryScalingDisabled() {
+        
context.getConfiguration().set(AutoScalerOptions.MEMORY_SCALING_ENABLED, false);
+        MemorySize currentMemory = MemorySize.parse("10 gb");
+        MemoryBudget memoryBudget = new MemoryBudget(MemorySize.parse("30 
gb").getBytes());
+
+        assertThat(
+                        MemoryScaling.applyMemoryScaling(
+                                currentMemory, memoryBudget, context, 
Map.of(), null))
+                .isEqualTo(currentMemory);
+    }
+
+    private static MemorySize runMemoryScaling(
+            int currentParallelism,
+            int rescaleParallelism,
+            JobAutoScalerContext<JobID> context,
+            MemorySize currentMemory,
+            MemoryBudget memoryBudget) {
+        var globalMetrics =
+                Map.of(
+                        ScalingMetric.NUM_TASK_SLOTS_USED,
+                        EvaluatedScalingMetric.of(currentParallelism));
+        var jobVertex1 = new JobVertexID();
+        var jobVertex2 = new JobVertexID();
+        var vertexMetrics =
+                Map.of(
+                        jobVertex1,
+                                Map.of(
+                                        ScalingMetric.PARALLELISM,
+                                        
EvaluatedScalingMetric.of(currentParallelism)),
+                        jobVertex2,
+                                Map.of(
+                                        ScalingMetric.PARALLELISM,
+                                        
EvaluatedScalingMetric.of(currentParallelism)));
+        var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
+
+        Map<JobVertexID, ScalingSummary> scalingSummaries =
+                Map.of(
+                        jobVertex1,
+                                new ScalingSummary(
+                                        currentParallelism, 
rescaleParallelism, Map.of()),
+                        jobVertex2,
+                                new ScalingSummary(
+                                        currentParallelism, 
rescaleParallelism, Map.of()));
+
+        return MemoryScaling.applyMemoryScaling(
+                currentMemory, memoryBudget, context, scalingSummaries, 
metrics);
+    }
+}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/tuning/MemoryTuningTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/tuning/MemoryTuningTest.java
index 77b0e19c..18f6e9c3 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/tuning/MemoryTuningTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/tuning/MemoryTuningTest.java
@@ -55,6 +55,7 @@ public class MemoryTuningTest {
         var context = TestingAutoscalerUtils.createResourceAwareContext();
         var config = context.getConfiguration();
         config.set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
+        config.set(AutoScalerOptions.MEMORY_SCALING_ENABLED, false);
         config.set(TaskManagerOptions.NUM_TASK_SLOTS, 5);
         config.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
         MemorySize totalMemory = MemorySize.parse("30 gb");
@@ -82,7 +83,9 @@ public class MemoryTuningTest {
                         ScalingMetric.MANAGED_MEMORY_USED,
                         
EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(10000).getBytes()),
                         ScalingMetric.METASPACE_MEMORY_USED,
-                        
EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(100).getBytes()));
+                        
EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(100).getBytes()),
+                        ScalingMetric.NUM_TASK_SLOTS_USED,
+                        EvaluatedScalingMetric.of(50));
 
         var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
 
@@ -98,7 +101,7 @@ public class MemoryTuningTest {
                         jobVertex2, new ScalingSummary(50, 10, Map.of()));
 
         ConfigChanges configChanges =
-                MemoryTuning.tuneTaskManagerHeapMemory(
+                MemoryTuning.tuneTaskManagerMemory(
                         context, metrics, jobTopology, scalingSummaries, 
eventHandler);
         // Test reducing overall memory
         assertThat(configChanges.getOverrides())
@@ -113,14 +116,15 @@ public class MemoryTuningTest {
                                 TaskManagerOptions.JVM_METASPACE.key(),
                                 "120 mb",
                                 TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
-                                "0.139",
+                                "0.054",
                                 TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
                                 "0 bytes",
                                 TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
-                                "7760130867 bytes"));
+                                "20108162027 bytes"));
 
         assertThat(configChanges.getRemovals())
                 .containsExactlyInAnyOrder(
+                        TaskManagerOptions.TOTAL_FLINK_MEMORY.key(),
                         TaskManagerOptions.TASK_HEAP_MEMORY.key(),
                         TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                         TaskManagerOptions.MANAGED_MEMORY_SIZE
@@ -136,7 +140,7 @@ public class MemoryTuningTest {
         // Test maximize managed memory
         config.set(AutoScalerOptions.MEMORY_TUNING_MAXIMIZE_MANAGED_MEMORY, 
true);
         configChanges =
-                MemoryTuning.tuneTaskManagerHeapMemory(
+                MemoryTuning.tuneTaskManagerMemory(
                         context, metrics, jobTopology, scalingSummaries, 
eventHandler);
         assertThat(configChanges.getOverrides())
                 .containsExactlyInAnyOrderEntriesOf(
@@ -161,7 +165,7 @@ public class MemoryTuningTest {
         metrics.getGlobalMetrics()
                 .put(ScalingMetric.MANAGED_MEMORY_USED, 
EvaluatedScalingMetric.avg(0));
         configChanges =
-                MemoryTuning.tuneTaskManagerHeapMemory(
+                MemoryTuning.tuneTaskManagerMemory(
                         context, metrics, jobTopology, scalingSummaries, 
eventHandler);
         assertThat(configChanges.getOverrides())
                 .containsExactlyInAnyOrderEntriesOf(
@@ -181,10 +185,33 @@ public class MemoryTuningTest {
                                 TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
                                 "7760130867 bytes"));
 
+        // Test integration with MemoryScaling
+        config.set(AutoScalerOptions.MEMORY_SCALING_ENABLED, true);
+        configChanges =
+                MemoryTuning.tuneTaskManagerMemory(
+                        context, metrics, jobTopology, scalingSummaries, 
eventHandler);
+        assertThat(configChanges.getOverrides())
+                .containsExactlyInAnyOrderEntriesOf(
+                        Map.of(
+                                
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+                                "0.0",
+                                TaskManagerOptions.NETWORK_MEMORY_MIN.key(),
+                                "13760 kb",
+                                TaskManagerOptions.NETWORK_MEMORY_MAX.key(),
+                                "13760 kb",
+                                TaskManagerOptions.JVM_METASPACE.key(),
+                                "120 mb",
+                                TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
+                                "0.076",
+                                TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+                                "0 bytes",
+                                TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
+                                "14172382822 bytes"));
+
         // Test tuning disabled
         config.set(AutoScalerOptions.MEMORY_TUNING_ENABLED, false);
         assertThat(
-                        MemoryTuning.tuneTaskManagerHeapMemory(
+                        MemoryTuning.tuneTaskManagerMemory(
                                         context,
                                         metrics,
                                         jobTopology,

Reply via email to