This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ead906f [FLINK-34152] Tune all directly observable memory types (except network memory) (#778)
3ead906f is described below

commit 3ead906f33a4b3790fa5c12f2018e09db9443a09
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Feb 22 13:42:40 2024 +0100

    [FLINK-34152] Tune all directly observable memory types (except network memory) (#778)
---
 .../generated/auto_scaler_configuration.html       |  26 +--
 .../jdbc/state/JdbcAutoScalerStateStore.java       |  33 ++-
 .../realizer/RescaleApiScalingRealizer.java        |   5 +-
 .../apache/flink/autoscaler/JobAutoScalerImpl.java |   8 +-
 .../flink/autoscaler/RestApiMetricsCollector.java  |  22 +-
 .../apache/flink/autoscaler/ScalingExecutor.java   |  13 +-
 .../flink/autoscaler/ScalingMetricEvaluator.java   |  30 ++-
 .../flink/autoscaler/config/AutoScalerOptions.java |  37 +---
 .../flink/autoscaler/metrics/FlinkMetric.java      |   8 +-
 .../flink/autoscaler/metrics/ScalingMetric.java    |  10 +-
 .../flink/autoscaler/metrics/ScalingMetrics.java   |  29 ++-
 .../flink/autoscaler/realizer/ScalingRealizer.java |   4 +-
 .../autoscaler/state/AutoScalerStateStore.java     |   8 +-
 .../state/InMemoryAutoScalerStateStore.java        |  17 +-
 .../flink/autoscaler/tuning/ConfigChanges.java     |  67 ++++++
 .../flink/autoscaler/tuning/MemoryBudget.java      |  42 ++++
 .../flink/autoscaler/tuning/MemoryTuning.java      | 235 ++++++++++++---------
 .../flink/autoscaler/JobAutoScalerImplTest.java    |  34 ++-
 .../autoscaler/RestApiMetricsCollectorTest.java    |  24 ++-
 .../flink/autoscaler/ScalingExecutorTest.java      |  27 ++-
 .../autoscaler/ScalingMetricEvaluatorTest.java     |  32 ++-
 .../autoscaler/metrics/ScalingMetricsTest.java     |  37 ++--
 .../realizer/TestingScalingRealizer.java           |  14 +-
 .../state/AbstractAutoScalerStateStoreTest.java    |  12 +-
 .../flink/autoscaler/utils/MemoryTuningTest.java   |  97 ++++++---
 .../autoscaler/KubernetesScalingRealizer.java      |  12 +-
 .../state/KubernetesAutoScalerStateStore.java      |  34 ++-
 .../autoscaler/KubernetesScalingRealizerTest.java  |   6 +-
 28 files changed, 582 insertions(+), 341 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html 
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index ee6fb731..65fb594d 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -66,31 +66,19 @@
             <td><h5>job.autoscaler.memory.tuning.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>If enabled, the initial amount of memory specified for 
TaskManagers will be reduced according to the observed needs.</td>
+            <td>If enabled, the initial amount of memory specified for 
TaskManagers will be reduced/increased according to the observed needs.</td>
         </tr>
         <tr>
-            <td><h5>job.autoscaler.memory.tuning.heap.min</h5></td>
-            <td style="word-wrap: break-word;">512 mb</td>
-            <td>MemorySize</td>
-            <td>The minimum amount of TaskManager heap memory, if memory 
tuning is enabled.</td>
+            
<td><h5>job.autoscaler.memory.tuning.maximize-managed-memory</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If enabled and managed memory is used (e.g. RocksDB turned 
on), any reduction of heap, network, or metaspace memory will increase the 
managed memory.</td>
         </tr>
         <tr>
-            <td><h5>job.autoscaler.memory.tuning.heap.overhead</h5></td>
+            <td><h5>job.autoscaler.memory.tuning.overhead</h5></td>
             <td style="word-wrap: break-word;">0.2</td>
             <td>Double</td>
-            <td>Overhead to add to heap tuning decisions (0-1). This ensures 
spare capacity and allows the memory to grow beyond the dynamically computed 
limits, but never beyond the original memory limits.</td>
-        </tr>
-        <tr>
-            <td><h5>job.autoscaler.memory.tuning.heap.target-usage</h5></td>
-            <td style="word-wrap: break-word;">MAX</td>
-            <td><p>Enum</p></td>
-            <td>The heap size to target. Average usage (AVG) will yield better 
savings. Max usage will yield more conservative savings.<br /><br />Possible 
values:<ul><li>"AVG"</li><li>"MAX"</li></ul></td>
-        </tr>
-        <tr>
-            
<td><h5>job.autoscaler.memory.tuning.heap.transfer-to-managed</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>If enabled, any reduction of heap memory will increase the 
managed memory used by RocksDB.</td>
+            <td>Overhead to add to tuning decisions (0-1). This ensures spare 
capacity and allows the memory to grow beyond the dynamically computed limits, 
but never beyond the original memory limits.</td>
         </tr>
         <tr>
             <td><h5>job.autoscaler.metrics.busy-time.aggregator</h5></td>
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
index 3f2af617..1224e0d4 100644
--- 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
@@ -24,8 +24,8 @@ import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.ScalingTracking;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.time.Instant;
 import java.util.HashMap;
@@ -192,24 +193,24 @@ public class JdbcAutoScalerStateStore<KEY, Context 
extends JobAutoScalerContext<
     }
 
     @Override
-    public void storeConfigOverrides(Context jobContext, Configuration 
configOverrides) {
+    public void storeConfigChanges(Context jobContext, ConfigChanges 
configChanges) {
         jdbcStateStore.putSerializedState(
                 getSerializeKey(jobContext),
                 CONFIG_OVERRIDES,
-                serializeConfigOverrides(configOverrides));
+                serializeConfigOverrides(configChanges));
     }
 
     @Nonnull
     @Override
-    public Configuration getConfigOverrides(Context jobContext) {
+    public ConfigChanges getConfigChanges(Context jobContext) {
         return jdbcStateStore
                 .getSerializedState(getSerializeKey(jobContext), 
CONFIG_OVERRIDES)
                 .map(JdbcAutoScalerStateStore::deserializeConfigOverrides)
-                .orElse(new Configuration());
+                .orElse(new ConfigChanges());
     }
 
     @Override
-    public void removeConfigOverrides(Context jobContext) {
+    public void removeConfigChanges(Context jobContext) {
         jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), 
CONFIG_OVERRIDES);
     }
 
@@ -276,11 +277,23 @@ public class JdbcAutoScalerStateStore<KEY, Context 
extends JobAutoScalerContext<
         return ConfigurationUtils.convertValue(overrides, Map.class);
     }
 
-    private static String serializeConfigOverrides(Configuration overrides) {
-        return ConfigurationUtils.convertValue(overrides.toMap(), 
String.class);
+    @Nullable
+    private static String serializeConfigOverrides(ConfigChanges 
configChanges) {
+        try {
+            return YAML_MAPPER.writeValueAsString(configChanges);
+        } catch (Exception e) {
+            LOG.error("Failed to serialize ConfigOverrides", e);
+            return null;
+        }
     }
 
-    private static Configuration deserializeConfigOverrides(String overrides) {
-        return 
Configuration.fromMap(ConfigurationUtils.convertValue(overrides, Map.class));
+    @Nullable
+    private static ConfigChanges deserializeConfigOverrides(String 
configOverrides) {
+        try {
+            return YAML_MAPPER.readValue(configOverrides, new 
TypeReference<>() {});
+        } catch (Exception e) {
+            LOG.error("Failed to deserialize ConfigOverrides", e);
+            return null;
+        }
     }
 }
diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
index 8cba81d5..abf0f95b 100644
--- 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
@@ -24,6 +24,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -126,12 +127,12 @@ public class RescaleApiScalingRealizer<KEY, Context 
extends JobAutoScalerContext
     }
 
     @Override
-    public void realizeConfigOverrides(Context context, Configuration 
configOverrides) {
+    public void realizeConfigOverrides(Context context, ConfigChanges 
configChanges) {
         // Not currently supported
         LOG.warn(
                 "{} does not support updating the TaskManager configuration 
({})",
                 getClass().getSimpleName(),
-                configOverrides);
+                configChanges);
     }
 
     private Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 6f3d4937..5bb7b791 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
 import org.apache.flink.autoscaler.realizer.ScalingRealizer;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.util.Preconditions;
 
@@ -167,9 +167,9 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
             return;
         }
 
-        Configuration configOverrides = stateStore.getConfigOverrides(ctx);
-        LOG.info("Applying config overrides: {}", configOverrides);
-        scalingRealizer.realizeConfigOverrides(ctx, configOverrides);
+        ConfigChanges configChanges = stateStore.getConfigChanges(ctx);
+        LOG.debug("Applying config overrides: {}", configChanges);
+        scalingRealizer.realizeConfigOverrides(ctx, configChanges);
     }
 
     private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics 
autoscalerMetrics)
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
index a5b2cf02..09b9bb65 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
@@ -35,6 +35,8 @@ import 
org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
 
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -44,8 +46,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.autoscaler.metrics.FlinkMetric.HEAP_MAX;
-import static org.apache.flink.autoscaler.metrics.FlinkMetric.HEAP_USED;
+import static org.apache.flink.autoscaler.metrics.FlinkMetric.HEAP_MEMORY_MAX;
+import static org.apache.flink.autoscaler.metrics.FlinkMetric.HEAP_MEMORY_USED;
+import static 
org.apache.flink.autoscaler.metrics.FlinkMetric.MANAGED_MEMORY_USED;
+import static 
org.apache.flink.autoscaler.metrics.FlinkMetric.METASPACE_MEMORY_USED;
 import static 
org.apache.flink.autoscaler.metrics.FlinkMetric.TOTAL_GC_TIME_PER_SEC;
 
 /** Metric collector using flink rest api. */
@@ -55,13 +59,15 @@ public class RestApiMetricsCollector<KEY, Context extends 
JobAutoScalerContext<K
 
     private static final Map<String, FlinkMetric> COMMON_TM_METRIC_NAMES =
             Map.of(
-                    "Status.JVM.Memory.Heap.Max", HEAP_MAX,
-                    "Status.JVM.Memory.Heap.Used", HEAP_USED);
+                    "Status.JVM.Memory.Heap.Max", HEAP_MEMORY_MAX,
+                    "Status.JVM.Memory.Heap.Used", HEAP_MEMORY_USED,
+                    "Status.Flink.Memory.Managed.Used", MANAGED_MEMORY_USED,
+                    "Status.JVM.Memory.Metaspace.Used", METASPACE_MEMORY_USED);
     private static final Map<String, FlinkMetric> TM_METRIC_NAMES_WITH_GC =
-            Map.of(
-                    "Status.JVM.Memory.Heap.Max", HEAP_MAX,
-                    "Status.JVM.Memory.Heap.Used", HEAP_USED,
-                    "Status.JVM.GarbageCollector.All.TimeMsPerSecond", 
TOTAL_GC_TIME_PER_SEC);
+            ImmutableMap.<String, FlinkMetric>builder()
+                    .putAll(COMMON_TM_METRIC_NAMES)
+                    .put("Status.JVM.GarbageCollector.All.TimeMsPerSecond", 
TOTAL_GC_TIME_PER_SEC)
+                    .build();
 
     @Override
     protected Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> 
queryAllAggregatedMetrics(
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index fdbb3c17..4ff9ae36 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -118,12 +118,15 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             return false;
         }
 
-        var tuningConfig =
+        var configOverrides =
                 MemoryTuning.tuneTaskManagerHeapMemory(
                         context, evaluatedMetrics, autoScalerEventHandler);
 
         if (scalingWouldExceedClusterResources(
-                tuningConfig, evaluatedMetrics, scalingSummaries, context)) {
+                configOverrides.applyOverrides(conf),
+                evaluatedMetrics,
+                scalingSummaries,
+                context)) {
             return false;
         }
 
@@ -138,7 +141,7 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                 getVertexParallelismOverrides(
                         evaluatedMetrics.getVertexMetrics(), 
scalingSummaries));
 
-        autoScalerStateStore.storeConfigOverrides(context, tuningConfig);
+        autoScalerStateStore.storeConfigChanges(context, configOverrides);
 
         return true;
     }
@@ -267,13 +270,13 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
     }
 
     private boolean scalingWouldExceedClusterResources(
-            Configuration tuningConfig,
+            Configuration tunedConfig,
             EvaluatedMetrics evaluatedMetrics,
             Map<JobVertexID, ScalingSummary> scalingSummaries,
             JobAutoScalerContext<?> ctx) {
 
         final double taskManagerCpu = ctx.getTaskManagerCpu().orElse(0.);
-        final MemorySize taskManagerMemory = 
MemoryTuning.getTotalMemory(tuningConfig, ctx);
+        final MemorySize taskManagerMemory = 
MemoryTuning.getTotalMemory(tunedConfig, ctx);
 
         if (taskManagerCpu <= 0 || 
taskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
             // We can't extract the requirements, we can't make any assumptions
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index b2a79092..6512758b 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -49,10 +49,12 @@ import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZ
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.GC_PRESSURE;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MAX_USAGE_RATIO;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_USED;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MEMORY_USED;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.MANAGED_MEMORY_USED;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.METASPACE_MEMORY_USED;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_TASK_SLOTS_USED;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.OBSERVED_TPR;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
@@ -365,18 +367,12 @@ public class ScalingMetricEvaluator {
         var out = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
 
         var gcPressure = latest.getOrDefault(GC_PRESSURE, Double.NaN);
-        var lastHeapUsage = latest.getOrDefault(HEAP_MAX_USAGE_RATIO, 
Double.NaN);
-
         out.put(GC_PRESSURE, EvaluatedScalingMetric.of(gcPressure));
-        out.put(
-                HEAP_MAX_USAGE_RATIO,
-                new EvaluatedScalingMetric(
-                        lastHeapUsage,
-                        getAverageGlobalMetric(HEAP_MAX_USAGE_RATIO, 
metricHistory)));
 
-        var latestObservation = latest.getOrDefault(HEAP_USED, Double.NaN);
-        double heapSizeAverage = getAverageGlobalMetric(HEAP_USED, 
metricHistory);
-        out.put(HEAP_USED, new EvaluatedScalingMetric(latestObservation, 
heapSizeAverage));
+        populateMetric(HEAP_MAX_USAGE_RATIO, metricHistory, out);
+        populateMetric(HEAP_MEMORY_USED, metricHistory, out);
+        populateMetric(MANAGED_MEMORY_USED, metricHistory, out);
+        populateMetric(METASPACE_MEMORY_USED, metricHistory, out);
 
         out.put(
                 NUM_TASK_SLOTS_USED,
@@ -385,6 +381,18 @@ public class ScalingMetricEvaluator {
         return out;
     }
 
+    private static void populateMetric(
+            ScalingMetric scalingMetric,
+            SortedMap<Instant, CollectedMetrics> metricHistory,
+            Map<ScalingMetric, EvaluatedScalingMetric> out) {
+        var latestMetrics = 
metricHistory.get(metricHistory.lastKey()).getGlobalMetrics();
+
+        var latestObservation = latestMetrics.getOrDefault(scalingMetric, 
Double.NaN);
+        double value = getAverageGlobalMetric(scalingMetric, metricHistory);
+
+        out.put(scalingMetric, new EvaluatedScalingMetric(latestObservation, 
value));
+    }
+
     private static double getAverageGlobalMetric(
             ScalingMetric metric, SortedMap<Instant, CollectedMetrics> 
metricsHistory) {
         return getAverage(metric, null, metricsHistory);
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 58bdfce0..52c0e2a0 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -18,10 +18,8 @@
 package org.apache.flink.autoscaler.config;
 
 import org.apache.flink.autoscaler.metrics.MetricAggregator;
-import org.apache.flink.autoscaler.tuning.MemoryTuning;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.MemorySize;
 
 import java.time.Duration;
 import java.util.List;
@@ -258,40 +256,23 @@ public class AutoScalerOptions {
                     .defaultValue(false)
                     
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
                     .withDescription(
-                            "If enabled, the initial amount of memory 
specified for TaskManagers will be reduced according to the observed needs.");
+                            "If enabled, the initial amount of memory 
specified for TaskManagers will be reduced/increased according to the observed 
needs.");
 
-    public static final ConfigOption<MemoryTuning.HeapUsageTarget> 
MEMORY_TUNING_HEAP_TARGET =
-            autoScalerConfig("memory.tuning.heap.target-usage")
-                    .enumType(MemoryTuning.HeapUsageTarget.class)
-                    .defaultValue(MemoryTuning.HeapUsageTarget.MAX)
-                    
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.target-usage"))
-                    .withDescription(
-                            "The heap size to target. Average usage (AVG) will 
yield better savings. Max usage will yield more conservative savings.");
-
-    public static final ConfigOption<Double> MEMORY_TUNING_HEAP_OVERHEAD =
-            autoScalerConfig("memory.tuning.heap.overhead")
+    public static final ConfigOption<Double> MEMORY_TUNING_OVERHEAD =
+            autoScalerConfig("memory.tuning.overhead")
                     .doubleType()
                     .defaultValue(0.2)
-                    
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.overhead"))
+                    
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.overhead"))
                     .withDescription(
-                            "Overhead to add to heap tuning decisions (0-1). 
This ensures spare capacity and allows the memory to grow beyond the 
dynamically computed limits, but never beyond the original memory limits.");
+                            "Overhead to add to tuning decisions (0-1). This 
ensures spare capacity and allows the memory to grow beyond the dynamically 
computed limits, but never beyond the original memory limits.");
 
-    public static final ConfigOption<MemorySize> MEMORY_TUNING_MIN_HEAP =
-            autoScalerConfig("memory.tuning.heap.min")
-                    .memoryType()
-                    .defaultValue(MemorySize.ofMebiBytes(512L))
-                    
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.min"))
-                    .withDescription(
-                            "The minimum amount of TaskManager heap memory, if 
memory tuning is enabled.");
-
-    public static final ConfigOption<Boolean> 
MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED =
-            autoScalerConfig("memory.tuning.heap.transfer-to-managed")
+    public static final ConfigOption<Boolean> 
MEMORY_TUNING_MAXIMIZE_MANAGED_MEMORY =
+            autoScalerConfig("memory.tuning.maximize-managed-memory")
                     .booleanType()
                     .defaultValue(false)
-                    .withFallbackKeys(
-                            
oldOperatorConfigKey("memory.tuning.heap.transfer-to-managed"))
+                    
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.maximize-managed-memory"))
                     .withDescription(
-                            "If enabled, any reduction of heap memory will 
increase the managed memory used by RocksDB.");
+                            "If enabled and managed memory is used (e.g. 
RocksDB turned on), any reduction of heap, network, or metaspace memory will 
increase the managed memory.");
 
     public static final ConfigOption<Integer> VERTEX_SCALING_HISTORY_COUNT =
             autoScalerConfig("history.max.count")
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
index 5cf84c91..32abd946 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
@@ -41,9 +41,13 @@ public enum FlinkMetric {
     PENDING_RECORDS(s -> s.endsWith(".pendingRecords")),
     BACKPRESSURE_TIME_PER_SEC(s -> s.equals("backPressuredTimeMsPerSecond")),
 
-    HEAP_MAX(s -> s.equals("Status.JVM.Memory.Heap.Max")),
-    HEAP_USED(s -> s.equals("Status.JVM.Memory.Heap.Used")),
+    HEAP_MEMORY_MAX(s -> s.equals("Status.JVM.Memory.Heap.Max")),
+    HEAP_MEMORY_USED(s -> s.equals("Status.JVM.Memory.Heap.Used")),
+    MANAGED_MEMORY_USED(s -> s.equals("Status.Flink.Memory.Managed.Used")),
+    METASPACE_MEMORY_USED(s -> s.equals("Status.JVM.Memory.Metaspace.Used")),
+
     TOTAL_GC_TIME_PER_SEC(s -> 
s.equals("Status.JVM.GarbageCollector.All.TimeMsPerSecond")),
+
     NUM_TASK_SLOTS_TOTAL(s -> s.equals("taskSlotsTotal")),
     NUM_TASK_SLOTS_AVAILABLE(s -> s.equals("taskSlotsAvailable"));
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index 4f9e3083..5466f691 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -74,8 +74,14 @@ public enum ScalingMetric {
      */
     GC_PRESSURE(false),
 
-    /** Measured used heap size in bytes. */
-    HEAP_USED(true),
+    /** Measured max used heap size in bytes. */
+    HEAP_MEMORY_USED(true),
+
+    /** Measured max managed memory size in bytes. */
+    MANAGED_MEMORY_USED(true),
+
+    /** Measured max metaspace memory size in bytes. */
+    METASPACE_MEMORY_USED(true),
 
     /** Percentage of max heap used (between 0 and 1). */
     HEAP_MAX_USAGE_RATIO(true),
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index 0fbcc64b..84996235 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -20,7 +20,6 @@ package org.apache.flink.autoscaler.metrics;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.topology.IOMetrics;
 import org.apache.flink.autoscaler.topology.JobTopology;
-import org.apache.flink.autoscaler.tuning.MemoryTuning;
 import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -170,24 +169,24 @@ public class ScalingMetrics {
             out.put(ScalingMetric.GC_PRESSURE, gcTime.getMax() / 1000);
         }
 
-        var heapMax = collectedTmMetrics.get(FlinkMetric.HEAP_MAX);
-        var heapUsed = collectedTmMetrics.get(FlinkMetric.HEAP_USED);
+        var heapMax = collectedTmMetrics.get(FlinkMetric.HEAP_MEMORY_MAX);
+        var heapUsed = collectedTmMetrics.get(FlinkMetric.HEAP_MEMORY_USED);
+
         if (heapMax != null && heapUsed != null) {
-            MemoryTuning.HeapUsageTarget heapTarget =
-                    conf.get(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET);
-            switch (heapTarget) {
-                case AVG:
-                    out.put(ScalingMetric.HEAP_USED, heapUsed.getAvg());
-                    break;
-                case MAX:
-                    out.put(ScalingMetric.HEAP_USED, heapUsed.getMax());
-                    break;
-                default:
-                    LOG.warn("Unknown value {} for heap target", heapTarget);
-            }
+            out.put(ScalingMetric.HEAP_MEMORY_USED, heapUsed.getMax());
             out.put(ScalingMetric.HEAP_MAX_USAGE_RATIO, heapUsed.getMax() / 
heapMax.getMax());
         }
 
+        var managedMemory = 
collectedTmMetrics.get(FlinkMetric.MANAGED_MEMORY_USED);
+        if (managedMemory != null) {
+            out.put(ScalingMetric.MANAGED_MEMORY_USED, managedMemory.getMax());
+        }
+
+        var metaspaceMemory = 
collectedTmMetrics.get(FlinkMetric.METASPACE_MEMORY_USED);
+        if (metaspaceMemory != null) {
+            out.put(ScalingMetric.METASPACE_MEMORY_USED, 
metaspaceMemory.getMax());
+        }
+
         return out;
     }
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
index aba94157..12b7ca36 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
@@ -19,7 +19,7 @@ package org.apache.flink.autoscaler.realizer;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 
 import java.util.Map;
 
@@ -37,5 +37,5 @@ public interface ScalingRealizer<KEY, Context extends 
JobAutoScalerContext<KEY>>
     void realizeParallelismOverrides(Context context, Map<String, String> 
parallelismOverrides);
 
     /** Updates the TaskManager memory configuration. */
-    void realizeConfigOverrides(Context context, Configuration 
configOverrides);
+    void realizeConfigOverrides(Context context, ConfigChanges configChanges);
 }
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index 30be323f..7fb369cf 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -22,7 +22,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.ScalingTracking;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import javax.annotation.Nonnull;
@@ -70,12 +70,12 @@ public interface AutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<
 
     void removeParallelismOverrides(Context jobContext) throws Exception;
 
-    void storeConfigOverrides(Context jobContext, Configuration 
configOverrides) throws Exception;
+    void storeConfigChanges(Context jobContext, ConfigChanges configChanges) 
throws Exception;
 
     @Nonnull
-    Configuration getConfigOverrides(Context jobContext) throws Exception;
+    ConfigChanges getConfigChanges(Context jobContext) throws Exception;
 
-    void removeConfigOverrides(Context jobContext) throws Exception;
+    void removeConfigChanges(Context jobContext) throws Exception;
 
     /** Removes all data from this context. Flush still needs to be called. */
     void clearAll(Context jobContext) throws Exception;
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index 60a9342e..35084db3 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -21,7 +21,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.ScalingTracking;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import javax.annotation.Nonnull;
@@ -50,7 +50,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
 
     private final Map<KEY, Map<String, String>> parallelismOverridesStore;
 
-    private final Map<KEY, Configuration> tmConfigOverrides;
+    private final Map<KEY, ConfigChanges> tmConfigOverrides;
 
     private final Map<KEY, ScalingTracking> scalingTrackingStore;
 
@@ -59,7 +59,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
         collectedMetricsStore = new ConcurrentHashMap<>();
         parallelismOverridesStore = new ConcurrentHashMap<>();
         scalingTrackingStore = new ConcurrentHashMap<>();
-        tmConfigOverrides = new ConcurrentHashMap<>();
+        tmConfigOverrides = new ConcurrentHashMap<KEY, ConfigChanges>();
     }
 
     @Override
@@ -122,20 +122,19 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
     }
 
     @Override
-    public void storeConfigOverrides(Context jobContext, Configuration 
configOverrides)
-            throws Exception {
-        tmConfigOverrides.put(jobContext.getJobKey(), configOverrides);
+    public void storeConfigChanges(Context jobContext, ConfigChanges 
configChanges) {
+        tmConfigOverrides.put(jobContext.getJobKey(), configChanges);
     }
 
     @Nonnull
     @Override
-    public Configuration getConfigOverrides(Context jobContext) {
+    public ConfigChanges getConfigChanges(Context jobContext) {
         return 
Optional.ofNullable(tmConfigOverrides.get(jobContext.getJobKey()))
-                .orElse(new Configuration());
+                .orElse(new ConfigChanges());
     }
 
     @Override
-    public void removeConfigOverrides(Context jobContext) {
+    public void removeConfigChanges(Context jobContext) {
         tmConfigOverrides.remove(jobContext.getJobKey());
     }
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/ConfigChanges.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/ConfigChanges.java
new file mode 100644
index 00000000..f76a22fe
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/ConfigChanges.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.tuning;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.FallbackKey;
+
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Holds the configuration overrides and removals for a Flink Configuration. 
*/
+public class ConfigChanges {
+
+    /** Overrides which will be applied on top of the existing config. */
+    @Getter private final Map<String, String> overrides = new HashMap<>();
+
+    /** Config keys which will be removed from the existing config. */
+    @Getter private final Set<String> removals = new HashSet<>();
+
+    public <T> ConfigChanges addOverride(ConfigOption<T> configOption, T 
value) {
+        overrides.put(configOption.key(), 
ConfigurationUtils.convertValue(value, String.class));
+        return this;
+    }
+
+    public ConfigChanges addOverride(String key, String value) {
+        overrides.put(key, value);
+        return this;
+    }
+
+    public ConfigChanges addRemoval(ConfigOption<?> configOption) {
+        removals.add(configOption.key());
+        for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
+            removals.add(fallbackKey.getKey());
+        }
+        return this;
+    }
+
+    public Configuration applyOverrides(Configuration existingConfig) {
+        Configuration config = new Configuration(existingConfig);
+        for (String key : removals) {
+            config.removeKey(key);
+        }
+        config.addAll(Configuration.fromMap(overrides));
+        return config;
+    }
+}
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryBudget.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryBudget.java
new file mode 100644
index 00000000..4e18c74c
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryBudget.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.tuning;
+
+import org.apache.flink.util.Preconditions;
+
+/** Accounting for distributing memory over the available pools. */
+public class MemoryBudget {
+
+    private long remaining;
+
+    public MemoryBudget(long remaining) {
+        Preconditions.checkArgument(remaining >= 0);
+        this.remaining = remaining;
+    }
+
+    public long budget(long amount) {
+        Preconditions.checkArgument(amount >= 0);
+        var budgeted = Math.min(amount, remaining);
+        remaining -= budgeted;
+        return budgeted;
+    }
+
+    public long getRemaining() {
+        return remaining;
+    }
+}
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
index b4cd00e5..8ca0dcc3 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
@@ -21,11 +21,11 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.util.config.memory.CommonProcessMemorySpec;
@@ -41,6 +41,10 @@ import org.slf4j.LoggerFactory;
 import java.util.Arrays;
 import java.util.Map;
 
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MEMORY_USED;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.MANAGED_MEMORY_USED;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.METASPACE_MEMORY_USED;
+
 /** Tunes the TaskManager memory. */
 public class MemoryTuning {
 
@@ -48,13 +52,7 @@ public class MemoryTuning {
     public static final ProcessMemoryUtils<TaskExecutorFlinkMemory> 
FLINK_MEMORY_UTILS =
             new ProcessMemoryUtils<>(getMemoryOptions(), new 
TaskExecutorFlinkMemoryUtils());
 
-    private static final Configuration EMPTY_CONFIG = new Configuration();
-
-    /** What memory usage to target. */
-    public enum HeapUsageTarget {
-        AVG,
-        MAX
-    }
+    private static final ConfigChanges EMPTY_CONFIG = new ConfigChanges();
 
     /**
      * Emits a Configuration which contains overrides for the current 
configuration. We are not
@@ -62,7 +60,7 @@ public class MemoryTuning {
      * overrides. This config is persisted separately and applied by the 
autoscaler. That way we can
      * clear any applied overrides if auto-tuning is disabled.
      */
-    public static Configuration tuneTaskManagerHeapMemory(
+    public static ConfigChanges tuneTaskManagerHeapMemory(
             JobAutoScalerContext<?> context,
             EvaluatedMetrics evaluatedMetrics,
             AutoScalerEventHandler eventHandler) {
@@ -80,57 +78,97 @@ public class MemoryTuning {
             return EMPTY_CONFIG;
         }
 
-        var maxHeapSize = memSpecs.getFlinkMemory().getJvmHeapMemorySize();
-        LOG.debug("Current configured heap size: {}", maxHeapSize);
+        MemorySize specHeapSize = 
memSpecs.getFlinkMemory().getJvmHeapMemorySize();
+        MemorySize specManagedSize = memSpecs.getFlinkMemory().getManaged();
+        MemorySize specNetworkSize = memSpecs.getFlinkMemory().getNetwork();
+        MemorySize specMetaspaceSize = memSpecs.getJvmMetaspaceSize();
+        LOG.info(
+                "Spec memory - heap: {}, managed: {}, network: {}, meta: {}",
+                specHeapSize.toHumanReadableString(),
+                specManagedSize.toHumanReadableString(),
+                specNetworkSize.toHumanReadableString(),
+                specMetaspaceSize.toHumanReadableString());
+
+        MemorySize maxMemoryBySpec = 
context.getTaskManagerMemory().orElse(MemorySize.ZERO);
+        if (maxMemoryBySpec.compareTo(MemorySize.ZERO) <= 0) {
+            LOG.warn("Spec TaskManager memory size could not be determined.");
+            return EMPTY_CONFIG;
+        }
+        MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
+        // Subtract these current settings from the budget
+        
memBudget.budget(memSpecs.getFlinkMemory().getFrameworkOffHeap().getBytes());
+        
memBudget.budget(memSpecs.getFlinkMemory().getTaskOffHeap().getBytes());
+        memBudget.budget(memSpecs.getJvmOverheadSize().getBytes());
 
-        MemorySize newHeapSize = determineNewHeapSize(evaluatedMetrics, 
config, maxHeapSize);
-        LOG.info("New TM heap memory {}", newHeapSize.toHumanReadableString());
+        var globalMetrics = evaluatedMetrics.getGlobalMetrics();
+        // The order matters in case the memory usage is higher than the 
maximum available memory.
+        // Managed memory comes last because it can grow arbitrarily for RocksDB 
jobs.
+        MemorySize newNetworkSize = adjustNetworkMemory(specNetworkSize, 
memBudget);
+        MemorySize newHeapSize =
+                determineNewSize(getUsage(HEAP_MEMORY_USED, globalMetrics), 
config, memBudget);
+        MemorySize newMetaspaceSize =
+                determineNewSize(getUsage(METASPACE_MEMORY_USED, 
globalMetrics), config, memBudget);
+        MemorySize newManagedSize =
+                adjustManagedMemory(
+                        getUsage(MANAGED_MEMORY_USED, globalMetrics),
+                        specManagedSize,
+                        config,
+                        memBudget);
+        LOG.info(
+                "Optimized memory sizes: heap: {} managed: {}, network: {}, 
meta: {}",
+                newHeapSize.toHumanReadableString(),
+                newManagedSize.toHumanReadableString(),
+                newNetworkSize.toHumanReadableString(),
+                newMetaspaceSize.toHumanReadableString());
 
         // Diff can be negative (memory shrinks) or positive (memory grows)
-        final long heapDiffBytes = newHeapSize.getBytes() - 
maxHeapSize.getBytes();
-
-        final MemorySize totalMemory = adjustTotalTmMemory(context, 
heapDiffBytes);
-        if (totalMemory.equals(MemorySize.ZERO)) {
+        final long heapDiffBytes = newHeapSize.getBytes() - 
specHeapSize.getBytes();
+        final long managedDiffBytes = newManagedSize.getBytes() - 
specManagedSize.getBytes();
+        final long networkDiffBytes = newNetworkSize.getBytes() - 
specNetworkSize.getBytes();
+        final long flinkMemoryDiffBytes = heapDiffBytes + managedDiffBytes + 
networkDiffBytes;
+
+        // Update total memory according to memory diffs
+        final MemorySize totalMemory =
+                new MemorySize(maxMemoryBySpec.getBytes() - 
memBudget.getRemaining());
+        if (totalMemory.compareTo(MemorySize.ZERO) <= 0) {
+            LOG.warn("Invalid total memory configuration: {}", totalMemory);
             return EMPTY_CONFIG;
         }
 
         // Prepare the tuning config for new configuration values
-        var tuningConfig = new Configuration();
-        // Update total memory according to new heap size
-        // Adjust the total container memory and the JVM heap size accordingly.
-        tuningConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemory);
-        // Framework and Task heap memory configs add up together yield the 
max heap memory.
-        // To simplify the calculation, set the framework heap memory to zero.
-        tuningConfig.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, 
MemorySize.ZERO);
-        tuningConfig.set(TaskManagerOptions.TASK_HEAP_MEMORY, newHeapSize);
-
-        // All memory options which can be configured via fractions need to be 
set to their
-        // absolute values or, if there is no absolute setting, the fractions 
need to be
-        // re-calculated.
-        MemorySize managedMemory = memSpecs.getFlinkMemory().getManaged();
-        if (shouldTransferHeapToManagedMemory(config, heapDiffBytes)) {
-            // If RocksDB is configured, give back the heap memory as managed 
memory to RocksDB
-            MemorySize newManagedMemory =
-                    new MemorySize(managedMemory.getBytes() + 
Math.abs(heapDiffBytes));
-            LOG.info(
-                    "Increasing managed memory size from {} to {}",
-                    managedMemory,
-                    newManagedMemory);
-            tuningConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
newManagedMemory);
-        } else {
-            tuningConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
managedMemory);
-        }
-
-        tuningConfig.set(
+        var tuningConfig = new ConfigChanges();
+        // Adjust the total container memory
+        tuningConfig.addOverride(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
totalMemory);
+        // We do not set the framework/task heap memory because those are 
automatically derived from
+        // setting the other mandatory memory options for managed memory, 
network, metaspace and jvm
+        // overhead. However, we do precise accounting for heap memory above. 
In contrast to other
+        // memory pools, there are no fractional variants for heap memory. 
Setting the absolute heap
+        // memory options could cause invalid configuration states when users 
adapt the total amount
+        // of memory. We also need to take care to remove any user-provided 
overrides for those.
+        tuningConfig.addRemoval(TaskManagerOptions.TASK_HEAP_MEMORY);
+        // Set default to zero because we already account for heap via task 
heap.
+        tuningConfig.addOverride(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, 
MemorySize.ZERO);
+
+        MemorySize flinkMemorySize =
+                new MemorySize(
+                        memSpecs.getTotalFlinkMemorySize().getBytes() + 
flinkMemoryDiffBytes);
+
+        // All memory options which can be configured via fractions need to be 
re-calculated.
+        tuningConfig.addOverride(
+                TaskManagerOptions.MANAGED_MEMORY_FRACTION,
+                getFraction(newManagedSize, flinkMemorySize));
+        tuningConfig.addRemoval(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+
+        tuningConfig.addOverride(
                 TaskManagerOptions.NETWORK_MEMORY_FRACTION,
-                getFraction(
-                        memSpecs.getFlinkMemory().getNetwork(),
-                        new MemorySize(
-                                memSpecs.getTotalFlinkMemorySize().getBytes() 
+ heapDiffBytes)));
-        tuningConfig.set(
+                getFraction(newNetworkSize, flinkMemorySize));
+
+        tuningConfig.addOverride(
                 TaskManagerOptions.JVM_OVERHEAD_FRACTION,
                 getFraction(memSpecs.getJvmOverheadSize(), totalMemory));
 
+        tuningConfig.addOverride(TaskManagerOptions.JVM_METASPACE, 
newMetaspaceSize);
+
         eventHandler.handleEvent(
                 context,
                 AutoScalerEventHandler.Type.Normal,
@@ -151,39 +189,47 @@ public class MemoryTuning {
         return tuningConfig;
     }
 
-    private static MemorySize determineNewHeapSize(
-            EvaluatedMetrics evaluatedMetrics, Configuration config, 
MemorySize maxHeapSize) {
+    private static MemorySize determineNewSize(
+            MemorySize usage, Configuration config, MemoryBudget memoryBudget) 
{
 
-        double overheadFactor = 1 + 
config.get(AutoScalerOptions.MEMORY_TUNING_HEAP_OVERHEAD);
-        long heapTargetSizeBytes =
-                (long) (getHeapUsed(evaluatedMetrics).getBytes() * 
overheadFactor);
+        double overheadFactor = 1 + 
config.get(AutoScalerOptions.MEMORY_TUNING_OVERHEAD);
+        long targetSizeBytes = (long) (usage.getBytes() * overheadFactor);
 
-        // Apply min/max heap size limits
-        heapTargetSizeBytes =
-                Math.min(
-                        Math.max(
-                                // Lower limit is the minimum configured heap 
size
-                                
config.get(AutoScalerOptions.MEMORY_TUNING_MIN_HEAP).getBytes(),
-                                heapTargetSizeBytes),
-                        // Upper limit is the original max heap size in the 
spec
-                        maxHeapSize.getBytes());
+        // Upper limit is the available memory budget
+        targetSizeBytes = memoryBudget.budget(targetSizeBytes);
 
-        return new MemorySize(heapTargetSizeBytes);
+        return new MemorySize(targetSizeBytes);
     }
 
-    private static MemorySize getHeapUsed(EvaluatedMetrics evaluatedMetrics) {
-        var globalMetrics = evaluatedMetrics.getGlobalMetrics();
-        MemorySize heapUsed =
-                new MemorySize((long) 
globalMetrics.get(ScalingMetric.HEAP_USED).getAverage());
-        LOG.info("TM heap used size: {}", heapUsed);
-        return heapUsed;
+    private static MemorySize adjustManagedMemory(
+            MemorySize managedMemoryUsage,
+            MemorySize managedMemoryConfigured,
+            Configuration config,
+            MemoryBudget memBudget) {
+        // Managed memory usage can't accurately be measured yet.
+        // It is either zero (no usage) or an opaque amount of memory 
(RocksDB).
+        if (managedMemoryUsage.compareTo(MemorySize.ZERO) <= 0) {
+            return MemorySize.ZERO;
+        } else if 
(config.get(AutoScalerOptions.MEMORY_TUNING_MAXIMIZE_MANAGED_MEMORY)) {
+            long maxManagedMemorySize = memBudget.budget(Long.MAX_VALUE);
+            return new MemorySize(maxManagedMemorySize);
+        } else {
+            return managedMemoryConfigured;
+        }
+    }
+
+    private static MemorySize adjustNetworkMemory(MemorySize usage, 
MemoryBudget memBudget) {
+        // TODO mxm: Follow-up to tune network memory via
+        // https://issues.apache.org/jira/browse/FLINK-34471
+        long networkBytes = memBudget.budget(usage.getBytes());
+        return new MemorySize(networkBytes);
     }
 
-    private static boolean shouldTransferHeapToManagedMemory(
-            Configuration config, long heapDiffBytes) {
-        return 
config.get(AutoScalerOptions.MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED)
-                && heapDiffBytes < 0
-                && 
"rocksdb".equalsIgnoreCase(config.get(StateBackendOptions.STATE_BACKEND));
+    private static MemorySize getUsage(
+            ScalingMetric scalingMetric, Map<ScalingMetric, 
EvaluatedScalingMetric> globalMetrics) {
+        MemorySize heapUsed = new MemorySize((long) 
globalMetrics.get(scalingMetric).getAverage());
+        LOG.debug("{}: {}", scalingMetric, heapUsed);
+        return heapUsed;
     }
 
     public static MemorySize getTotalMemory(Configuration config, 
JobAutoScalerContext<?> ctx) {
@@ -194,28 +240,6 @@ public class MemoryTuning {
         return ctx.getTaskManagerMemory().orElse(MemorySize.ZERO);
     }
 
-    private static MemorySize adjustTotalTmMemory(JobAutoScalerContext<?> ctx, 
long heapDiffBytes) {
-
-        var specTaskManagerMemory = 
ctx.getTaskManagerMemory().orElse(MemorySize.ZERO);
-        if (specTaskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
-            LOG.warn("Spec TaskManager memory size could not be determined.");
-            return MemorySize.ZERO;
-        }
-
-        if (shouldTransferHeapToManagedMemory(ctx.getConfiguration(), 
heapDiffBytes)) {
-            // Total size does not change
-            return specTaskManagerMemory;
-        }
-
-        long newTotalMemBytes = specTaskManagerMemory.getBytes() + 
heapDiffBytes;
-        // TM container memory can never grow beyond the user-specified max 
memory
-        newTotalMemBytes = Math.min(newTotalMemBytes, 
specTaskManagerMemory.getBytes());
-
-        MemorySize totalMemory = new MemorySize(newTotalMemBytes);
-        LOG.info("Setting new total TaskManager memory to {}", totalMemory);
-        return totalMemory;
-    }
-
     private static ProcessMemoryOptions getMemoryOptions() {
         return new ProcessMemoryOptions(
                 Arrays.asList(
@@ -238,14 +262,27 @@ public class MemoryTuning {
     }
 
     /** Format config such that it can be directly used as a Flink 
configuration. */
-    private static String formatConfig(Configuration config) {
+    private static String formatConfig(ConfigChanges config) {
         var sb = new StringBuilder();
-        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+        for (Map.Entry<String, String> entry : 
config.getOverrides().entrySet()) {
             sb.append(entry.getKey())
                     .append(": ")
                     .append(entry.getValue())
                     .append(System.lineSeparator());
         }
+        if (!config.getRemovals().isEmpty()) {
+            sb.append("Remove the following config entries if present: [");
+            boolean first = true;
+            for (String toRemove : config.getRemovals()) {
+                if (first) {
+                    first = false;
+                } else {
+                    sb.append(", ");
+                }
+                sb.append(toRemove);
+            }
+            sb.append("]");
+        }
         return sb.toString();
     }
 }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index 80bbe75a..e3ed6625 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -30,9 +30,8 @@ import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.Gauge;
@@ -306,23 +305,27 @@ public class JobAutoScalerImplTest {
                         null, null, null, eventCollector, scalingRealizer, 
stateStore);
 
         // Initially we should return empty overrides, do not create any state
-        assertThat(stateStore.getConfigOverrides(context).toMap()).isEmpty();
+        
assertThat(stateStore.getConfigChanges(context).getOverrides()).isEmpty();
 
-        var config = new Configuration();
-        config.set(TaskManagerOptions.TASK_HEAP_MEMORY, new MemorySize(42));
-        stateStore.storeConfigOverrides(context, config);
+        ConfigChanges config = new ConfigChanges();
+        config.addOverride(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.42f);
+        config.addRemoval(TaskManagerOptions.TASK_HEAP_MEMORY);
+        stateStore.storeConfigChanges(context, config);
         stateStore.flush(context);
 
         autoscaler.applyConfigOverrides(context);
-        assertThat(getEvent().getConfigOverrides().toMap())
-                
.containsExactly(entry(TaskManagerOptions.TASK_HEAP_MEMORY.key(), "42 bytes"));
-        assertThat(stateStore.getConfigOverrides(context)).isEqualTo(config);
+        var event = getEvent();
+        assertThat(event.getConfigChanges().getOverrides())
+                
.containsExactly(entry(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), 
"0.42"));
+        assertThat(event.getConfigChanges().getRemovals())
+                .containsExactly(TaskManagerOptions.TASK_HEAP_MEMORY.key());
+        assertThat(stateStore.getConfigChanges(context)).isEqualTo(config);
 
         // Disabling autoscaler should clear overrides
         context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), 
"false");
         autoscaler.scale(context);
         autoscaler.applyConfigOverrides(context);
-        assertThat(getEvent().getConfigOverrides().toMap()).isEmpty();
+        assertThat(getEvent().getConfigChanges().getOverrides()).isEmpty();
     }
 
     @Test
@@ -368,17 +371,6 @@ public class JobAutoScalerImplTest {
         assertEquals(expectedOverrides, 
scalingEvent.getParallelismOverrides());
     }
 
-    @Nullable
-    private TestingScalingRealizer.Event<JobID, JobAutoScalerContext<JobID>> 
getEvent(
-            Map<String, String> expectedOverrides) {
-        var scalingEvent = getEvent();
-        if (expectedOverrides == null) {
-            assertThat(scalingEvent).isNull();
-            return null;
-        }
-        return scalingEvent;
-    }
-
     @Nullable
     private TestingScalingRealizer.Event<JobID, JobAutoScalerContext<JobID>> 
getEvent() {
         return scalingRealizer.events.poll();
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
index 863950af..0cf5b15a 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
@@ -60,6 +60,8 @@ public class RestApiMetricsCollectorTest {
     private static final String GC_METRIC_NAME = 
"Status.JVM.GarbageCollector.All.TimeMsPerSecond";
     private static final String HEAP_MAX_NAME = "Status.JVM.Memory.Heap.Max";
     private static final String HEAP_USED_NAME = "Status.JVM.Memory.Heap.Used";
+    private static final String MANAGED_MEMORY_NAME = 
"Status.Flink.Memory.Managed.Used";
+    private static final String METASPACE_MEMORY_NAME = 
"Status.JVM.Memory.Metaspace.Used";
 
     @Test
     public void testAggregateMultiplePendingRecordsMetricsPerSource() throws 
Exception {
@@ -233,11 +235,23 @@ public class RestApiMetricsCollectorTest {
         // Test only heap metrics available
         var heapMax = new AggregatedMetric(HEAP_MAX_NAME, null, 100., null, 
null);
         var heapUsed = new AggregatedMetric(HEAP_USED_NAME, null, 50., null, 
null);
+        var managedUsed = new AggregatedMetric(MANAGED_MEMORY_NAME, null, 42., 
null, null);
+        var metaspaceUsed = new AggregatedMetric(METASPACE_MEMORY_NAME, null, 
11., null, null);
         metricValues.put(HEAP_MAX_NAME, heapMax);
         metricValues.put(HEAP_USED_NAME, heapUsed);
+        metricValues.put(MANAGED_MEMORY_NAME, managedUsed);
+        metricValues.put(METASPACE_MEMORY_NAME, metaspaceUsed);
 
         assertMetricsEquals(
-                Map.of(FlinkMetric.HEAP_MAX, heapMax, FlinkMetric.HEAP_USED, 
heapUsed),
+                Map.of(
+                        FlinkMetric.HEAP_MEMORY_MAX,
+                        heapMax,
+                        FlinkMetric.HEAP_MEMORY_USED,
+                        heapUsed,
+                        FlinkMetric.MANAGED_MEMORY_USED,
+                        managedUsed,
+                        FlinkMetric.METASPACE_MEMORY_USED,
+                        metaspaceUsed),
                 collector.queryTmMetrics(context));
         collector.cleanup(context.getJobKey());
 
@@ -247,10 +261,14 @@ public class RestApiMetricsCollectorTest {
 
         assertMetricsEquals(
                 Map.of(
-                        FlinkMetric.HEAP_MAX,
+                        FlinkMetric.HEAP_MEMORY_MAX,
                         heapMax,
-                        FlinkMetric.HEAP_USED,
+                        FlinkMetric.HEAP_MEMORY_USED,
                         heapUsed,
+                        FlinkMetric.MANAGED_MEMORY_USED,
+                        managedUsed,
+                        FlinkMetric.METASPACE_MEMORY_USED,
+                        metaspaceUsed,
                         FlinkMetric.TOTAL_GC_TIME_PER_SEC,
                         gcTime),
                 collector.queryTmMetrics(context));
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index da116223..ef5d99f2 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -273,8 +273,12 @@ public class ScalingExecutorTest {
                 Map.of(
                         ScalingMetric.NUM_TASK_SLOTS_USED,
                         EvaluatedScalingMetric.of(9),
-                        ScalingMetric.HEAP_USED,
+                        ScalingMetric.HEAP_MEMORY_USED,
                         EvaluatedScalingMetric.avg(MemorySize.parse("5 
Gb").getBytes()),
+                        ScalingMetric.MANAGED_MEMORY_USED,
+                        EvaluatedScalingMetric.avg(MemorySize.parse("2 
Gb").getBytes()),
+                        ScalingMetric.METASPACE_MEMORY_USED,
+                        EvaluatedScalingMetric.avg(MemorySize.parse("300 
mb").getBytes()),
                         ScalingMetric.HEAP_MAX_USAGE_RATIO,
                         EvaluatedScalingMetric.of(Double.NaN),
                         ScalingMetric.GC_PRESSURE,
@@ -286,12 +290,21 @@ public class ScalingExecutorTest {
         assertTrue(
                 scalingDecisionExecutor.scaleResource(
                         context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
-        assertEquals(
-                "6.000gb (6442450944 bytes)",
-                stateStore
-                        .getConfigOverrides(context)
-                        .get(TaskManagerOptions.TASK_HEAP_MEMORY)
-                        .toHumanReadableString());
+        assertThat(stateStore.getConfigChanges(context).getOverrides())
+                .containsExactlyInAnyOrderEntriesOf(
+                        Map.of(
+                                
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+                                "0.561",
+                                
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
+                                "0.14",
+                                TaskManagerOptions.JVM_METASPACE.key(),
+                                "360 mb",
+                                TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
+                                "0.097",
+                                TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+                                "0 bytes",
+                                TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
+                                "11114905646 bytes"));
     }
 
     @ParameterizedTest
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index c6b594a4..bfba0c0b 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -503,7 +503,11 @@ public class ScalingMetricEvaluatorTest {
                         EvaluatedScalingMetric.of(Double.NaN),
                         ScalingMetric.GC_PRESSURE,
                         EvaluatedScalingMetric.of(Double.NaN),
-                        ScalingMetric.HEAP_USED,
+                        ScalingMetric.HEAP_MEMORY_USED,
+                        EvaluatedScalingMetric.of(Double.NaN),
+                        ScalingMetric.MANAGED_MEMORY_USED,
+                        EvaluatedScalingMetric.of(Double.NaN),
+                        ScalingMetric.METASPACE_MEMORY_USED,
                         EvaluatedScalingMetric.of(Double.NaN),
                         ScalingMetric.NUM_TASK_SLOTS_USED,
                         EvaluatedScalingMetric.of(Double.NaN)),
@@ -518,16 +522,24 @@ public class ScalingMetricEvaluatorTest {
                                 0.5,
                                 ScalingMetric.GC_PRESSURE,
                                 0.6,
-                                ScalingMetric.HEAP_USED,
-                                512.)));
+                                ScalingMetric.HEAP_MEMORY_USED,
+                                512.,
+                                ScalingMetric.MANAGED_MEMORY_USED,
+                                420.,
+                                ScalingMetric.METASPACE_MEMORY_USED,
+                                110.)));
         assertEquals(
                 Map.of(
                         ScalingMetric.HEAP_MAX_USAGE_RATIO,
                         new EvaluatedScalingMetric(0.5, 0.5),
                         ScalingMetric.GC_PRESSURE,
                         EvaluatedScalingMetric.of(0.6),
-                        ScalingMetric.HEAP_USED,
+                        ScalingMetric.HEAP_MEMORY_USED,
                         new EvaluatedScalingMetric(512, 512),
+                        ScalingMetric.MANAGED_MEMORY_USED,
+                        new EvaluatedScalingMetric(420, 420),
+                        ScalingMetric.METASPACE_MEMORY_USED,
+                        new EvaluatedScalingMetric(110, 110),
                         ScalingMetric.NUM_TASK_SLOTS_USED,
                         EvaluatedScalingMetric.of(Double.NaN)),
                 ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
@@ -541,8 +553,12 @@ public class ScalingMetricEvaluatorTest {
                                 0.7,
                                 ScalingMetric.GC_PRESSURE,
                                 0.8,
-                                ScalingMetric.HEAP_USED,
+                                ScalingMetric.HEAP_MEMORY_USED,
                                 1024.,
+                                ScalingMetric.MANAGED_MEMORY_USED,
+                                840.,
+                                ScalingMetric.METASPACE_MEMORY_USED,
+                                220.,
                                 ScalingMetric.NUM_TASK_SLOTS_USED,
                                 42.)));
         assertEquals(
@@ -551,8 +567,12 @@ public class ScalingMetricEvaluatorTest {
                         new EvaluatedScalingMetric(0.7, 0.6),
                         ScalingMetric.GC_PRESSURE,
                         EvaluatedScalingMetric.of(0.8),
-                        ScalingMetric.HEAP_USED,
+                        ScalingMetric.HEAP_MEMORY_USED,
                         new EvaluatedScalingMetric(1024., 768.),
+                        ScalingMetric.MANAGED_MEMORY_USED,
+                        new EvaluatedScalingMetric(840., 630.),
+                        ScalingMetric.METASPACE_MEMORY_USED,
+                        new EvaluatedScalingMetric(220., 165.),
                         ScalingMetric.NUM_TASK_SLOTS_USED,
                         EvaluatedScalingMetric.of(42.)),
                 ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
index 2ce95f70..c689a0a9 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
@@ -21,7 +21,6 @@ import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.topology.IOMetrics;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
-import org.apache.flink.autoscaler.tuning.MemoryTuning;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
@@ -247,40 +246,32 @@ public class ScalingMetricsTest {
     @Test
     public void testGlobalMetrics() {
         Configuration conf = new Configuration();
-        conf.set(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET, 
MemoryTuning.HeapUsageTarget.AVG);
         assertEquals(Map.of(), ScalingMetrics.computeGlobalMetrics(Map.of(), 
Map.of(), conf));
         assertEquals(
                 Map.of(),
                 ScalingMetrics.computeGlobalMetrics(
-                        Map.of(), Map.of(FlinkMetric.HEAP_USED, aggMax(100)), 
conf));
+                        Map.of(), Map.of(FlinkMetric.HEAP_MEMORY_USED, 
aggMax(100)), conf));
+
         assertEquals(
                 Map.of(
                         ScalingMetric.HEAP_MAX_USAGE_RATIO,
                         0.5,
-                        ScalingMetric.GC_PRESSURE,
-                        0.25,
-                        ScalingMetric.HEAP_USED,
-                        75.),
-                ScalingMetrics.computeGlobalMetrics(
-                        Map.of(),
-                        Map.of(
-                                FlinkMetric.HEAP_USED,
-                                aggAvgMax(75, 100),
-                                FlinkMetric.HEAP_MAX,
-                                aggMax(200.),
-                                FlinkMetric.TOTAL_GC_TIME_PER_SEC,
-                                aggMax(250.)),
-                        conf));
-
-        conf.set(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET, 
MemoryTuning.HeapUsageTarget.MAX);
-        assertEquals(
-                Map.of(ScalingMetric.HEAP_MAX_USAGE_RATIO, 0.5, 
ScalingMetric.HEAP_USED, 100.),
+                        ScalingMetric.HEAP_MEMORY_USED,
+                        100.,
+                        ScalingMetric.MANAGED_MEMORY_USED,
+                        133.,
+                        ScalingMetric.METASPACE_MEMORY_USED,
+                        22.),
                 ScalingMetrics.computeGlobalMetrics(
                         Map.of(),
                         Map.of(
-                                FlinkMetric.HEAP_USED,
+                                FlinkMetric.HEAP_MEMORY_USED,
                                 aggAvgMax(75, 100),
-                                FlinkMetric.HEAP_MAX,
+                                FlinkMetric.MANAGED_MEMORY_USED,
+                                aggAvgMax(128, 133),
+                                FlinkMetric.METASPACE_MEMORY_USED,
+                                aggAvgMax(11, 22),
+                                FlinkMetric.HEAP_MEMORY_MAX,
                                 aggMax(200.)),
                         conf));
     }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
index d46c5bba..df0d1d55 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
@@ -18,7 +18,7 @@
 package org.apache.flink.autoscaler.realizer;
 
 import org.apache.flink.autoscaler.JobAutoScalerContext;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 
 import lombok.Getter;
 
@@ -38,8 +38,8 @@ public class TestingScalingRealizer<KEY, Context extends 
JobAutoScalerContext<KE
     }
 
     @Override
-    public void realizeConfigOverrides(Context context, Configuration 
configOverrides) {
-        events.add(new Event<>(context, configOverrides));
+    public void realizeConfigOverrides(Context context, ConfigChanges 
configChanges) {
+        events.add(new Event<>(context, configChanges));
     }
 
     /** The collected event. */
@@ -49,16 +49,16 @@ public class TestingScalingRealizer<KEY, Context extends 
JobAutoScalerContext<KE
 
         @Getter private Map<String, String> parallelismOverrides;
 
-        @Getter private Configuration configOverrides;
+        @Getter private ConfigChanges configChanges;
 
         public Event(Context context, Map<String, String> 
parallelismOverrides) {
             this.context = context;
             this.parallelismOverrides = parallelismOverrides;
         }
 
-        public Event(Context context, Configuration configOverrides) {
+        public Event(Context context, ConfigChanges configChanges) {
             this.context = context;
-            this.configOverrides = configOverrides;
+            this.configChanges = configChanges;
         }
 
         @Override
@@ -69,7 +69,7 @@ public class TestingScalingRealizer<KEY, Context extends 
JobAutoScalerContext<KE
                     + ", parallelismOverrides="
                     + parallelismOverrides
                     + ", configOverrides="
-                    + configOverrides
+                    + configChanges
                     + '}';
         }
     }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
index 11db27c0..da0f0c2c 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
@@ -21,7 +21,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.junit.jupiter.api.Assertions;
@@ -184,26 +184,26 @@ public abstract class AbstractAutoScalerStateStoreTest<
                         new JobVertexID(),
                         new TreeMap<>(Map.of(Instant.now(), new 
ScalingSummary()))));
         stateStore.storeParallelismOverrides(ctx, Map.of(new 
JobVertexID().toHexString(), "23"));
-        stateStore.storeConfigOverrides(
-                ctx, Configuration.fromMap(Map.of("config.value", "value")));
+        stateStore.storeConfigChanges(
+                ctx, new ConfigChanges().addOverride("config.value", "value"));
 
         assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
         assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
         assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
-        assertThat(stateStore.getConfigOverrides(ctx).toMap()).isNotEmpty();
+        
assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isNotEmpty();
 
         stateStore.flush(ctx);
 
         assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
         assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
         assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
-        assertThat(stateStore.getConfigOverrides(ctx).toMap()).isNotEmpty();
+        
assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isNotEmpty();
 
         stateStore.clearAll(ctx);
 
         assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
         assertThat(stateStore.getScalingHistory(ctx)).isEmpty();
         assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
-        assertThat(stateStore.getConfigOverrides(ctx).toMap()).isEmpty();
+        assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isEmpty();
     }
 }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
index 1bbf518f..3e2583a1 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
@@ -25,15 +25,16 @@ import 
org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 import org.apache.flink.autoscaler.tuning.MemoryTuning;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@@ -71,61 +72,93 @@ public class MemoryTuningTest {
 
         var globalMetrics =
                 Map.of(
-                        ScalingMetric.HEAP_USED,
-                        
EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(5096).getBytes()));
+                        ScalingMetric.HEAP_MEMORY_USED,
+                        
EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(5096).getBytes()),
+                        ScalingMetric.MANAGED_MEMORY_USED,
+                        
EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(10000).getBytes()),
+                        ScalingMetric.METASPACE_MEMORY_USED,
+                        
EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(100).getBytes()));
 
         var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
 
-        Map<String, String> overrides =
-                MemoryTuning.tuneTaskManagerHeapMemory(context, metrics, 
eventHandler).toMap();
+        ConfigChanges configChanges =
+                MemoryTuning.tuneTaskManagerHeapMemory(context, metrics, 
eventHandler);
         // Test reducing overall memory
-        assertThat(overrides)
+        assertThat(configChanges.getOverrides())
                 .containsExactlyInAnyOrderEntriesOf(
                         Map.of(
-                                TaskManagerOptions.TASK_HEAP_MEMORY.key(),
-                                "6412251955 bytes",
-                                TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
-                                "0 bytes",
-                                TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
-                                "12348031160 bytes",
-                                TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
-                                "0.046",
+                                
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+                                "0.562",
                                 
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
                                 "0.14",
+                                TaskManagerOptions.JVM_METASPACE.key(),
+                                "120 mb",
+                                TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
+                                "0.099",
+                                TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+                                "0 bytes",
                                 TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
-                                "23323685913 bytes"));
+                                "10833048417 bytes"));
+
+        assertThat(configChanges.getRemovals())
+                .containsExactlyInAnyOrder(
+                        TaskManagerOptions.TASK_HEAP_MEMORY.key(),
+                        TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
+                        TaskManagerOptions.MANAGED_MEMORY_SIZE
+                                .fallbackKeys()
+                                .iterator()
+                                .next()
+                                .getKey());
 
         assertThat(eventHandler.events.poll().getMessage())
                 .startsWith(
                         "Memory tuning recommends the following configuration 
(automatic tuning is enabled):");
 
-        // Test giving back memory to RocksDB
-        config.set(AutoScalerOptions.MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED, 
true);
-        config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
-        overrides = MemoryTuning.tuneTaskManagerHeapMemory(context, metrics, 
eventHandler).toMap();
-        assertThat(overrides)
+        // Test maximize managed memory
+        config.set(AutoScalerOptions.MEMORY_TUNING_MAXIMIZE_MANAGED_MEMORY, 
true);
+        configChanges = MemoryTuning.tuneTaskManagerHeapMemory(context, 
metrics, eventHandler);
+        assertThat(configChanges.getOverrides())
                 .containsExactlyInAnyOrderEntriesOf(
                         Map.of(
-                                TaskManagerOptions.TASK_HEAP_MEMORY.key(),
-                                "6412251955 bytes",
-                                TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
-                                "0 bytes",
-                                TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
-                                "21236599967 bytes",
+                                
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+                                "0.689",
+                                
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
+                                "0.1",
+                                TaskManagerOptions.JVM_METASPACE.key(),
+                                "120 mb",
                                 TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
                                 "0.033",
-                                
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
-                                "0.14",
+                                TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+                                "0 bytes",
                                 TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
                                 totalMemory.toString()));
 
-        assertThat(eventHandler.events.poll().getMessage())
-                .startsWith(
-                        "Memory tuning recommends the following configuration 
(automatic tuning is enabled):");
+        // Test managed memory is zero
+        metrics = new EvaluatedMetrics(vertexMetrics, new 
HashMap<>(globalMetrics));
+        metrics.getGlobalMetrics()
+                .put(ScalingMetric.MANAGED_MEMORY_USED, 
EvaluatedScalingMetric.avg(0));
+        configChanges = MemoryTuning.tuneTaskManagerHeapMemory(context, 
metrics, eventHandler);
+        assertThat(configChanges.getOverrides())
+                .containsExactlyInAnyOrderEntriesOf(
+                        Map.of(
+                                
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+                                "0.0",
+                                
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
+                                "0.32",
+                                TaskManagerOptions.JVM_METASPACE.key(),
+                                "120 mb",
+                                TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
+                                "0.099",
+                                TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+                                "0 bytes",
+                                TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
+                                "10833048417 bytes"));
 
         // Test tuning disabled
         config.set(AutoScalerOptions.MEMORY_TUNING_ENABLED, false);
-        assertThat(MemoryTuning.tuneTaskManagerHeapMemory(context, metrics, 
eventHandler).toMap())
+        assertThat(
+                        MemoryTuning.tuneTaskManagerHeapMemory(context, 
metrics, eventHandler)
+                                .getOverrides())
                 .isEmpty();
 
         assertThat(eventHandler.events.poll().getMessage())
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
index 8eb6ebdc..16455bf1 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 import org.apache.flink.autoscaler.tuning.MemoryTuning;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
@@ -55,17 +56,22 @@ public class KubernetesScalingRealizer
 
     @Override
     public void realizeConfigOverrides(
-            KubernetesJobAutoScalerContext context, Configuration 
configOverrides) {
+            KubernetesJobAutoScalerContext context, ConfigChanges 
configChanges) {
         if (!(context.getResource() instanceof FlinkDeployment)) {
             // We can't adjust the configuration of non-job deployments.
             return;
         }
         FlinkDeployment flinkDeployment = ((FlinkDeployment) 
context.getResource());
         // Apply config overrides
-        
flinkDeployment.getSpec().getFlinkConfiguration().putAll(configOverrides.toMap());
+        Map<String, String> flinkConf = 
flinkDeployment.getSpec().getFlinkConfiguration();
+        for (String keyToRemove : configChanges.getRemovals()) {
+            flinkConf.remove(keyToRemove);
+        }
+        flinkConf.putAll(configChanges.getOverrides());
 
         // Update total memory in spec
-        var totalMemoryOverride = MemoryTuning.getTotalMemory(configOverrides, 
context);
+        var totalMemoryOverride =
+                MemoryTuning.getTotalMemory(Configuration.fromMap(flinkConf), 
context);
         if (totalMemoryOverride.compareTo(MemorySize.ZERO) <= 0) {
             LOG.warn("Total memory override {} is not valid", 
totalMemoryOverride);
             return;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index 53ba7eb4..fe812bef 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -22,8 +22,8 @@ import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.ScalingTracking;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -43,6 +43,8 @@ import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -195,22 +197,22 @@ public class KubernetesAutoScalerStateStore
 
     @NotNull
     @Override
-    public Configuration getConfigOverrides(KubernetesJobAutoScalerContext 
jobContext) {
+    public ConfigChanges getConfigChanges(KubernetesJobAutoScalerContext 
jobContext) {
         return configMapStore
                 .getSerializedState(jobContext, CONFIG_OVERRIDES_KEY)
                 
.map(KubernetesAutoScalerStateStore::deserializeConfigOverrides)
-                .orElse(new Configuration());
+                .orElse(new ConfigChanges());
     }
 
     @Override
-    public void storeConfigOverrides(
-            KubernetesJobAutoScalerContext jobContext, Configuration 
overrides) {
+    public void storeConfigChanges(
+            KubernetesJobAutoScalerContext jobContext, ConfigChanges 
overrides) {
         configMapStore.putSerializedState(
                 jobContext, CONFIG_OVERRIDES_KEY, 
serializeConfigOverrides(overrides));
     }
 
     @Override
-    public void removeConfigOverrides(KubernetesJobAutoScalerContext 
jobContext) {
+    public void removeConfigChanges(KubernetesJobAutoScalerContext jobContext) 
{
         configMapStore.removeSerializedState(jobContext, CONFIG_OVERRIDES_KEY);
     }
 
@@ -276,12 +278,24 @@ public class KubernetesAutoScalerStateStore
         return ConfigurationUtils.convertValue(overrides, Map.class);
     }
 
-    private static String serializeConfigOverrides(Configuration overrides) {
-        return ConfigurationUtils.convertValue(overrides.toMap(), 
String.class);
+    @Nullable
+    private static String serializeConfigOverrides(ConfigChanges 
configChanges) {
+        try {
+            return YAML_MAPPER.writeValueAsString(configChanges);
+        } catch (Exception e) {
+            LOG.error("Failed to serialize ConfigOverrides", e);
+            return null;
+        }
     }
 
-    private static Configuration deserializeConfigOverrides(String overrides) {
-        return 
Configuration.fromMap(ConfigurationUtils.convertValue(overrides, Map.class));
+    @Nullable
+    private static ConfigChanges deserializeConfigOverrides(String 
configOverrides) {
+        try {
+            return YAML_MAPPER.readValue(configOverrides, new 
TypeReference<>() {});
+        } catch (Exception e) {
+            LOG.error("Failed to deserialize ConfigOverrides", e);
+            return null;
+        }
     }
 
     @VisibleForTesting
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
index 9a448304..994732ab 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -68,9 +68,9 @@ public class KubernetesScalingRealizerTest {
         KubernetesJobAutoScalerContext ctx =
                 TestingKubernetesAutoscalerUtils.createContext("test", null);
 
-        var overrides = new Configuration();
+        ConfigChanges overrides = new ConfigChanges();
         MemorySize memoryOverride = MemorySize.ofMebiBytes(4096);
-        overrides.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, memoryOverride);
+        overrides.addOverride(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
memoryOverride);
         new KubernetesScalingRealizer().realizeConfigOverrides(ctx, overrides);
 
         assertThat(ctx.getResource()).isInstanceOf(FlinkDeployment.class);


Reply via email to