This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 48692b72  [FLINK-34152] Tune heap memory of autoscaled jobs (#762)
48692b72 is described below

commit 48692b7243086a2ccf2cb4c64a6b00306fa1d65f
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Feb 15 12:46:06 2024 +0100

     [FLINK-34152] Tune heap memory of autoscaled jobs (#762)
---
 .../generated/auto_scaler_configuration.html       |  30 +++
 .../jdbc/state/JdbcAutoScalerStateStore.java       |  32 +++
 .../flink/autoscaler/jdbc/state/StateType.java     |   3 +-
 .../flinkcluster/FlinkClusterJobListFetcher.java   |   3 -
 .../realizer/RescaleApiScalingRealizer.java        |  12 +-
 .../StandaloneAutoscalerExecutorTest.java          |  10 +-
 .../realizer/RescaleApiScalingRealizerTest.java    |  18 +-
 .../flink/autoscaler/JobAutoScalerContext.java     |  24 +-
 .../apache/flink/autoscaler/JobAutoScalerImpl.java |  22 +-
 .../flink/autoscaler/RestApiMetricsCollector.java  |   6 +-
 .../apache/flink/autoscaler/ScalingExecutor.java   |  21 +-
 .../flink/autoscaler/ScalingMetricCollector.java   |   2 +-
 .../flink/autoscaler/ScalingMetricEvaluator.java   |  14 +-
 .../flink/autoscaler/config/AutoScalerOptions.java |  43 ++++
 .../autoscaler/metrics/EvaluatedScalingMetric.java |   8 +-
 .../flink/autoscaler/metrics/ScalingMetric.java    |   5 +-
 .../flink/autoscaler/metrics/ScalingMetrics.java   |  19 +-
 .../flink/autoscaler/realizer/ScalingRealizer.java |   6 +-
 .../autoscaler/state/AutoScalerStateStore.java     |   8 +
 .../state/InMemoryAutoScalerStateStore.java        |  25 ++
 .../flink/autoscaler/tuning/MemoryTuning.java      | 251 +++++++++++++++++++++
 .../flink/autoscaler/JobAutoScalerImplTest.java    |  57 ++++-
 .../flink/autoscaler/JobVertexScalerTest.java      |   5 +-
 .../autoscaler/RecommendedParallelismTest.java     |   5 -
 .../autoscaler/RestApiMetricsCollectorTest.java    |   5 -
 .../flink/autoscaler/ScalingExecutorTest.java      |  58 ++++-
 .../autoscaler/ScalingMetricEvaluatorTest.java     |  24 +-
 .../flink/autoscaler/TestingAutoscalerUtils.java   |  13 +-
 .../autoscaler/metrics/ScalingMetricsTest.java     |  36 ++-
 .../realizer/TestingScalingRealizer.java           |  34 ++-
 .../state/AbstractAutoScalerStateStoreTest.java    |   6 +
 .../flink/autoscaler/utils/MemoryTuningTest.java   | 139 ++++++++++++
 .../autoscaler/KubernetesJobAutoScalerContext.java |  12 +-
 .../autoscaler/KubernetesScalingRealizer.java      |  40 +++-
 .../state/KubernetesAutoScalerStateStore.java      |  34 +++
 .../operator/config/FlinkConfigBuilder.java        |   1 -
 .../operator/controller/FlinkResourceContext.java  |   2 +-
 .../KubernetesJobAutoScalerContextTest.java        |   7 +-
 .../autoscaler/KubernetesScalingRealizerTest.java  |  28 ++-
 39 files changed, 951 insertions(+), 117 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index aa5ff525..ee6fb731 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -62,6 +62,36 @@
             <td>Double</td>
             <td>Max allowed percentage of heap usage during scaling operations. Autoscaling will be paused if the heap usage exceeds this threshold.</td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.memory.tuning.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If enabled, the initial amount of memory specified for TaskManagers will be reduced according to the observed needs.</td>
+        </tr>
+        <tr>
+            <td><h5>job.autoscaler.memory.tuning.heap.min</h5></td>
+            <td style="word-wrap: break-word;">512 mb</td>
+            <td>MemorySize</td>
+            <td>The minimum amount of TaskManager heap memory, if memory tuning is enabled.</td>
+        </tr>
+        <tr>
+            <td><h5>job.autoscaler.memory.tuning.heap.overhead</h5></td>
+            <td style="word-wrap: break-word;">0.2</td>
+            <td>Double</td>
+            <td>Overhead to add to heap tuning decisions (0-1). This ensures spare capacity and allows the memory to grow beyond the dynamically computed limits, but never beyond the original memory limits.</td>
+        </tr>
+        <tr>
+            <td><h5>job.autoscaler.memory.tuning.heap.target-usage</h5></td>
+            <td style="word-wrap: break-word;">MAX</td>
+            <td><p>Enum</p></td>
+            <td>The heap size to target. Average usage (AVG) will yield better savings. Max usage will yield more conservative savings.<br /><br />Possible values:<ul><li>"AVG"</li><li>"MAX"</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>job.autoscaler.memory.tuning.heap.transfer-to-managed</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If enabled, any reduction of heap memory will increase the managed memory used by RocksDB.</td>
+        </tr>
         <tr>
             <td><h5>job.autoscaler.metrics.busy-time.aggregator</h5></td>
             <td style="word-wrap: break-word;">MAX</td>
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
index 7b459ef4..3f2af617 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
@@ -25,6 +25,7 @@ import org.apache.flink.autoscaler.ScalingTracking;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -46,6 +47,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 
 import static 
org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
+import static 
org.apache.flink.autoscaler.jdbc.state.StateType.CONFIG_OVERRIDES;
 import static 
org.apache.flink.autoscaler.jdbc.state.StateType.PARALLELISM_OVERRIDES;
 import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
 import static 
org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING;
@@ -189,6 +191,28 @@ public class JdbcAutoScalerStateStore<KEY, Context extends JobAutoScalerContext<
         jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), 
PARALLELISM_OVERRIDES);
     }
 
+    @Override
+    public void storeConfigOverrides(Context jobContext, Configuration 
configOverrides) {
+        jdbcStateStore.putSerializedState(
+                getSerializeKey(jobContext),
+                CONFIG_OVERRIDES,
+                serializeConfigOverrides(configOverrides));
+    }
+
+    @Nonnull
+    @Override
+    public Configuration getConfigOverrides(Context jobContext) {
+        return jdbcStateStore
+                .getSerializedState(getSerializeKey(jobContext), 
CONFIG_OVERRIDES)
+                .map(JdbcAutoScalerStateStore::deserializeConfigOverrides)
+                .orElse(new Configuration());
+    }
+
+    @Override
+    public void removeConfigOverrides(Context jobContext) {
+        jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), 
CONFIG_OVERRIDES);
+    }
+
     @Override
     public void clearAll(Context jobContext) {
         jdbcStateStore.clearAll(getSerializeKey(jobContext));
@@ -251,4 +275,12 @@ public class JdbcAutoScalerStateStore<KEY, Context extends JobAutoScalerContext<
     private static Map<String, String> deserializeParallelismOverrides(String 
overrides) {
         return ConfigurationUtils.convertValue(overrides, Map.class);
     }
+
+    private static String serializeConfigOverrides(Configuration overrides) {
+        return ConfigurationUtils.convertValue(overrides.toMap(), 
String.class);
+    }
+
+    private static Configuration deserializeConfigOverrides(String overrides) {
+        return 
Configuration.fromMap(ConfigurationUtils.convertValue(overrides, Map.class));
+    }
 }
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
index 728afaa2..ca2864f4 100644
--- 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
@@ -26,7 +26,8 @@ public enum StateType {
     SCALING_HISTORY("scalingHistory"),
     SCALING_TRACKING("scalingTracking"),
     COLLECTED_METRICS("collectedMetrics"),
-    PARALLELISM_OVERRIDES("parallelismOverrides");
+    PARALLELISM_OVERRIDES("parallelismOverrides"),
+    CONFIG_OVERRIDES("configOverrides");
 
     /**
      * The identifier of each state type, it will be used to store. Please 
ensure the identifier is
diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
index 63d79cd4..e99424c5 100644
--- 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
@@ -22,7 +22,6 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.standalone.JobListFetcher;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -83,8 +82,6 @@ public class FlinkClusterJobListFetcher
                 jobStatusMessage.getJobState(),
                 conf,
                 new UnregisteredMetricsGroup(),
-                0,
-                MemorySize.ZERO,
                 () -> restClientGetter.apply(conf));
     }
 
diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
index d04e6d3e..8cba81d5 100644
--- 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
@@ -64,7 +64,8 @@ public class RescaleApiScalingRealizer<KEY, Context extends 
JobAutoScalerContext
     }
 
     @Override
-    public void realize(Context context, Map<String, String> 
parallelismOverrides) {
+    public void realizeParallelismOverrides(
+            Context context, Map<String, String> parallelismOverrides) {
         Configuration conf = context.getConfiguration();
         if (!conf.get(JobManagerOptions.SCHEDULER)
                 .equals(JobManagerOptions.SchedulerType.Adaptive)) {
@@ -124,6 +125,15 @@ public class RescaleApiScalingRealizer<KEY, Context 
extends JobAutoScalerContext
         }
     }
 
+    @Override
+    public void realizeConfigOverrides(Context context, Configuration 
configOverrides) {
+        // Not currently supported
+        LOG.warn(
+                "{} does not support updating the TaskManager configuration 
({})",
+                getClass().getSimpleName(),
+                configOverrides);
+    }
+
     private Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(
             RestClusterClient<String> client, JobID jobID, Duration 
restClientTimeout)
             throws Exception {
diff --git 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
index d2e50092..ca905071 100644
--- 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
+++ 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -231,13 +230,6 @@ class StandaloneAutoscalerExecutorTest {
     private static JobAutoScalerContext<JobID> createJobAutoScalerContext() {
         var jobID = new JobID();
         return new JobAutoScalerContext<>(
-                jobID,
-                jobID,
-                JobStatus.RUNNING,
-                new Configuration(),
-                null,
-                0,
-                MemorySize.ZERO,
-                null);
+                jobID, jobID, JobStatus.RUNNING, new Configuration(), null, 
null);
     }
 }
diff --git 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
index 6b209eb0..b16cbdac 100644
--- 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
+++ 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.MemorySize;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
 import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -92,7 +91,7 @@ class RescaleApiScalingRealizerTest {
 
         assertThat(updatedRequirements).isNotDone();
         assertThat(closeFuture).isNotDone();
-        scalingRealizer.realize(jobContext, newResourceRequirements);
+        scalingRealizer.realizeParallelismOverrides(jobContext, 
newResourceRequirements);
 
         // The ResourceRequirements should be updated when the 
newResourceRequirements is changed.
         if (resourceIsChanged) {
@@ -128,7 +127,7 @@ class RescaleApiScalingRealizerTest {
         RescaleApiScalingRealizer<JobID, JobAutoScalerContext<JobID>> 
scalingRealizer =
                 new RescaleApiScalingRealizer<>(eventCollector);
 
-        scalingRealizer.realize(jobContext, resourceRequirements);
+        scalingRealizer.realizeParallelismOverrides(jobContext, 
resourceRequirements);
         assertThat(eventCollector.events).isEmpty();
     }
 
@@ -149,8 +148,6 @@ class RescaleApiScalingRealizerTest {
                         JobStatus.CANCELLING,
                         conf,
                         null,
-                        0,
-                        MemorySize.ZERO,
                         () ->
                                 fail(
                                         "The rest client shouldn't be created 
if the job isn't running."));
@@ -159,7 +156,7 @@ class RescaleApiScalingRealizerTest {
         RescaleApiScalingRealizer<JobID, JobAutoScalerContext<JobID>> 
scalingRealizer =
                 new RescaleApiScalingRealizer<>(eventCollector);
 
-        scalingRealizer.realize(jobContext, resourceRequirements);
+        scalingRealizer.realizeParallelismOverrides(jobContext, 
resourceRequirements);
         assertThat(eventCollector.events).isEmpty();
     }
 
@@ -167,14 +164,7 @@ class RescaleApiScalingRealizerTest {
             JobID jobID,
             SupplierWithException<RestClusterClient<String>, Exception> 
restClientSupplier) {
         return new JobAutoScalerContext<>(
-                jobID,
-                jobID,
-                JobStatus.RUNNING,
-                new Configuration(),
-                null,
-                0,
-                MemorySize.ZERO,
-                restClientSupplier);
+                jobID, jobID, JobStatus.RUNNING, new Configuration(), null, 
restClientSupplier);
     }
 
     private JobResourceRequirements createResourceRequirements(
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
index 026e0fb3..9c5e614c 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -33,6 +34,8 @@ import lombok.ToString;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
+
 /**
  * The job autoscaler context, it includes all details related to the current 
job.
  *
@@ -52,19 +55,28 @@ public class JobAutoScalerContext<KEY> {
 
     @Nullable @Getter private final JobStatus jobStatus;
 
+    /**
+     * The configuration based on the latest user-provided spec. This is not 
the already deployed /
+     * observed configuration.
+     */
     @Getter private final Configuration configuration;
 
     @Getter private final MetricGroup metricGroup;
 
-    /** Task manager CPU as a fraction (if available). */
-    @Getter private final double taskManagerCpu;
-
-    /** Task manager memory size (if available). */
-    @Getter @Nullable private final MemorySize taskManagerMemory;
-
     @ToString.Exclude
     private final SupplierWithException<RestClusterClient<String>, Exception> 
restClientSupplier;
 
+    /** Retrieve the currently configured TaskManager CPU. */
+    public Optional<Double> getTaskManagerCpu() {
+        // Not supported by default
+        return Optional.empty();
+    }
+
+    /** Retrieve the currently configured TaskManager memory. */
+    public Optional<MemorySize> getTaskManagerMemory() {
+        return 
Optional.ofNullable(getConfiguration().get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
     public RestClusterClient<String> getRestClusterClient() throws Exception {
         return restClientSupplier.get();
     }
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index b3839ff4..91b18248 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
 import org.apache.flink.autoscaler.realizer.ScalingRealizer;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.util.Preconditions;
 
@@ -106,7 +107,13 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
         } catch (Throwable e) {
             onError(ctx, autoscalerMetrics, e);
         } finally {
-            applyParallelismOverrides(ctx);
+            try {
+                applyParallelismOverrides(ctx);
+                applyConfigOverrides(ctx);
+            } catch (Exception e) {
+                LOG.error("Error applying overrides.", e);
+                onError(ctx, autoscalerMetrics, e);
+            }
         }
     }
 
@@ -151,7 +158,18 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
                         userOverrides.put(k, v);
                     }
                 });
-        scalingRealizer.realize(ctx, userOverrides);
+        scalingRealizer.realizeParallelismOverrides(ctx, userOverrides);
+    }
+
+    @VisibleForTesting
+    void applyConfigOverrides(Context ctx) throws Exception {
+        if 
(!ctx.getConfiguration().get(AutoScalerOptions.MEMORY_TUNING_ENABLED)) {
+            return;
+        }
+
+        Configuration configOverrides = stateStore.getConfigOverrides(ctx);
+        LOG.info("Applying config overrides: {}", configOverrides);
+        scalingRealizer.realizeConfigOverrides(ctx, configOverrides);
     }
 
     private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics 
autoscalerMetrics)
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
index 572bec9b..a5b2cf02 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
@@ -187,7 +187,11 @@ public class RestApiMetricsCollector<KEY, Context extends 
JobAutoScalerContext<K
 
         MetricsAggregationParameter aggregationParameter =
                 (MetricsAggregationParameter) queryParamIt.next();
-        
aggregationParameter.resolve(List.of(MetricsAggregationParameter.AggregationMode.MAX));
+        aggregationParameter.resolve(
+                List.of(
+                        MetricsAggregationParameter.AggregationMode.MIN,
+                        MetricsAggregationParameter.AggregationMode.MAX,
+                        MetricsAggregationParameter.AggregationMode.AVG));
 
         var responseBody =
                 restClient
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 094d4680..fdbb3c17 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.autoscaler.resources.NoopResourceCheck;
 import org.apache.flink.autoscaler.resources.ResourceCheck;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.tuning.MemoryTuning;
 import org.apache.flink.autoscaler.utils.CalendarUtils;
 import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
 import org.apache.flink.configuration.Configuration;
@@ -117,7 +118,12 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             return false;
         }
 
-        if (scalingWouldExceedClusterResources(evaluatedMetrics, 
scalingSummaries, context)) {
+        var tuningConfig =
+                MemoryTuning.tuneTaskManagerHeapMemory(
+                        context, evaluatedMetrics, autoScalerEventHandler);
+
+        if (scalingWouldExceedClusterResources(
+                tuningConfig, evaluatedMetrics, scalingSummaries, context)) {
             return false;
         }
 
@@ -132,6 +138,8 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                 getVertexParallelismOverrides(
                         evaluatedMetrics.getVertexMetrics(), 
scalingSummaries));
 
+        autoScalerStateStore.storeConfigOverrides(context, tuningConfig);
+
         return true;
     }
 
@@ -243,7 +251,7 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             return true;
         }
 
-        var heapUsage = 
evaluatedMetrics.get(ScalingMetric.HEAP_USAGE).getAverage();
+        var heapUsage = 
evaluatedMetrics.get(ScalingMetric.HEAP_MAX_USAGE_RATIO).getAverage();
         if (heapUsage > conf.get(AutoScalerOptions.HEAP_USAGE_THRESHOLD)) {
             autoScalerEventHandler.handleEvent(
                     ctx,
@@ -259,16 +267,15 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
     }
 
     private boolean scalingWouldExceedClusterResources(
+            Configuration tuningConfig,
             EvaluatedMetrics evaluatedMetrics,
             Map<JobVertexID, ScalingSummary> scalingSummaries,
             JobAutoScalerContext<?> ctx) {
 
-        final double taskManagerCpu = ctx.getTaskManagerCpu();
-        final MemorySize taskManagerMemory = ctx.getTaskManagerMemory();
+        final double taskManagerCpu = ctx.getTaskManagerCpu().orElse(0.);
+        final MemorySize taskManagerMemory = 
MemoryTuning.getTotalMemory(tuningConfig, ctx);
 
-        if (taskManagerCpu <= 0
-                || taskManagerMemory == null
-                || taskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
+        if (taskManagerCpu <= 0 || 
taskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
             // We can't extract the requirements, we can't make any assumptions
             return false;
         }
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index ed5a2e47..c1926541 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -324,7 +324,7 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
         LOG.debug("Output ratios: {}", outputRatios);
 
         var globalMetrics =
-                ScalingMetrics.computeGlobalMetrics(collectedJmMetrics, 
collectedTmMetrics);
+                ScalingMetrics.computeGlobalMetrics(collectedJmMetrics, 
collectedTmMetrics, conf);
         LOG.debug("Global metrics: {}", globalMetrics);
 
         return new CollectedMetrics(out, outputRatios, globalMetrics);
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index 08700235..6b156cc8 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -50,7 +50,8 @@ import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZ
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.GC_PRESSURE;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_USAGE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MAX_USAGE_RATIO;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_USED;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
@@ -314,13 +315,18 @@ public class ScalingMetricEvaluator {
         var out = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
 
         var gcPressure = latest.getOrDefault(GC_PRESSURE, Double.NaN);
-        var lastHeapUsage = latest.getOrDefault(HEAP_USAGE, Double.NaN);
+        var lastHeapUsage = latest.getOrDefault(HEAP_MAX_USAGE_RATIO, 
Double.NaN);
 
         out.put(GC_PRESSURE, EvaluatedScalingMetric.of(gcPressure));
         out.put(
-                HEAP_USAGE,
+                HEAP_MAX_USAGE_RATIO,
                 new EvaluatedScalingMetric(
-                        lastHeapUsage, getAverageGlobalMetric(HEAP_USAGE, 
metricHistory)));
+                        lastHeapUsage,
+                        getAverageGlobalMetric(HEAP_MAX_USAGE_RATIO, 
metricHistory)));
+
+        var latestObservation = latest.getOrDefault(HEAP_USED, Double.NaN);
+        double heapSizeAverage = getAverageGlobalMetric(HEAP_USED, 
metricHistory);
+        out.put(HEAP_USED, new EvaluatedScalingMetric(latestObservation, 
heapSizeAverage));
 
         out.put(
                 NUM_TASK_SLOTS_USED,
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 269dddd2..58bdfce0 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -18,8 +18,10 @@
 package org.apache.flink.autoscaler.config;
 
 import org.apache.flink.autoscaler.metrics.MetricAggregator;
+import org.apache.flink.autoscaler.tuning.MemoryTuning.HeapUsageTarget;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
 
 import java.time.Duration;
 import java.util.List;
@@ -250,6 +252,47 @@ public class AutoScalerOptions {
                     .withDescription(
                             "Max allowed percentage of heap usage during scaling operations. Autoscaling will be paused if the heap usage exceeds this threshold.");
 
+    public static final ConfigOption<Boolean> MEMORY_TUNING_ENABLED =
+            autoScalerConfig("memory.tuning.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
+                    .withDescription(
+                            "If enabled, the initial amount of memory specified for TaskManagers will be reduced according to the observed needs.");
+
+    public static final ConfigOption<HeapUsageTarget> MEMORY_TUNING_HEAP_TARGET =
+            autoScalerConfig("memory.tuning.heap.target-usage")
+                    .enumType(HeapUsageTarget.class)
+                    .defaultValue(HeapUsageTarget.MAX)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.target-usage"))
+                    .withDescription(
+                            "The heap size to target. Average usage (AVG) will yield better savings. Max usage will yield more conservative savings.");
+
+    public static final ConfigOption<Double> MEMORY_TUNING_HEAP_OVERHEAD =
+            autoScalerConfig("memory.tuning.heap.overhead")
+                    .doubleType()
+                    .defaultValue(0.2)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.overhead"))
+                    .withDescription(
+                            "Overhead to add to heap tuning decisions (0-1). This ensures spare capacity and allows the memory to grow beyond the dynamically computed limits, but never beyond the original memory limits.");
+
+    public static final ConfigOption<MemorySize> MEMORY_TUNING_MIN_HEAP =
+            autoScalerConfig("memory.tuning.heap.min")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(512L))
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.min"))
+                    .withDescription(
+                            "The minimum amount of TaskManager heap memory, if memory tuning is enabled.");
+
+    public static final ConfigOption<Boolean> MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED =
+            autoScalerConfig("memory.tuning.heap.transfer-to-managed")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(
+                            oldOperatorConfigKey("memory.tuning.heap.transfer-to-managed"))
+                    .withDescription(
+                            "If enabled, any reduction of heap memory will increase the managed memory used by RocksDB.");
+
     public static final ConfigOption<Integer> VERTEX_SCALING_HISTORY_COUNT =
             autoScalerConfig("history.max.count")
                     .intType()
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
index c60adf18..cc013dd7 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
@@ -33,7 +33,13 @@ public class EvaluatedScalingMetric {
         this.average = ScalingMetrics.roundMetric(average);
     }
 
-    public static EvaluatedScalingMetric of(double value) {
-        return new EvaluatedScalingMetric(value, Double.NaN);
+    /** Creates a metric from only a current value; {@code Double.NaN} is passed as average. */
+    public static EvaluatedScalingMetric of(double current) {
+        return new EvaluatedScalingMetric(current, Double.NaN);
+    }
+
+    /** Creates a metric from only an average; {@code Double.NaN} is passed as current value. */
+    public static EvaluatedScalingMetric avg(double average) {
+        return new EvaluatedScalingMetric(Double.NaN, average);
     }
 }
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index 1ab1bd3d..dfc8276b 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -77,8 +77,11 @@ public enum ScalingMetric {
      */
     GC_PRESSURE(false),
 
+    /** Used heap size in bytes: average or max across TaskManagers, per the heap usage target. */
+    HEAP_USED(true),
+
     /** Percentage of max heap used (between 0 and 1). */
-    HEAP_USAGE(true),
+    HEAP_MAX_USAGE_RATIO(true),
 
     NUM_TASK_SLOTS_USED(false);
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index 516e0046..c39427d4 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -19,6 +19,7 @@ package org.apache.flink.autoscaler.metrics;
 
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.tuning.MemoryTuning;
 import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -163,7 +164,8 @@ public class ScalingMetrics {
 
     public static Map<ScalingMetric, Double> computeGlobalMetrics(
             Map<FlinkMetric, Metric> collectedJmMetrics,
-            Map<FlinkMetric, AggregatedMetric> collectedTmMetrics) {
+            Map<FlinkMetric, AggregatedMetric> collectedTmMetrics,
+            Configuration conf) {
         if (collectedTmMetrics == null) {
             return null;
         }
@@ -192,7 +194,19 @@ public class ScalingMetrics {
         var heapMax = collectedTmMetrics.get(FlinkMetric.HEAP_MAX);
         var heapUsed = collectedTmMetrics.get(FlinkMetric.HEAP_USED);
         if (heapMax != null && heapUsed != null) {
-            out.put(ScalingMetric.HEAP_USAGE, heapUsed.getMax() / 
heapMax.getMax());
+            MemoryTuning.HeapUsageTarget heapTarget =
+                    conf.get(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET);
+            switch (heapTarget) {
+                case AVG:
+                    out.put(ScalingMetric.HEAP_USED, heapUsed.getAvg());
+                    break;
+                case MAX:
+                    out.put(ScalingMetric.HEAP_USED, heapUsed.getMax());
+                    break;
+                default:
+                    LOG.warn("Unknown value {} for heap target", heapTarget);
+            }
+            out.put(ScalingMetric.HEAP_MAX_USAGE_RATIO, heapUsed.getMax() / 
heapMax.getMax());
         }
 
         return out;
@@ -209,6 +223,7 @@ public class ScalingMetrics {
         }
     }
 
+    // TODO: FLINK-34213: Consider using accumulated busy time instead of 
busyMsPerSecond
     private static double getBusyTimeMsPerSecond(
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
             Configuration conf,
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
index b4895648..aba94157 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
@@ -19,6 +19,7 @@ package org.apache.flink.autoscaler.realizer;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.configuration.Configuration;
 
 import java.util.Map;
 
@@ -33,5 +34,8 @@ import java.util.Map;
public interface ScalingRealizer<KEY, Context extends 
JobAutoScalerContext<KEY>> {
 
     /** Update job's parallelism to parallelismOverrides. */
-    void realize(Context context, Map<String, String> parallelismOverrides);
+    void realizeParallelismOverrides(Context context, Map<String, String> 
parallelismOverrides);
+
+    /** Applies the given configuration overrides, e.g. tuned TaskManager memory settings. */
+    void realizeConfigOverrides(Context context, Configuration 
configOverrides);
 }
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index 2d2c60de..30be323f 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -22,6 +22,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.ScalingTracking;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import javax.annotation.Nonnull;
@@ -69,6 +70,16 @@ public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<
 
     void removeParallelismOverrides(Context jobContext) throws Exception;
 
+    /** Stores the configuration overrides for the given job. */
+    void storeConfigOverrides(Context jobContext, Configuration configOverrides) throws Exception;
+
+    /** Returns the stored config overrides, or an empty configuration if there are none. */
+    @Nonnull
+    Configuration getConfigOverrides(Context jobContext) throws Exception;
+
+    /** Removes the stored configuration overrides for the given job. */
+    void removeConfigOverrides(Context jobContext) throws Exception;
+
     /** Removes all data from this context. Flush stil needs to be called. */
     void clearAll(Context jobContext) throws Exception;
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index 6c5b7a29..60a9342e 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -21,8 +21,11 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.ScalingTracking;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import javax.annotation.Nonnull;
+
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
@@ -47,6 +50,8 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
 
     private final Map<KEY, Map<String, String>> parallelismOverridesStore;
 
+    private final Map<KEY, Configuration> tmConfigOverrides;
+
     private final Map<KEY, ScalingTracking> scalingTrackingStore;
 
     public InMemoryAutoScalerStateStore() {
@@ -54,6 +59,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
         collectedMetricsStore = new ConcurrentHashMap<>();
         parallelismOverridesStore = new ConcurrentHashMap<>();
         scalingTrackingStore = new ConcurrentHashMap<>();
+        tmConfigOverrides = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -115,6 +121,26 @@ public class InMemoryAutoScalerStateStore<KEY, Context extends JobAutoScalerCont
                 .orElse(new HashMap<>());
     }
 
+    @Override
+    public void storeConfigOverrides(Context jobContext, Configuration configOverrides)
+            throws Exception {
+        // Store a copy so that later mutations by the caller do not change the stored overrides.
+        tmConfigOverrides.put(jobContext.getJobKey(), new Configuration(configOverrides));
+    }
+
+    @Nonnull
+    @Override
+    public Configuration getConfigOverrides(Context jobContext) {
+        // Return a copy so that callers cannot modify the stored overrides in place.
+        var overrides = tmConfigOverrides.get(jobContext.getJobKey());
+        return overrides == null ? new Configuration() : new Configuration(overrides);
+    }
+
+    @Override
+    public void removeConfigOverrides(Context jobContext) {
+        tmConfigOverrides.remove(jobContext.getJobKey());
+    }
+
     @Override
     public void removeParallelismOverrides(Context jobContext) {
         parallelismOverridesStore.remove(jobContext.getJobKey());
@@ -125,6 +151,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context extends JobAutoScalerCont
         scalingHistoryStore.remove(jobContext.getJobKey());
         parallelismOverridesStore.remove(jobContext.getJobKey());
         collectedMetricsStore.remove(jobContext.getJobKey());
+        tmConfigOverrides.remove(jobContext.getJobKey());
     }
 
     @Override
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
new file mode 100644
index 00000000..b4cd00e5
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.tuning;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.util.config.memory.CommonProcessMemorySpec;
+import org.apache.flink.runtime.util.config.memory.JvmMetaspaceAndOverheadOptions;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryOptions;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
+import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemory;
+import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+/** Tunes the TaskManager memory. */
+public class MemoryTuning {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryTuning.class);
+    public static final ProcessMemoryUtils<TaskExecutorFlinkMemory> FLINK_MEMORY_UTILS =
+            new ProcessMemoryUtils<>(getMemoryOptions(), new TaskExecutorFlinkMemoryUtils());
+
+    /** What memory usage to target. */
+    public enum HeapUsageTarget {
+        AVG,
+        MAX
+    }
+
+    /**
+     * Emits a Configuration which contains overrides for the current configuration. We are not
+     * modifying the config directly, but we are emitting a new configuration which contains any
+     * overrides. This config is persisted separately and applied by the autoscaler. That way we can
+     * clear any applied overrides if auto-tuning is disabled.
+     *
+     * <p>A recommendation event is published whenever new memory settings are computed, but the
+     * overrides are only returned if {@link AutoScalerOptions#MEMORY_TUNING_ENABLED} is set.
+     *
+     * @param context the job context, whose configuration is the original user configuration
+     * @param evaluatedMetrics evaluated metrics; the average of the global {@link
+     *     ScalingMetric#HEAP_USED} metric determines the new heap size
+     * @param eventHandler handler used to publish the configuration recommendation
+     * @return a new configuration with the tuned memory options, or an empty configuration if
+     *     tuning is disabled or cannot be performed
+     */
+    public static Configuration tuneTaskManagerHeapMemory(
+            JobAutoScalerContext<?> context,
+            EvaluatedMetrics evaluatedMetrics,
+            AutoScalerEventHandler eventHandler) {
+
+        // Please note that this config is the original configuration created from the user spec.
+        // It does not contain any already applied overrides.
+        var config = new UnmodifiableConfiguration(context.getConfiguration());
+
+        // Gather original memory configuration from the user spec
+        CommonProcessMemorySpec<TaskExecutorFlinkMemory> memSpecs;
+        try {
+            memSpecs = FLINK_MEMORY_UTILS.memoryProcessSpecFromConfig(config);
+        } catch (IllegalConfigurationException e) {
+            LOG.warn("Current memory configuration is not valid. Aborting memory tuning.", e);
+            return new Configuration();
+        }
+
+        Optional<MemorySize> heapUsed = getHeapUsed(evaluatedMetrics);
+        if (heapUsed.isEmpty()) {
+            // Tuning on a missing observation would shrink the heap to the configured minimum.
+            LOG.warn("No valid heap usage observed. Aborting memory tuning.");
+            return new Configuration();
+        }
+
+        var maxHeapSize = memSpecs.getFlinkMemory().getJvmHeapMemorySize();
+        LOG.debug("Current configured heap size: {}", maxHeapSize);
+
+        MemorySize newHeapSize = determineNewHeapSize(heapUsed.get(), config, maxHeapSize);
+        LOG.info("New TM heap memory {}", newHeapSize.toHumanReadableString());
+
+        // The new heap is capped at the configured heap size, so the diff is never positive.
+        final long heapDiffBytes = newHeapSize.getBytes() - maxHeapSize.getBytes();
+
+        final MemorySize totalMemory = adjustTotalTmMemory(context, heapDiffBytes);
+        if (totalMemory.equals(MemorySize.ZERO)) {
+            return new Configuration();
+        }
+
+        // Prepare the tuning config for new configuration values
+        var tuningConfig = new Configuration();
+        // Adjust the total container memory and the JVM heap size accordingly.
+        tuningConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemory);
+        // Framework and Task heap memory configs add up together yield the max heap memory.
+        // To simplify the calculation, set the framework heap memory to zero.
+        tuningConfig.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, MemorySize.ZERO);
+        tuningConfig.set(TaskManagerOptions.TASK_HEAP_MEMORY, newHeapSize);
+
+        // All memory options which can be configured via fractions need to be set to their
+        // absolute values or, if there is no absolute setting, the fractions need to be
+        // re-calculated.
+        MemorySize managedMemory = memSpecs.getFlinkMemory().getManaged();
+        MemorySize newManagedMemory = managedMemory;
+        if (shouldTransferHeapToManagedMemory(config, heapDiffBytes)) {
+            // If RocksDB is configured, give back the heap memory as managed memory to RocksDB
+            newManagedMemory = new MemorySize(managedMemory.getBytes() + Math.abs(heapDiffBytes));
+            LOG.info(
+                    "Increasing managed memory size from {} to {}",
+                    managedMemory,
+                    newManagedMemory);
+        }
+        tuningConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, newManagedMemory);
+
+        // The total Flink memory changes by the heap diff and by any heap given to managed memory
+        // (which cancel out when transferring). The network fraction must use the new total.
+        MemorySize newTotalFlinkMemory =
+                new MemorySize(
+                        memSpecs.getTotalFlinkMemorySize().getBytes()
+                                + heapDiffBytes
+                                + (newManagedMemory.getBytes() - managedMemory.getBytes()));
+        tuningConfig.set(
+                TaskManagerOptions.NETWORK_MEMORY_FRACTION,
+                getFraction(memSpecs.getFlinkMemory().getNetwork(), newTotalFlinkMemory));
+        tuningConfig.set(
+                TaskManagerOptions.JVM_OVERHEAD_FRACTION,
+                getFraction(memSpecs.getJvmOverheadSize(), totalMemory));
+
+        boolean tuningEnabled = config.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);
+        eventHandler.handleEvent(
+                context,
+                AutoScalerEventHandler.Type.Normal,
+                "Configuration recommendation",
+                String.format(
+                        "Memory tuning recommends the following configuration (automatic tuning is %s):\n%s",
+                        tuningEnabled ? "enabled" : "disabled",
+                        formatConfig(tuningConfig)),
+                "MemoryTuning",
+                config.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
+
+        if (!tuningEnabled) {
+            return new Configuration();
+        }
+
+        return tuningConfig;
+    }
+
+    /**
+     * Computes the heap size to target: the observed heap usage plus the configured overhead,
+     * bounded below by the configured minimum heap size and above by the configured heap size.
+     */
+    private static MemorySize determineNewHeapSize(
+            MemorySize heapUsed, Configuration config, MemorySize maxHeapSize) {
+
+        double overheadFactor = 1 + config.get(AutoScalerOptions.MEMORY_TUNING_HEAP_OVERHEAD);
+        long heapTargetSizeBytes = (long) (heapUsed.getBytes() * overheadFactor);
+
+        // Apply min/max heap size limits
+        heapTargetSizeBytes =
+                Math.min(
+                        Math.max(
+                                // Lower limit is the minimum configured heap size
+                                config.get(AutoScalerOptions.MEMORY_TUNING_MIN_HEAP).getBytes(),
+                                heapTargetSizeBytes),
+                        // Upper limit is the original max heap size in the spec
+                        maxHeapSize.getBytes());
+
+        return new MemorySize(heapTargetSizeBytes);
+    }
+
+    /**
+     * Returns the average observed heap usage, or an empty optional if the {@link
+     * ScalingMetric#HEAP_USED} metric is absent or its average is not a finite, non-negative
+     * number.
+     */
+    private static Optional<MemorySize> getHeapUsed(EvaluatedMetrics evaluatedMetrics) {
+        var heapUsedMetric = evaluatedMetrics.getGlobalMetrics().get(ScalingMetric.HEAP_USED);
+        if (heapUsedMetric == null) {
+            return Optional.empty();
+        }
+        double average = heapUsedMetric.getAverage();
+        if (Double.isNaN(average) || Double.isInfinite(average) || average < 0) {
+            return Optional.empty();
+        }
+        MemorySize heapUsed = new MemorySize((long) average);
+        LOG.info("TM heap used size: {}", heapUsed);
+        return Optional.of(heapUsed);
+    }
+
+    /**
+     * Whether heap freed by tuning should be given to managed memory instead of shrinking the
+     * total memory: requires the transfer option, a heap reduction, and the RocksDB backend.
+     */
+    private static boolean shouldTransferHeapToManagedMemory(
+            Configuration config, long heapDiffBytes) {
+        return config.get(AutoScalerOptions.MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED)
+                && heapDiffBytes < 0
+                && "rocksdb".equalsIgnoreCase(config.get(StateBackendOptions.STATE_BACKEND));
+    }
+
+    /**
+     * Returns {@link TaskManagerOptions#TOTAL_PROCESS_MEMORY} from {@code config} if set,
+     * otherwise the TaskManager memory of the context, or {@link MemorySize#ZERO} if neither is
+     * known.
+     */
+    public static MemorySize getTotalMemory(Configuration config, JobAutoScalerContext<?> ctx) {
+        MemorySize overrideSize = config.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY);
+        if (overrideSize != null) {
+            return overrideSize;
+        }
+        return ctx.getTaskManagerMemory().orElse(MemorySize.ZERO);
+    }
+
+    /**
+     * Computes the new total TaskManager process memory for the given heap change. It never
+     * exceeds the spec memory and is left unchanged when heap is transferred to managed memory.
+     * Returns {@link MemorySize#ZERO} if the spec memory cannot be determined.
+     */
+    private static MemorySize adjustTotalTmMemory(JobAutoScalerContext<?> ctx, long heapDiffBytes) {
+
+        var specTaskManagerMemory = ctx.getTaskManagerMemory().orElse(MemorySize.ZERO);
+        if (specTaskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
+            LOG.warn("Spec TaskManager memory size could not be determined.");
+            return MemorySize.ZERO;
+        }
+
+        if (shouldTransferHeapToManagedMemory(ctx.getConfiguration(), heapDiffBytes)) {
+            // Total size does not change
+            return specTaskManagerMemory;
+        }
+
+        long newTotalMemBytes = specTaskManagerMemory.getBytes() + heapDiffBytes;
+        // TM container memory can never grow beyond the user-specified max memory
+        newTotalMemBytes = Math.min(newTotalMemBytes, specTaskManagerMemory.getBytes());
+
+        MemorySize totalMemory = new MemorySize(newTotalMemBytes);
+        LOG.info("Setting new total TaskManager memory to {}", totalMemory);
+        return totalMemory;
+    }
+
+    /** Options used by {@link #FLINK_MEMORY_UTILS} to derive the TaskManager memory spec. */
+    private static ProcessMemoryOptions getMemoryOptions() {
+        return new ProcessMemoryOptions(
+                Arrays.asList(
+                        TaskManagerOptions.TASK_HEAP_MEMORY,
+                        TaskManagerOptions.MANAGED_MEMORY_SIZE),
+                TaskManagerOptions.TOTAL_FLINK_MEMORY,
+                TaskManagerOptions.TOTAL_PROCESS_MEMORY,
+                new JvmMetaspaceAndOverheadOptions(
+                        TaskManagerOptions.JVM_METASPACE,
+                        TaskManagerOptions.JVM_OVERHEAD_MIN,
+                        TaskManagerOptions.JVM_OVERHEAD_MAX,
+                        TaskManagerOptions.JVM_OVERHEAD_FRACTION));
+    }
+
+    /** Returns {@code numerator / denominator}, rounded to three decimal places. */
+    private static float getFraction(MemorySize numerator, MemorySize denominator) {
+        // Round to three decimal places
+        return (float)
+                (Math.round(numerator.getBytes() / (double) denominator.getBytes() * 1000)
+                        / 1000.);
+    }
+
+    /**
+     * Format config such that it can be directly used as a Flink configuration. Keys are sorted so
+     * that identical configurations always produce identical output.
+     */
+    private static String formatConfig(Configuration config) {
+        var sb = new StringBuilder();
+        for (Map.Entry<String, String> entry : new TreeMap<>(config.toMap()).entrySet()) {
+            sb.append(entry.getKey())
+                    .append(": ")
+                    .append(entry.getValue())
+                    .append(System.lineSeparator());
+        }
+        return sb.toString();
+    }
+}
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index a7d80ee2..bb1962f5 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -31,7 +31,10 @@ import 
org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
@@ -44,6 +47,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
@@ -53,6 +58,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import static java.util.Map.entry;
 import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
@@ -288,6 +294,33 @@ public class JobAutoScalerImplTest {
         assertParallelismOverrides(Map.of(v1.toString(), "1", v2.toString(), 
"4"));
     }
 
+    @Test
+    void testApplyConfigOverrides() throws Exception {
+        
context.getConfiguration().set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
+        var autoscaler =
+                new JobAutoScalerImpl<>(
+                        null, null, null, eventCollector, scalingRealizer, 
stateStore);
+
+        // Initially we should return empty overrides and not create any state
+        assertThat(stateStore.getConfigOverrides(context).toMap()).isEmpty();
+
+        var config = new Configuration();
+        config.set(TaskManagerOptions.TASK_HEAP_MEMORY, new MemorySize(42));
+        stateStore.storeConfigOverrides(context, config);
+        stateStore.flush(context);
+
+        autoscaler.applyConfigOverrides(context);
+        assertThat(getEvent().getConfigOverrides().toMap())
+                
.containsExactly(entry(TaskManagerOptions.TASK_HEAP_MEMORY.key(), "42 bytes"));
+        assertThat(stateStore.getConfigOverrides(context)).isEqualTo(config);
+
+        // Disabling the autoscaler should clear the overrides
+        context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), 
"false");
+        autoscaler.scale(context);
+        autoscaler.applyConfigOverrides(context);
+        assertThat(getEvent().getConfigOverrides().toMap()).isEmpty();
+    }
+
     @Test
     void testAutoscalerDisabled() throws Exception {
         context.getConfiguration().setBoolean(AUTOSCALER_ENABLED, false);
@@ -320,12 +353,21 @@ public class JobAutoScalerImplTest {
 
     private void assertParallelismOverrides(Map<String, String> expectedOverrides) {
         TestingScalingRealizer.Event<JobID, JobAutoScalerContext<JobID>> scalingEvent;
-        scalingEvent = scalingRealizer.events.poll();
+        // Skip events which only carry config overrides and no parallelism overrides.
+        do {
+            scalingEvent = getEvent();
+        } while (scalingEvent != null && scalingEvent.getParallelismOverrides() == null);
+
         if (expectedOverrides == null) {
             assertThat(scalingEvent).isNull();
             return;
         }
         assertThat(scalingEvent).isNotNull();
         assertEquals(expectedOverrides, scalingEvent.getParallelismOverrides());
     }
+
+    @Nullable
+    private TestingScalingRealizer.Event<JobID, JobAutoScalerContext<JobID>> getEvent() {
+        return scalingRealizer.events.poll();
+    }
 }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 46f85489..456105d2 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -72,8 +71,6 @@ public class JobVertexScalerTest {
                         JobStatus.RUNNING,
                         conf,
                         new UnregisteredMetricsGroup(),
-                        0,
-                        MemorySize.ZERO,
                         null);
         restartTime = conf.get(AutoScalerOptions.RESTART_TIME);
     }
@@ -503,7 +500,7 @@ public class JobVertexScalerTest {
                 ScalingMetric.TRUE_PROCESSING_RATE,
                 new EvaluatedScalingMetric(trueProcessingRate, 
trueProcessingRate));
         metrics.put(ScalingMetric.GC_PRESSURE, 
EvaluatedScalingMetric.of(Double.NaN));
-        metrics.put(ScalingMetric.HEAP_USAGE, 
EvaluatedScalingMetric.of(Double.NaN));
+        metrics.put(ScalingMetric.HEAP_MAX_USAGE_RATIO, 
EvaluatedScalingMetric.of(Double.NaN));
         ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf, 
false, restartTime);
         return metrics;
     }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index cf866c01..9f10e19f 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
@@ -252,8 +251,6 @@ public class RecommendedParallelismTest {
                         JobStatus.CREATED,
                         context.getConfiguration(),
                         context.getMetricGroup(),
-                        0,
-                        MemorySize.ZERO,
                         getRestClusterClientSupplier());
     }
 
@@ -266,8 +263,6 @@ public class RecommendedParallelismTest {
                         JobStatus.RUNNING,
                         context.getConfiguration(),
                         context.getMetricGroup(),
-                        0,
-                        MemorySize.ZERO,
                         getRestClusterClientSupplier());
     }
 
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
index 462fd21c..863950af 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -115,8 +114,6 @@ public class RestApiMetricsCollectorTest {
                         JobStatus.RUNNING,
                         conf,
                         new UnregisteredMetricsGroup(),
-                        0,
-                        MemorySize.ZERO,
                         () -> restClusterClient);
 
         var jobVertexIDMapMap = collector.queryAllAggregatedMetrics(context, 
metrics);
@@ -228,8 +225,6 @@ public class RestApiMetricsCollectorTest {
                         JobStatus.RUNNING,
                         conf,
                         new UnregisteredMetricsGroup(),
-                        0,
-                        MemorySize.ZERO,
                         () -> client);
         var collector = new RestApiMetricsCollector<JobID, 
JobAutoScalerContext<JobID>>();
 
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index c3ef9296..c5591a76 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -68,7 +69,7 @@ public class ScalingExecutorTest {
     private static final Map<ScalingMetric, EvaluatedScalingMetric> 
dummyGlobalMetrics =
             Map.of(
                     ScalingMetric.GC_PRESSURE, 
EvaluatedScalingMetric.of(Double.NaN),
-                    ScalingMetric.HEAP_USAGE, 
EvaluatedScalingMetric.of(Double.NaN));
+                    ScalingMetric.HEAP_MAX_USAGE_RATIO, 
EvaluatedScalingMetric.of(Double.NaN));
 
     @BeforeEach
     public void setup() {
@@ -223,7 +224,7 @@ public class ScalingExecutorTest {
                                 EvaluatedScalingMetric.of(9),
                                 ScalingMetric.GC_PRESSURE,
                                 EvaluatedScalingMetric.of(Double.NaN),
-                                ScalingMetric.HEAP_USAGE,
+                                ScalingMetric.HEAP_MAX_USAGE_RATIO,
                                 EvaluatedScalingMetric.of(Double.NaN)));
 
         // Would normally scale without resource usage check
@@ -256,6 +257,43 @@ public class ScalingExecutorTest {
                         now));
     }
 
+    @Test
+    public void testMemoryTuning() throws Exception {
+        context = TestingAutoscalerUtils.createResourceAwareContext();
+        
context.getConfiguration().set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
+        context.getConfiguration().set(TaskManagerOptions.NUM_TASK_SLOTS, 5);
+        context.getConfiguration()
+                .set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("30 gb"));
+
+        var source = new JobVertexID();
+        var sink = new JobVertexID();
+        var now = Instant.now();
+
+        var globalMetrics =
+                Map.of(
+                        ScalingMetric.NUM_TASK_SLOTS_USED,
+                        EvaluatedScalingMetric.of(9),
+                        ScalingMetric.HEAP_USED,
+                        EvaluatedScalingMetric.avg(MemorySize.parse("5 
Gb").getBytes()),
+                        ScalingMetric.HEAP_MAX_USAGE_RATIO,
+                        EvaluatedScalingMetric.of(Double.NaN),
+                        ScalingMetric.GC_PRESSURE,
+                        EvaluatedScalingMetric.of(Double.NaN));
+        var vertexMetrics =
+                Map.of(source, evaluated(10, 50, 100, 50, 0), sink, 
evaluated(10, 50, 100, 50, 0));
+        var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
+
+        assertTrue(
+                scalingDecisionExecutor.scaleResource(
+                        context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
+        assertEquals(
+                "6.000gb (6442450944 bytes)", // tuned heap for 5 gb avg usage
+                stateStore
+                        .getConfigOverrides(context)
+                        .get(TaskManagerOptions.TASK_HEAP_MEMORY)
+                        .toHumanReadableString());
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testScalingEventsWith0IntervalConfig(boolean scalingEnabled) 
throws Exception {
@@ -350,7 +388,7 @@ public class ScalingExecutorTest {
                         Map.of(
                                 ScalingMetric.GC_PRESSURE,
                                 EvaluatedScalingMetric.of(Double.NaN),
-                                ScalingMetric.HEAP_USAGE,
+                                ScalingMetric.HEAP_MAX_USAGE_RATIO,
                                 EvaluatedScalingMetric.of(Double.NaN)));
 
         // Baseline, no GC/Heap metrics
@@ -365,7 +403,7 @@ public class ScalingExecutorTest {
                         Map.of(
                                 ScalingMetric.GC_PRESSURE,
                                 EvaluatedScalingMetric.of(0.49),
-                                ScalingMetric.HEAP_USAGE,
+                                ScalingMetric.HEAP_MAX_USAGE_RATIO,
                                 new EvaluatedScalingMetric(0.9, 0.79)));
         assertTrue(
                 scalingDecisionExecutor.scaleResource(
@@ -380,7 +418,7 @@ public class ScalingExecutorTest {
                         Map.of(
                                 ScalingMetric.GC_PRESSURE,
                                 EvaluatedScalingMetric.of(0.51),
-                                ScalingMetric.HEAP_USAGE,
+                                ScalingMetric.HEAP_MAX_USAGE_RATIO,
                                 new EvaluatedScalingMetric(0.9, 0.79)));
         assertFalse(
                 scalingDecisionExecutor.scaleResource(
@@ -395,7 +433,7 @@ public class ScalingExecutorTest {
                         Map.of(
                                 ScalingMetric.GC_PRESSURE,
                                 EvaluatedScalingMetric.of(0.49),
-                                ScalingMetric.HEAP_USAGE,
+                                ScalingMetric.HEAP_MAX_USAGE_RATIO,
                                 new EvaluatedScalingMetric(0.6, 0.81)));
         assertFalse(
                 scalingDecisionExecutor.scaleResource(
@@ -405,10 +443,11 @@ public class ScalingExecutorTest {
     }
 
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
-            int parallelism, double target, double procRate, double 
catchupRate) {
+            int parallelism, double current, double target, double procRate, 
double catchupRate) {
         var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
         metrics.put(ScalingMetric.PARALLELISM, 
EvaluatedScalingMetric.of(parallelism));
         metrics.put(ScalingMetric.MAX_PARALLELISM, 
EvaluatedScalingMetric.of(720));
+        metrics.put(ScalingMetric.CURRENT_PROCESSING_RATE, 
EvaluatedScalingMetric.avg(current));
         metrics.put(ScalingMetric.TARGET_DATA_RATE, new 
EvaluatedScalingMetric(target, target));
         metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, 
EvaluatedScalingMetric.of(catchupRate));
         metrics.put(
@@ -420,6 +459,11 @@ public class ScalingExecutorTest {
         return metrics;
     }
 
+    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+            int parallelism, double target, double procRate, double 
catchupRate) {
+        return evaluated(parallelism, Double.NaN, target, procRate, 
catchupRate); // current=NaN
+    }
+
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
             int parallelism, double target, double procRate) {
         return evaluated(parallelism, target, procRate, 0.);
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 0bfbd989..cbbc2b0e 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -576,10 +576,12 @@ public class ScalingMetricEvaluatorTest {
 
         assertEquals(
                 Map.of(
-                        ScalingMetric.HEAP_USAGE,
+                        ScalingMetric.HEAP_MAX_USAGE_RATIO,
                         EvaluatedScalingMetric.of(Double.NaN),
                         ScalingMetric.GC_PRESSURE,
                         EvaluatedScalingMetric.of(Double.NaN),
+                        ScalingMetric.HEAP_USED,
+                        EvaluatedScalingMetric.of(Double.NaN),
                         ScalingMetric.NUM_TASK_SLOTS_USED,
                         EvaluatedScalingMetric.of(Double.NaN)),
                 ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
@@ -589,13 +591,21 @@ public class ScalingMetricEvaluatorTest {
                 new CollectedMetrics(
                         Map.of(),
                         Map.of(),
-                        Map.of(ScalingMetric.HEAP_USAGE, 0.5, 
ScalingMetric.GC_PRESSURE, 0.6)));
+                        Map.of(
+                                ScalingMetric.HEAP_MAX_USAGE_RATIO,
+                                0.5,
+                                ScalingMetric.GC_PRESSURE,
+                                0.6,
+                                ScalingMetric.HEAP_USED,
+                                512.)));
         assertEquals(
                 Map.of(
-                        ScalingMetric.HEAP_USAGE,
+                        ScalingMetric.HEAP_MAX_USAGE_RATIO,
                         new EvaluatedScalingMetric(0.5, 0.5),
                         ScalingMetric.GC_PRESSURE,
                         EvaluatedScalingMetric.of(0.6),
+                        ScalingMetric.HEAP_USED,
+                        new EvaluatedScalingMetric(512, 512),
                         ScalingMetric.NUM_TASK_SLOTS_USED,
                         EvaluatedScalingMetric.of(Double.NaN)),
                 ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
@@ -606,18 +616,22 @@ public class ScalingMetricEvaluatorTest {
                         Map.of(),
                         Map.of(),
                         Map.of(
-                                ScalingMetric.HEAP_USAGE,
+                                ScalingMetric.HEAP_MAX_USAGE_RATIO,
                                 0.7,
                                 ScalingMetric.GC_PRESSURE,
                                 0.8,
+                                ScalingMetric.HEAP_USED,
+                                1024.,
                                 ScalingMetric.NUM_TASK_SLOTS_USED,
                                 42.)));
         assertEquals(
                 Map.of(
-                        ScalingMetric.HEAP_USAGE,
+                        ScalingMetric.HEAP_MAX_USAGE_RATIO,
                         new EvaluatedScalingMetric(0.7, 0.6),
                         ScalingMetric.GC_PRESSURE,
                         EvaluatedScalingMetric.of(0.8),
+                        ScalingMetric.HEAP_USED,
+                        new EvaluatedScalingMetric(1024., 768.),
                         ScalingMetric.NUM_TASK_SLOTS_USED,
                         EvaluatedScalingMetric.of(42.)),
                 ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java
index 163134d9..9869e4a3 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java
@@ -32,6 +32,7 @@ import org.apache.flink.util.function.SupplierWithException;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 /** The utils for test autoscaler. */
@@ -47,8 +48,6 @@ public class TestingAutoscalerUtils {
                 JobStatus.RUNNING,
                 new Configuration(),
                 metricGroup,
-                0,
-                MemorySize.ZERO,
                 getRestClusterClientSupplier());
     }
 
@@ -62,17 +61,15 @@ public class TestingAutoscalerUtils {
                 JobStatus.RUNNING,
                 new Configuration(),
                 metricGroup,
-                0,
-                MemorySize.ZERO,
                 TestingAutoscalerUtils.getRestClusterClientSupplier()) {
             @Override
-            public double getTaskManagerCpu() {
-                return 100;
+            public Optional<Double> getTaskManagerCpu() {
+                return Optional.of(100.);
             }
 
             @Override
-            public MemorySize getTaskManagerMemory() {
-                return MemorySize.parse("65536 bytes");
+            public Optional<MemorySize> getTaskManagerMemory() {
+                return Optional.of(MemorySize.parse("30 gb"));
             }
         };
     }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
index 9fb8b8b9..fe640711 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.autoscaler.metrics;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.autoscaler.tuning.MemoryTuning;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
@@ -482,22 +483,43 @@ public class ScalingMetricsTest {
 
     @Test
     public void testGlobalMetrics() {
-        assertEquals(Map.of(), ScalingMetrics.computeGlobalMetrics(Map.of(), 
Map.of()));
+        Configuration conf = new Configuration();
+        conf.set(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET, 
MemoryTuning.HeapUsageTarget.AVG);
+        assertEquals(Map.of(), ScalingMetrics.computeGlobalMetrics(Map.of(), 
Map.of(), conf));
         assertEquals(
                 Map.of(),
                 ScalingMetrics.computeGlobalMetrics(
-                        Map.of(), Map.of(FlinkMetric.HEAP_USED, aggMax(100))));
+                        Map.of(), Map.of(FlinkMetric.HEAP_USED, aggMax(100)), 
conf));
         assertEquals(
-                Map.of(ScalingMetric.HEAP_USAGE, 0.5, 
ScalingMetric.GC_PRESSURE, 0.25),
+                Map.of(
+                        ScalingMetric.HEAP_MAX_USAGE_RATIO,
+                        0.5,
+                        ScalingMetric.GC_PRESSURE,
+                        0.25,
+                        ScalingMetric.HEAP_USED,
+                        75.),
                 ScalingMetrics.computeGlobalMetrics(
                         Map.of(),
                         Map.of(
                                 FlinkMetric.HEAP_USED,
-                                aggMax(100),
+                                aggAvgMax(75, 100),
                                 FlinkMetric.HEAP_MAX,
                                 aggMax(200.),
                                 FlinkMetric.TOTAL_GC_TIME_PER_SEC,
-                                aggMax(250.))));
+                                aggMax(250.)),
+                        conf));
+
+        conf.set(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET, 
MemoryTuning.HeapUsageTarget.MAX);
+        assertEquals(
+                Map.of(ScalingMetric.HEAP_MAX_USAGE_RATIO, 0.5, 
ScalingMetric.HEAP_USED, 100.),
+                ScalingMetrics.computeGlobalMetrics(
+                        Map.of(),
+                        Map.of(
+                                FlinkMetric.HEAP_USED,
+                                aggAvgMax(75, 100),
+                                FlinkMetric.HEAP_MAX,
+                                aggMax(200.)),
+                        conf));
     }
 
     private static AggregatedMetric aggSum(double sum) {
@@ -507,4 +529,8 @@ public class ScalingMetricsTest {
     private static AggregatedMetric aggMax(double max) {
         return new AggregatedMetric("", Double.NaN, max, Double.NaN, 
Double.NaN);
     }
+
+    private static AggregatedMetric aggAvgMax(double avg, double max) { // other fields NaN
+        return new AggregatedMetric("", Double.NaN, max, avg, Double.NaN);
+    }
 }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
index 1e5031c1..d46c5bba 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
@@ -18,6 +18,9 @@
 package org.apache.flink.autoscaler.realizer;
 
 import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.configuration.Configuration;
+
+import lombok.Getter;
 
 import java.util.LinkedList;
 import java.util.Map;
@@ -29,28 +32,45 @@ public class TestingScalingRealizer<KEY, Context extends 
JobAutoScalerContext<KE
     public final LinkedList<Event<KEY, Context>> events = new LinkedList<>();
 
     @Override
-    public void realize(Context context, Map<String, String> 
parallelismOverrides) {
+    public void realizeParallelismOverrides(
+            Context context, Map<String, String> parallelismOverrides) {
         events.add(new Event<>(context, parallelismOverrides));
     }
 
+    @Override
+    public void realizeConfigOverrides(Context context, Configuration 
configOverrides) {
+        events.add(new Event<>(context, configOverrides)); // same list as parallelism events
+    }
+
     /** The collected event. */
     public static class Event<KEY, Context extends JobAutoScalerContext<KEY>> {
 
-        private final Context context;
+        @Getter private final Context context;
+
+        @Getter private Map<String, String> parallelismOverrides;
 
-        private final Map<String, String> parallelismOverrides;
+        @Getter private Configuration configOverrides;
 
         public Event(Context context, Map<String, String> 
parallelismOverrides) {
             this.context = context;
             this.parallelismOverrides = parallelismOverrides;
         }
 
-        public Context getContext() {
-            return context;
+        public Event(Context context, Configuration configOverrides) {
+            this.context = context;
+            this.configOverrides = configOverrides;
         }
 
-        public Map<String, String> getParallelismOverrides() {
-            return parallelismOverrides;
+        @Override
+        public String toString() {
+            return "Event{"
+                    + "context="
+                    + context
+                    + ", parallelismOverrides="
+                    + parallelismOverrides
+                    + ", configOverrides="
+                    + configOverrides
+                    + '}';
         }
     }
 }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
index dcab0c4c..11db27c0 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.junit.jupiter.api.Assertions;
@@ -183,21 +184,26 @@ public abstract class AbstractAutoScalerStateStoreTest<
                         new JobVertexID(),
                         new TreeMap<>(Map.of(Instant.now(), new 
ScalingSummary()))));
         stateStore.storeParallelismOverrides(ctx, Map.of(new 
JobVertexID().toHexString(), "23"));
+        stateStore.storeConfigOverrides(
+                ctx, Configuration.fromMap(Map.of("config.value", "value")));
 
         assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
         assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
         assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
+        assertThat(stateStore.getConfigOverrides(ctx).toMap()).isNotEmpty();
 
         stateStore.flush(ctx);
 
         assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
         assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
         assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
+        assertThat(stateStore.getConfigOverrides(ctx).toMap()).isNotEmpty();
 
         stateStore.clearAll(ctx);
 
         assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
         assertThat(stateStore.getScalingHistory(ctx)).isEmpty();
         assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
+        assertThat(stateStore.getConfigOverrides(ctx).toMap()).isEmpty();
     }
 }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
new file mode 100644
index 00000000..a94b4902
--- /dev/null
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.TestingAutoscalerUtils;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.tuning.MemoryTuning;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Map;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+/** Tests for {@link MemoryTuning}: enabled, heap-to-managed transfer and disabled tuning. */
+public class MemoryTuningTest {
+
+    TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventHandler =
+            new TestingEventCollector<>();
+
+    @Test
+    void testMemoryTuning() {
+        var context = TestingAutoscalerUtils.createResourceAwareContext();
+        var config = context.getConfiguration();
+        config.set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
+        config.set(TaskManagerOptions.NUM_TASK_SLOTS, 5);
+        config.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
+        MemorySize totalMemory = MemorySize.parse("30 gb");
+        config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemory);
+
+        var jobVertex1 = new JobVertexID();
+        var jobVertex2 = new JobVertexID();
+        var vertexMetrics =
+                Map.of(
+                        jobVertex1,
+                        Map.of(
+                                ScalingMetric.CURRENT_PROCESSING_RATE,
+                                        EvaluatedScalingMetric.avg(100),
+                                ScalingMetric.EXPECTED_PROCESSING_RATE,
+                                        EvaluatedScalingMetric.of(50),
+                                ScalingMetric.PARALLELISM, 
EvaluatedScalingMetric.of(50)),
+                        jobVertex2,
+                        Map.of(
+                                ScalingMetric.CURRENT_PROCESSING_RATE,
+                                        EvaluatedScalingMetric.avg(100),
+                                ScalingMetric.EXPECTED_PROCESSING_RATE,
+                                        EvaluatedScalingMetric.of(50),
+                                ScalingMetric.PARALLELISM, 
EvaluatedScalingMetric.of(50)));
+
+        var globalMetrics =
+                Map.of(
+                        ScalingMetric.HEAP_USED,
+                        
EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(5096).getBytes()));
+
+        var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
+
+        Map<String, String> overrides =
+                MemoryTuning.tuneTaskManagerHeapMemory(context, metrics, 
eventHandler).toMap();
+        // Test reducing overall memory: total process memory drops below the configured 30 gb
+        assertThat(overrides)
+                .containsExactlyInAnyOrderEntriesOf(
+                        Map.of(
+                                TaskManagerOptions.TASK_HEAP_MEMORY.key(),
+                                "6412251955 bytes",
+                                TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+                                "0 bytes",
+                                TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
+                                "12348031160 bytes",
+                                TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
+                                "0.046",
+                                
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
+                                "0.14",
+                                TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
+                                "23323685913 bytes"));
+
+        assertThat(eventHandler.events.poll().getMessage())
+                .startsWith(
+                        "Memory tuning recommends the following configuration 
(automatic tuning is enabled):");
+
+        // Test giving back memory to RocksDB: saved heap becomes managed memory, total unchanged
+        config.set(AutoScalerOptions.MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED, 
true);
+        config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        overrides = MemoryTuning.tuneTaskManagerHeapMemory(context, metrics, 
eventHandler).toMap();
+        assertThat(overrides)
+                .containsExactlyInAnyOrderEntriesOf(
+                        Map.of(
+                                TaskManagerOptions.TASK_HEAP_MEMORY.key(),
+                                "6412251955 bytes",
+                                TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+                                "0 bytes",
+                                TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
+                                "21236599967 bytes",
+                                TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
+                                "0.033",
+                                
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
+                                "0.14",
+                                TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
+                                totalMemory.toString()));
+
+        assertThat(eventHandler.events.poll().getMessage())
+                .startsWith(
+                        "Memory tuning recommends the following configuration 
(automatic tuning is enabled):");
+
+        // Test tuning disabled: no overrides, but the recommendation event is still emitted
+        config.set(AutoScalerOptions.MEMORY_TUNING_ENABLED, false);
+        assertThat(MemoryTuning.tuneTaskManagerHeapMemory(context, metrics, 
eventHandler).toMap())
+                .isEmpty();
+
+        assertThat(eventHandler.events.poll().getMessage())
+                .startsWith(
+                        "Memory tuning recommends the following configuration 
(automatic tuning is disabled):");
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
index 45a11c3e..797436ad 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
@@ -56,14 +54,16 @@ public class KubernetesJobAutoScalerContext extends JobAutoScalerContext<Resourc
                 jobStatus,
                 configuration,
                 metricGroup,
-                Optional.ofNullable(configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU))
-                        .orElse(0.),
-                Optional.ofNullable(configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY))
-                        .orElse(MemorySize.ZERO),
                 restClientSupplier);
         this.resourceContext = resourceContext;
     }
 
+    @Override
+    public Optional<Double> getTaskManagerCpu() {
+        return Optional.ofNullable(
+                getConfiguration().get(KubernetesConfigOptions.TASK_MANAGER_CPU));
+    }
+
     public AbstractFlinkResource<?, ?> getResource() {
         return resourceContext.getResource();
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
index d20f783d..8eb6ebdc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
@@ -18,10 +18,18 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.tuning.MemoryTuning;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.Resource;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
 
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -31,8 +39,10 @@ import java.util.Map;
 public class KubernetesScalingRealizer
         implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesScalingRealizer.class);
+
     @Override
-    public void realize(
+    public void realizeParallelismOverrides(
             KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) {
 
         context.getResource()
@@ -43,6 +53,34 @@ public class KubernetesScalingRealizer
                         getOverrideString(context, parallelismOverrides));
     }
 
+    @Override
+    public void realizeConfigOverrides(
+            KubernetesJobAutoScalerContext context, Configuration configOverrides) {
+        if (!(context.getResource() instanceof FlinkDeployment)) {
+            // We can't adjust the configuration of non-job deployments.
+            return;
+        }
+        FlinkDeployment flinkDeployment = ((FlinkDeployment) context.getResource());
+        // Apply config overrides
+        flinkDeployment.getSpec().getFlinkConfiguration().putAll(configOverrides.toMap());
+
+        // Update total memory in spec
+        var totalMemoryOverride = MemoryTuning.getTotalMemory(configOverrides, context);
+        if (totalMemoryOverride.compareTo(MemorySize.ZERO) <= 0) {
+            LOG.warn("Total memory override {} is not valid", totalMemoryOverride);
+            return;
+        }
+        Resource tmResource = flinkDeployment.getSpec().getTaskManager().getResource();
+        // Make sure to parse in the same way as the original deploy code path.
+        var currentMemory =
+                MemorySize.parse(
+                        FlinkConfigBuilder.parseResourceMemoryString(tmResource.getMemory()));
+        if (!totalMemoryOverride.equals(currentMemory)) {
+            // Adjust the resource memory to change the total TM memory
+            tmResource.setMemory(String.valueOf(totalMemoryOverride.getBytes()));
+        }
+    }
+
     @Nullable
     private static String getOverrideString(
             KubernetesJobAutoScalerContext context, Map<String, String> newOverrides) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index 2e516ae6..53ba7eb4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.autoscaler.ScalingTracking;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -38,6 +39,7 @@ import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.LoaderOptions;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import lombok.SneakyThrows;
 import org.apache.commons.io.IOUtils;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,8 +68,11 @@ public class KubernetesAutoScalerStateStore
     @VisibleForTesting protected static final String COLLECTED_METRICS_KEY = "collectedMetrics";
 
     @VisibleForTesting
+    /* Be careful with changing this field name or the internal structure. Otherwise the parallelism of all autoscaled pipelines might get reset! */
     protected static final String PARALLELISM_OVERRIDES_KEY = "parallelismOverrides";
 
+    protected static final String CONFIG_OVERRIDES_KEY = "configOverrides";
+
     @VisibleForTesting protected static final int MAX_CM_BYTES = 1000000;
 
     protected static final ObjectMapper YAML_MAPPER =
@@ -188,6 +193,27 @@ public class KubernetesAutoScalerStateStore
                 .orElse(new HashMap<>());
     }
 
+    @NotNull
+    @Override
+    public Configuration getConfigOverrides(KubernetesJobAutoScalerContext jobContext) {
+        return configMapStore
+                .getSerializedState(jobContext, CONFIG_OVERRIDES_KEY)
+                .map(KubernetesAutoScalerStateStore::deserializeConfigOverrides)
+                .orElse(new Configuration());
+    }
+
+    @Override
+    public void storeConfigOverrides(
+            KubernetesJobAutoScalerContext jobContext, Configuration overrides) {
+        configMapStore.putSerializedState(
+                jobContext, CONFIG_OVERRIDES_KEY, serializeConfigOverrides(overrides));
+    }
+
+    @Override
+    public void removeConfigOverrides(KubernetesJobAutoScalerContext jobContext) {
+        configMapStore.removeSerializedState(jobContext, CONFIG_OVERRIDES_KEY);
+    }
+
     @Override
     public void removeParallelismOverrides(KubernetesJobAutoScalerContext jobContext) {
         configMapStore.removeSerializedState(jobContext, PARALLELISM_OVERRIDES_KEY);
@@ -250,6 +276,14 @@ public class KubernetesAutoScalerStateStore
         return ConfigurationUtils.convertValue(overrides, Map.class);
     }
 
+    private static String serializeConfigOverrides(Configuration overrides) {
+        return ConfigurationUtils.convertValue(overrides.toMap(), String.class);
+    }
+
+    private static Configuration deserializeConfigOverrides(String overrides) {
+        return Configuration.fromMap(ConfigurationUtils.convertValue(overrides, Map.class));
+    }
+
     @VisibleForTesting
     protected void trimHistoryToMaxCmSize(KubernetesJobAutoScalerContext context) {
         int scalingHistorySize =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 339dfaa3..835b2200 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -443,7 +443,6 @@ public class FlinkConfigBuilder {
         }
     }
 
-    // Using the K8s units specification for the JM and TM memory settings
     public static String parseResourceMemoryString(String memory) {
         try {
             return MemorySize.parse(memory).toString();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
index 41b029ea..0d5c5510 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
@@ -65,7 +65,7 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     }
 
     private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
-        Configuration conf = new Configuration(getObserveConfig());
+        Configuration conf = new Configuration(getDeployConfig(resource.getSpec()));
         conf.set(
                 AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
                 getOperatorConfig().getFlinkClientTimeout());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
index 09d19299..a1416385 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
@@ -33,6 +33,8 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.junit.jupiter.api.Test;
 
+import java.util.Optional;
+
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 /** Tests for {@link KubernetesJobAutoScalerContext}. */
@@ -63,7 +65,8 @@ public class KubernetesJobAutoScalerContextTest {
                                 new FlinkConfigManager(new Configuration()),
                                 null));
 
-        assertThat(context.getTaskManagerCpu()).isEqualTo(23.);
-        assertThat(context.getTaskManagerMemory()).isEqualTo(MemorySize.parse("1024mb"));
+        assertThat(context.getTaskManagerCpu()).isEqualTo(Optional.of(23.));
+        assertThat(context.getTaskManagerMemory())
+                .isEqualTo(Optional.of(MemorySize.parse("1024mb")));
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
index be6f38c3..9a448304 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
@@ -17,7 +17,10 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 
 import org.junit.jupiter.api.Test;
@@ -35,7 +38,8 @@ public class KubernetesScalingRealizerTest {
         KubernetesJobAutoScalerContext ctx =
                 TestingKubernetesAutoscalerUtils.createContext("test", null);
 
-        new KubernetesScalingRealizer().realize(ctx, Map.of("a", "1", "b", "2"));
+        new KubernetesScalingRealizer()
+                .realizeParallelismOverrides(ctx, Map.of("a", "1", "b", "2"));
 
         assertThat(
                         ctx.getResource()
@@ -59,6 +63,26 @@ public class KubernetesScalingRealizerTest {
         assertOverridesDoNotChange("b:2,a:1", newOverrides);
     }
 
+    @Test
+    public void testApplyMemoryOverrides() {
+        KubernetesJobAutoScalerContext ctx =
+                TestingKubernetesAutoscalerUtils.createContext("test", null);
+
+        var overrides = new Configuration();
+        MemorySize memoryOverride = MemorySize.ofMebiBytes(4096);
+        overrides.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, memoryOverride);
+        new KubernetesScalingRealizer().realizeConfigOverrides(ctx, overrides);
+
+        assertThat(ctx.getResource()).isInstanceOf(FlinkDeployment.class);
+        assertThat(
+                        ((FlinkDeployment) ctx.getResource())
+                                .getSpec()
+                                .getTaskManager()
+                                .getResource()
+                                .getMemory())
+                .isEqualTo(String.valueOf(memoryOverride.getBytes()));
+    }
+
     private void assertOverridesDoNotChange(
             String currentOverrides, LinkedHashMap<String, String> newOverrides) {
 
@@ -77,7 +101,7 @@ public class KubernetesScalingRealizerTest {
                 .getFlinkConfiguration()
                 .remove(PipelineOptions.PARALLELISM_OVERRIDES.key());
 
-        new KubernetesScalingRealizer().realize(ctx, newOverrides);
+        new KubernetesScalingRealizer().realizeParallelismOverrides(ctx, newOverrides);
 
         assertThat(
                         ctx.getResource()

Reply via email to