This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 48692b72 [FLINK-34152] Tune heap memory of autoscaled jobs (#762)
48692b72 is described below
commit 48692b7243086a2ccf2cb4c64a6b00306fa1d65f
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Feb 15 12:46:06 2024 +0100
[FLINK-34152] Tune heap memory of autoscaled jobs (#762)
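    For reference, a minimal, illustrative sketch (not part of the commit itself) of how
    the memory tuning options introduced by this change could be enabled programmatically;
    the values below are examples only, while the option constants are the ones added to
    AutoScalerOptions in this diff:

        import org.apache.flink.autoscaler.config.AutoScalerOptions;
        import org.apache.flink.autoscaler.tuning.MemoryTuning;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.configuration.MemorySize;

        Configuration conf = new Configuration();
        // Opt in to automatic TaskManager memory tuning
        conf.set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
        // Size the heap for average observed usage instead of the (default) maximum
        conf.set(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET, MemoryTuning.HeapUsageTarget.AVG);
        // Keep 20% headroom on top of the observed heap usage
        conf.set(AutoScalerOptions.MEMORY_TUNING_HEAP_OVERHEAD, 0.2);
        // Never shrink the heap below 512 MiB
        conf.set(AutoScalerOptions.MEMORY_TUNING_MIN_HEAP, MemorySize.ofMebiBytes(512));
        // Hand any freed heap back to RocksDB as managed memory
        conf.set(AutoScalerOptions.MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED, true);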
---
.../generated/auto_scaler_configuration.html | 30 +++
.../jdbc/state/JdbcAutoScalerStateStore.java | 32 +++
.../flink/autoscaler/jdbc/state/StateType.java | 3 +-
.../flinkcluster/FlinkClusterJobListFetcher.java | 3 -
.../realizer/RescaleApiScalingRealizer.java | 12 +-
.../StandaloneAutoscalerExecutorTest.java | 10 +-
.../realizer/RescaleApiScalingRealizerTest.java | 18 +-
.../flink/autoscaler/JobAutoScalerContext.java | 24 +-
.../apache/flink/autoscaler/JobAutoScalerImpl.java | 22 +-
.../flink/autoscaler/RestApiMetricsCollector.java | 6 +-
.../apache/flink/autoscaler/ScalingExecutor.java | 21 +-
.../flink/autoscaler/ScalingMetricCollector.java | 2 +-
.../flink/autoscaler/ScalingMetricEvaluator.java | 14 +-
.../flink/autoscaler/config/AutoScalerOptions.java | 43 ++++
.../autoscaler/metrics/EvaluatedScalingMetric.java | 8 +-
.../flink/autoscaler/metrics/ScalingMetric.java | 5 +-
.../flink/autoscaler/metrics/ScalingMetrics.java | 19 +-
.../flink/autoscaler/realizer/ScalingRealizer.java | 6 +-
.../autoscaler/state/AutoScalerStateStore.java | 8 +
.../state/InMemoryAutoScalerStateStore.java | 25 ++
.../flink/autoscaler/tuning/MemoryTuning.java | 251 +++++++++++++++++++++
.../flink/autoscaler/JobAutoScalerImplTest.java | 57 ++++-
.../flink/autoscaler/JobVertexScalerTest.java | 5 +-
.../autoscaler/RecommendedParallelismTest.java | 5 -
.../autoscaler/RestApiMetricsCollectorTest.java | 5 -
.../flink/autoscaler/ScalingExecutorTest.java | 58 ++++-
.../autoscaler/ScalingMetricEvaluatorTest.java | 24 +-
.../flink/autoscaler/TestingAutoscalerUtils.java | 13 +-
.../autoscaler/metrics/ScalingMetricsTest.java | 36 ++-
.../realizer/TestingScalingRealizer.java | 34 ++-
.../state/AbstractAutoScalerStateStoreTest.java | 6 +
.../flink/autoscaler/utils/MemoryTuningTest.java | 139 ++++++++++++
.../autoscaler/KubernetesJobAutoScalerContext.java | 12 +-
.../autoscaler/KubernetesScalingRealizer.java | 40 +++-
.../state/KubernetesAutoScalerStateStore.java | 34 +++
.../operator/config/FlinkConfigBuilder.java | 1 -
.../operator/controller/FlinkResourceContext.java | 2 +-
.../KubernetesJobAutoScalerContextTest.java | 7 +-
.../autoscaler/KubernetesScalingRealizerTest.java | 28 ++-
39 files changed, 951 insertions(+), 117 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index aa5ff525..ee6fb731 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -62,6 +62,36 @@
<td>Double</td>
<td>Max allowed percentage of heap usage during scaling operations. Autoscaling will be paused if the heap usage exceeds this threshold.</td>
</tr>
+ <tr>
+ <td><h5>job.autoscaler.memory.tuning.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+            <td>If enabled, the initial amount of memory specified for TaskManagers will be reduced according to the observed needs.</td>
+ </tr>
+ <tr>
+ <td><h5>job.autoscaler.memory.tuning.heap.min</h5></td>
+ <td style="word-wrap: break-word;">512 mb</td>
+ <td>MemorySize</td>
+            <td>The minimum amount of TaskManager heap memory, if memory tuning is enabled.</td>
+ </tr>
+ <tr>
+ <td><h5>job.autoscaler.memory.tuning.heap.overhead</h5></td>
+ <td style="word-wrap: break-word;">0.2</td>
+ <td>Double</td>
+            <td>Overhead to add to heap tuning decisions (0-1). This ensures spare capacity and allows the memory to grow beyond the dynamically computed limits, but never beyond the original memory limits.</td>
+ </tr>
+ <tr>
+ <td><h5>job.autoscaler.memory.tuning.heap.target-usage</h5></td>
+ <td style="word-wrap: break-word;">MAX</td>
+ <td><p>Enum</p></td>
+            <td>The heap size to target. Average usage (AVG) will yield better savings. Max usage will yield more conservative savings.<br /><br />Possible values:<ul><li>"AVG"</li><li>"MAX"</li></ul></td>
+ </tr>
+ <tr>
+            <td><h5>job.autoscaler.memory.tuning.heap.transfer-to-managed</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+            <td>If enabled, any reduction of heap memory will increase the managed memory used by RocksDB.</td>
+ </tr>
<tr>
<td><h5>job.autoscaler.metrics.busy-time.aggregator</h5></td>
<td style="word-wrap: break-word;">MAX</td>
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
index 7b459ef4..3f2af617 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
@@ -25,6 +25,7 @@ import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -46,6 +47,7 @@ import java.util.SortedMap;
import java.util.TreeMap;
import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
+import static org.apache.flink.autoscaler.jdbc.state.StateType.CONFIG_OVERRIDES;
import static org.apache.flink.autoscaler.jdbc.state.StateType.PARALLELISM_OVERRIDES;
import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING;
@@ -189,6 +191,28 @@ public class JdbcAutoScalerStateStore<KEY, Context extends JobAutoScalerContext<
        jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), PARALLELISM_OVERRIDES);
}
+    @Override
+    public void storeConfigOverrides(Context jobContext, Configuration configOverrides) {
+        jdbcStateStore.putSerializedState(
+                getSerializeKey(jobContext),
+                CONFIG_OVERRIDES,
+                serializeConfigOverrides(configOverrides));
+    }
+
+    @Nonnull
+    @Override
+    public Configuration getConfigOverrides(Context jobContext) {
+        return jdbcStateStore
+                .getSerializedState(getSerializeKey(jobContext), CONFIG_OVERRIDES)
+                .map(JdbcAutoScalerStateStore::deserializeConfigOverrides)
+                .orElse(new Configuration());
+    }
+
+    @Override
+    public void removeConfigOverrides(Context jobContext) {
+        jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), CONFIG_OVERRIDES);
+    }
+
@Override
public void clearAll(Context jobContext) {
jdbcStateStore.clearAll(getSerializeKey(jobContext));
@@ -251,4 +275,12 @@ public class JdbcAutoScalerStateStore<KEY, Context extends JobAutoScalerContext<
    private static Map<String, String> deserializeParallelismOverrides(String overrides) {
        return ConfigurationUtils.convertValue(overrides, Map.class);
    }
+
+    private static String serializeConfigOverrides(Configuration overrides) {
+        return ConfigurationUtils.convertValue(overrides.toMap(), String.class);
+    }
+
+    private static Configuration deserializeConfigOverrides(String overrides) {
+        return Configuration.fromMap(ConfigurationUtils.convertValue(overrides, Map.class));
+    }
}
diff --git
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
index 728afaa2..ca2864f4 100644
---
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
+++
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
@@ -26,7 +26,8 @@ public enum StateType {
SCALING_HISTORY("scalingHistory"),
SCALING_TRACKING("scalingTracking"),
COLLECTED_METRICS("collectedMetrics"),
- PARALLELISM_OVERRIDES("parallelismOverrides");
+ PARALLELISM_OVERRIDES("parallelismOverrides"),
+ CONFIG_OVERRIDES("configOverrides");
/**
* The identifier of each state type, it will be used to store. Please
ensure the identifier is
diff --git
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
index 63d79cd4..e99424c5 100644
---
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
+++
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
@@ -22,7 +22,6 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.standalone.JobListFetcher;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -83,8 +82,6 @@ public class FlinkClusterJobListFetcher
jobStatusMessage.getJobState(),
conf,
new UnregisteredMetricsGroup(),
- 0,
- MemorySize.ZERO,
() -> restClientGetter.apply(conf));
}
diff --git
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
index d04e6d3e..8cba81d5 100644
---
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
+++
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
@@ -64,7 +64,8 @@ public class RescaleApiScalingRealizer<KEY, Context extends
JobAutoScalerContext
}
@Override
-    public void realize(Context context, Map<String, String> parallelismOverrides) {
+ public void realizeParallelismOverrides(
+ Context context, Map<String, String> parallelismOverrides) {
Configuration conf = context.getConfiguration();
if (!conf.get(JobManagerOptions.SCHEDULER)
.equals(JobManagerOptions.SchedulerType.Adaptive)) {
@@ -124,6 +125,15 @@ public class RescaleApiScalingRealizer<KEY, Context
extends JobAutoScalerContext
}
}
+    @Override
+    public void realizeConfigOverrides(Context context, Configuration configOverrides) {
+        // Not currently supported
+        LOG.warn(
+                "{} does not support updating the TaskManager configuration ({})",
+                getClass().getSimpleName(),
+                configOverrides);
+    }
+
    private Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(
            RestClusterClient<String> client, JobID jobID, Duration restClientTimeout)
            throws Exception {
diff --git
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
index d2e50092..ca905071 100644
---
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
+++
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -231,13 +230,6 @@ class StandaloneAutoscalerExecutorTest {
private static JobAutoScalerContext<JobID> createJobAutoScalerContext() {
var jobID = new JobID();
return new JobAutoScalerContext<>(
- jobID,
- jobID,
- JobStatus.RUNNING,
- new Configuration(),
- null,
- 0,
- MemorySize.ZERO,
- null);
+ jobID, jobID, JobStatus.RUNNING, new Configuration(), null,
null);
}
}
diff --git
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
index 6b209eb0..b16cbdac 100644
---
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
+++
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
@@ -24,7 +24,6 @@ import
org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.MemorySize;
import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -92,7 +91,7 @@ class RescaleApiScalingRealizerTest {
assertThat(updatedRequirements).isNotDone();
assertThat(closeFuture).isNotDone();
- scalingRealizer.realize(jobContext, newResourceRequirements);
+ scalingRealizer.realizeParallelismOverrides(jobContext,
newResourceRequirements);
// The ResourceRequirements should be updated when the
newResourceRequirements is changed.
if (resourceIsChanged) {
@@ -128,7 +127,7 @@ class RescaleApiScalingRealizerTest {
RescaleApiScalingRealizer<JobID, JobAutoScalerContext<JobID>>
scalingRealizer =
new RescaleApiScalingRealizer<>(eventCollector);
- scalingRealizer.realize(jobContext, resourceRequirements);
+ scalingRealizer.realizeParallelismOverrides(jobContext,
resourceRequirements);
assertThat(eventCollector.events).isEmpty();
}
@@ -149,8 +148,6 @@ class RescaleApiScalingRealizerTest {
JobStatus.CANCELLING,
conf,
null,
- 0,
- MemorySize.ZERO,
() ->
fail(
"The rest client shouldn't be created
if the job isn't running."));
@@ -159,7 +156,7 @@ class RescaleApiScalingRealizerTest {
RescaleApiScalingRealizer<JobID, JobAutoScalerContext<JobID>>
scalingRealizer =
new RescaleApiScalingRealizer<>(eventCollector);
- scalingRealizer.realize(jobContext, resourceRequirements);
+ scalingRealizer.realizeParallelismOverrides(jobContext,
resourceRequirements);
assertThat(eventCollector.events).isEmpty();
}
@@ -167,14 +164,7 @@ class RescaleApiScalingRealizerTest {
JobID jobID,
SupplierWithException<RestClusterClient<String>, Exception>
restClientSupplier) {
return new JobAutoScalerContext<>(
- jobID,
- jobID,
- JobStatus.RUNNING,
- new Configuration(),
- null,
- 0,
- MemorySize.ZERO,
- restClientSupplier);
+ jobID, jobID, JobStatus.RUNNING, new Configuration(), null,
restClientSupplier);
}
private JobResourceRequirements createResourceRequirements(
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
index 026e0fb3..9c5e614c 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.function.SupplierWithException;
@@ -33,6 +34,8 @@ import lombok.ToString;
import javax.annotation.Nullable;
+import java.util.Optional;
+
/**
 * The job autoscaler context, it includes all details related to the current job.
*
@@ -52,19 +55,28 @@ public class JobAutoScalerContext<KEY> {
@Nullable @Getter private final JobStatus jobStatus;
+ /**
+     * The configuration based on the latest user-provided spec. This is not the already deployed /
+ * observed configuration.
+ */
@Getter private final Configuration configuration;
@Getter private final MetricGroup metricGroup;
- /** Task manager CPU as a fraction (if available). */
- @Getter private final double taskManagerCpu;
-
- /** Task manager memory size (if available). */
- @Getter @Nullable private final MemorySize taskManagerMemory;
-
@ToString.Exclude
    private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;
+ /** Retrieve the currently configured TaskManager CPU. */
+ public Optional<Double> getTaskManagerCpu() {
+ // Not supported by default
+ return Optional.empty();
+ }
+
+ /** Retrieve the currently configured TaskManager memory. */
+ public Optional<MemorySize> getTaskManagerMemory() {
+        return Optional.ofNullable(getConfiguration().get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
public RestClusterClient<String> getRestClusterClient() throws Exception {
return restClientSupplier.get();
}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index b3839ff4..91b18248 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -26,6 +26,7 @@ import
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.util.Preconditions;
@@ -106,7 +107,13 @@ public class JobAutoScalerImpl<KEY, Context extends
JobAutoScalerContext<KEY>>
} catch (Throwable e) {
onError(ctx, autoscalerMetrics, e);
} finally {
- applyParallelismOverrides(ctx);
+ try {
+ applyParallelismOverrides(ctx);
+ applyConfigOverrides(ctx);
+ } catch (Exception e) {
+ LOG.error("Error applying overrides.", e);
+ onError(ctx, autoscalerMetrics, e);
+ }
}
}
@@ -151,7 +158,18 @@ public class JobAutoScalerImpl<KEY, Context extends
JobAutoScalerContext<KEY>>
userOverrides.put(k, v);
}
});
- scalingRealizer.realize(ctx, userOverrides);
+ scalingRealizer.realizeParallelismOverrides(ctx, userOverrides);
+ }
+
+ @VisibleForTesting
+ void applyConfigOverrides(Context ctx) throws Exception {
+        if (!ctx.getConfiguration().get(AutoScalerOptions.MEMORY_TUNING_ENABLED)) {
+ return;
+ }
+
+ Configuration configOverrides = stateStore.getConfigOverrides(ctx);
+ LOG.info("Applying config overrides: {}", configOverrides);
+ scalingRealizer.realizeConfigOverrides(ctx, configOverrides);
}
private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics
autoscalerMetrics)
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
index 572bec9b..a5b2cf02 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
@@ -187,7 +187,11 @@ public class RestApiMetricsCollector<KEY, Context extends
JobAutoScalerContext<K
MetricsAggregationParameter aggregationParameter =
(MetricsAggregationParameter) queryParamIt.next();
-            aggregationParameter.resolve(List.of(MetricsAggregationParameter.AggregationMode.MAX));
+ aggregationParameter.resolve(
+ List.of(
+ MetricsAggregationParameter.AggregationMode.MIN,
+ MetricsAggregationParameter.AggregationMode.MAX,
+ MetricsAggregationParameter.AggregationMode.AVG));
var responseBody =
restClient
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 094d4680..fdbb3c17 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.resources.NoopResourceCheck;
import org.apache.flink.autoscaler.resources.ResourceCheck;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.autoscaler.utils.CalendarUtils;
import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
import org.apache.flink.configuration.Configuration;
@@ -117,7 +118,12 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
return false;
}
-        if (scalingWouldExceedClusterResources(evaluatedMetrics, scalingSummaries, context)) {
+ var tuningConfig =
+ MemoryTuning.tuneTaskManagerHeapMemory(
+ context, evaluatedMetrics, autoScalerEventHandler);
+
+ if (scalingWouldExceedClusterResources(
+ tuningConfig, evaluatedMetrics, scalingSummaries, context)) {
return false;
}
@@ -132,6 +138,8 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
getVertexParallelismOverrides(
evaluatedMetrics.getVertexMetrics(),
scalingSummaries));
+ autoScalerStateStore.storeConfigOverrides(context, tuningConfig);
+
return true;
}
@@ -243,7 +251,7 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
return true;
}
-        var heapUsage = evaluatedMetrics.get(ScalingMetric.HEAP_USAGE).getAverage();
+        var heapUsage = evaluatedMetrics.get(ScalingMetric.HEAP_MAX_USAGE_RATIO).getAverage();
if (heapUsage > conf.get(AutoScalerOptions.HEAP_USAGE_THRESHOLD)) {
autoScalerEventHandler.handleEvent(
ctx,
@@ -259,16 +267,15 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
}
private boolean scalingWouldExceedClusterResources(
+ Configuration tuningConfig,
EvaluatedMetrics evaluatedMetrics,
Map<JobVertexID, ScalingSummary> scalingSummaries,
JobAutoScalerContext<?> ctx) {
- final double taskManagerCpu = ctx.getTaskManagerCpu();
- final MemorySize taskManagerMemory = ctx.getTaskManagerMemory();
+ final double taskManagerCpu = ctx.getTaskManagerCpu().orElse(0.);
+        final MemorySize taskManagerMemory = MemoryTuning.getTotalMemory(tuningConfig, ctx);
- if (taskManagerCpu <= 0
- || taskManagerMemory == null
- || taskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
+        if (taskManagerCpu <= 0 || taskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
// We can't extract the requirements, we can't make any assumptions
return false;
}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index ed5a2e47..c1926541 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -324,7 +324,7 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
LOG.debug("Output ratios: {}", outputRatios);
var globalMetrics =
-                ScalingMetrics.computeGlobalMetrics(collectedJmMetrics, collectedTmMetrics);
+                ScalingMetrics.computeGlobalMetrics(collectedJmMetrics, collectedTmMetrics, conf);
LOG.debug("Global metrics: {}", globalMetrics);
return new CollectedMetrics(out, outputRatios, globalMetrics);
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index 08700235..6b156cc8 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -50,7 +51,8 @@ import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZ
import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.GC_PRESSURE;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_USAGE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MAX_USAGE_RATIO;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_USED;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_USED;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
import static
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
@@ -314,13 +315,18 @@ public class ScalingMetricEvaluator {
var out = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
var gcPressure = latest.getOrDefault(GC_PRESSURE, Double.NaN);
-        var lastHeapUsage = latest.getOrDefault(HEAP_USAGE, Double.NaN);
+        var lastHeapUsage = latest.getOrDefault(HEAP_MAX_USAGE_RATIO, Double.NaN);
        out.put(GC_PRESSURE, EvaluatedScalingMetric.of(gcPressure));
        out.put(
-                HEAP_USAGE,
+                HEAP_MAX_USAGE_RATIO,
                new EvaluatedScalingMetric(
-                        lastHeapUsage, getAverageGlobalMetric(HEAP_USAGE, metricHistory)));
+                        lastHeapUsage,
+                        getAverageGlobalMetric(HEAP_MAX_USAGE_RATIO, metricHistory)));
+
+        var latestObservation = latest.getOrDefault(HEAP_USED, Double.NaN);
+        double heapSizeAverage = getAverageGlobalMetric(HEAP_USED, metricHistory);
+        out.put(HEAP_USED, new EvaluatedScalingMetric(latestObservation, heapSizeAverage));
out.put(
NUM_TASK_SLOTS_USED,
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 269dddd2..58bdfce0 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -18,8 +18,10 @@
package org.apache.flink.autoscaler.config;
import org.apache.flink.autoscaler.metrics.MetricAggregator;
+import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
import java.time.Duration;
import java.util.List;
@@ -250,6 +252,47 @@ public class AutoScalerOptions {
.withDescription(
"Max allowed percentage of heap usage during
scaling operations. Autoscaling will be paused if the heap usage exceeds this
threshold.");
+    public static final ConfigOption<Boolean> MEMORY_TUNING_ENABLED =
+            autoScalerConfig("memory.tuning.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
+                    .withDescription(
+                            "If enabled, the initial amount of memory specified for TaskManagers will be reduced according to the observed needs.");
+
+    public static final ConfigOption<MemoryTuning.HeapUsageTarget> MEMORY_TUNING_HEAP_TARGET =
+            autoScalerConfig("memory.tuning.heap.target-usage")
+                    .enumType(MemoryTuning.HeapUsageTarget.class)
+                    .defaultValue(MemoryTuning.HeapUsageTarget.MAX)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.target-usage"))
+                    .withDescription(
+                            "The heap size to target. Average usage (AVG) will yield better savings. Max usage will yield more conservative savings.");
+
+    public static final ConfigOption<Double> MEMORY_TUNING_HEAP_OVERHEAD =
+            autoScalerConfig("memory.tuning.heap.overhead")
+                    .doubleType()
+                    .defaultValue(0.2)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.overhead"))
+                    .withDescription(
+                            "Overhead to add to heap tuning decisions (0-1). This ensures spare capacity and allows the memory to grow beyond the dynamically computed limits, but never beyond the original memory limits.");
+
+    public static final ConfigOption<MemorySize> MEMORY_TUNING_MIN_HEAP =
+            autoScalerConfig("memory.tuning.heap.min")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(512L))
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.min"))
+                    .withDescription(
+                            "The minimum amount of TaskManager heap memory, if memory tuning is enabled.");
+
+    public static final ConfigOption<Boolean> MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED =
+            autoScalerConfig("memory.tuning.heap.transfer-to-managed")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(
+                            oldOperatorConfigKey("memory.tuning.heap.transfer-to-managed"))
+                    .withDescription(
+                            "If enabled, any reduction of heap memory will increase the managed memory used by RocksDB.");
+
public static final ConfigOption<Integer> VERTEX_SCALING_HISTORY_COUNT =
autoScalerConfig("history.max.count")
.intType()
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
index c60adf18..cc013dd7 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
@@ -33,7 +33,11 @@ public class EvaluatedScalingMetric {
this.average = ScalingMetrics.roundMetric(average);
}
- public static EvaluatedScalingMetric of(double value) {
- return new EvaluatedScalingMetric(value, Double.NaN);
+ public static EvaluatedScalingMetric of(double current) {
+ return new EvaluatedScalingMetric(current, Double.NaN);
+ }
+
+ public static EvaluatedScalingMetric avg(double average) {
+ return new EvaluatedScalingMetric(Double.NaN, average);
}
}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index 1ab1bd3d..dfc8276b 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -77,8 +77,11 @@ public enum ScalingMetric {
*/
GC_PRESSURE(false),
+ /** Measured used heap size in bytes. */
+ HEAP_USED(true),
+
/** Percentage of max heap used (between 0 and 1). */
- HEAP_USAGE(true),
+ HEAP_MAX_USAGE_RATIO(true),
NUM_TASK_SLOTS_USED(false);
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index 516e0046..c39427d4 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -19,6 +19,7 @@ package org.apache.flink.autoscaler.metrics;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -163,7 +164,8 @@ public class ScalingMetrics {
public static Map<ScalingMetric, Double> computeGlobalMetrics(
Map<FlinkMetric, Metric> collectedJmMetrics,
- Map<FlinkMetric, AggregatedMetric> collectedTmMetrics) {
+ Map<FlinkMetric, AggregatedMetric> collectedTmMetrics,
+ Configuration conf) {
if (collectedTmMetrics == null) {
return null;
}
@@ -192,7 +194,19 @@ public class ScalingMetrics {
var heapMax = collectedTmMetrics.get(FlinkMetric.HEAP_MAX);
var heapUsed = collectedTmMetrics.get(FlinkMetric.HEAP_USED);
if (heapMax != null && heapUsed != null) {
-        out.put(ScalingMetric.HEAP_USAGE, heapUsed.getMax() / heapMax.getMax());
+ MemoryTuning.HeapUsageTarget heapTarget =
+ conf.get(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET);
+ switch (heapTarget) {
+ case AVG:
+ out.put(ScalingMetric.HEAP_USED, heapUsed.getAvg());
+ break;
+ case MAX:
+ out.put(ScalingMetric.HEAP_USED, heapUsed.getMax());
+ break;
+ default:
+ LOG.warn("Unknown value {} for heap target", heapTarget);
+ }
+        out.put(ScalingMetric.HEAP_MAX_USAGE_RATIO, heapUsed.getMax() / heapMax.getMax());
}
return out;
@@ -209,6 +223,7 @@ public class ScalingMetrics {
}
}
+    // TODO: FLINK-34213: Consider using accumulated busy time instead of busyMsPerSecond
private static double getBusyTimeMsPerSecond(
Map<FlinkMetric, AggregatedMetric> flinkMetrics,
Configuration conf,
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
index b4895648..aba94157 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
@@ -19,6 +19,7 @@ package org.apache.flink.autoscaler.realizer;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.configuration.Configuration;
import java.util.Map;
@@ -33,5 +34,8 @@ import java.util.Map;
public interface ScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>> {
/** Update job's parallelism to parallelismOverrides. */
- void realize(Context context, Map<String, String> parallelismOverrides);
+    void realizeParallelismOverrides(Context context, Map<String, String> parallelismOverrides);
+
+ /** Updates the TaskManager memory configuration. */
+    void realizeConfigOverrides(Context context, Configuration configOverrides);
}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index 2d2c60de..30be323f 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -22,6 +22,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import javax.annotation.Nonnull;
@@ -69,6 +70,13 @@ public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<
void removeParallelismOverrides(Context jobContext) throws Exception;
+    void storeConfigOverrides(Context jobContext, Configuration configOverrides) throws Exception;
+
+ @Nonnull
+ Configuration getConfigOverrides(Context jobContext) throws Exception;
+
+ void removeConfigOverrides(Context jobContext) throws Exception;
+
    /** Removes all data from this context. Flush still needs to be called. */
void clearAll(Context jobContext) throws Exception;
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index 6c5b7a29..60a9342e 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -21,8 +21,11 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import javax.annotation.Nonnull;
+
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
@@ -47,6 +50,8 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
private final Map<KEY, Map<String, String>> parallelismOverridesStore;
+ private final Map<KEY, Configuration> tmConfigOverrides;
+
private final Map<KEY, ScalingTracking> scalingTrackingStore;
public InMemoryAutoScalerStateStore() {
@@ -54,6 +59,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
collectedMetricsStore = new ConcurrentHashMap<>();
parallelismOverridesStore = new ConcurrentHashMap<>();
scalingTrackingStore = new ConcurrentHashMap<>();
+ tmConfigOverrides = new ConcurrentHashMap<>();
}
@Override
@@ -115,6 +121,24 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
.orElse(new HashMap<>());
}
+ @Override
+    public void storeConfigOverrides(Context jobContext, Configuration configOverrides)
+ throws Exception {
+ tmConfigOverrides.put(jobContext.getJobKey(), configOverrides);
+ }
+
+ @Nonnull
+ @Override
+ public Configuration getConfigOverrides(Context jobContext) {
+        return Optional.ofNullable(tmConfigOverrides.get(jobContext.getJobKey()))
+ .orElse(new Configuration());
+ }
+
+ @Override
+ public void removeConfigOverrides(Context jobContext) {
+ tmConfigOverrides.remove(jobContext.getJobKey());
+ }
+
@Override
public void removeParallelismOverrides(Context jobContext) {
parallelismOverridesStore.remove(jobContext.getJobKey());
@@ -125,6 +149,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
scalingHistoryStore.remove(jobContext.getJobKey());
parallelismOverridesStore.remove(jobContext.getJobKey());
collectedMetricsStore.remove(jobContext.getJobKey());
+ tmConfigOverrides.remove(jobContext.getJobKey());
}
@Override
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
new file mode 100644
index 00000000..b4cd00e5
--- /dev/null
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.tuning;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.util.config.memory.CommonProcessMemorySpec;
+import org.apache.flink.runtime.util.config.memory.JvmMetaspaceAndOverheadOptions;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryOptions;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
+import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemory;
+import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/** Tunes the TaskManager memory. */
+public class MemoryTuning {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryTuning.class);
+    public static final ProcessMemoryUtils<TaskExecutorFlinkMemory> FLINK_MEMORY_UTILS =
+            new ProcessMemoryUtils<>(getMemoryOptions(), new TaskExecutorFlinkMemoryUtils());
+
+ private static final Configuration EMPTY_CONFIG = new Configuration();
+
+ /** What memory usage to target. */
+ public enum HeapUsageTarget {
+ AVG,
+ MAX
+ }
+
+ /**
+ * Emits a Configuration which contains overrides for the current
configuration. We are not
+ * modifying the config directly, but we are emitting a new configuration
which contains any
+ * overrides. This config is persisted separately and applied by the
autoscaler. That way we can
+ * clear any applied overrides if auto-tuning is disabled.
+ */
+ public static Configuration tuneTaskManagerHeapMemory(
+ JobAutoScalerContext<?> context,
+ EvaluatedMetrics evaluatedMetrics,
+ AutoScalerEventHandler eventHandler) {
+
+ // Please note that this config is the original configuration created
from the user spec.
+ // It does not contain any already applied overrides.
+ var config = new UnmodifiableConfiguration(context.getConfiguration());
+
+ // Gather original memory configuration from the user spec
+ CommonProcessMemorySpec<TaskExecutorFlinkMemory> memSpecs;
+ try {
+ memSpecs = FLINK_MEMORY_UTILS.memoryProcessSpecFromConfig(config);
+ } catch (IllegalConfigurationException e) {
+            LOG.warn("Current memory configuration is not valid. Aborting memory tuning.");
+ return EMPTY_CONFIG;
+ }
+
+ var maxHeapSize = memSpecs.getFlinkMemory().getJvmHeapMemorySize();
+ LOG.debug("Current configured heap size: {}", maxHeapSize);
+
+        MemorySize newHeapSize = determineNewHeapSize(evaluatedMetrics, config, maxHeapSize);
+ LOG.info("New TM heap memory {}", newHeapSize.toHumanReadableString());
+
+ // Diff can be negative (memory shrinks) or positive (memory grows)
+        final long heapDiffBytes = newHeapSize.getBytes() - maxHeapSize.getBytes();
+
+        final MemorySize totalMemory = adjustTotalTmMemory(context, heapDiffBytes);
+ if (totalMemory.equals(MemorySize.ZERO)) {
+ return EMPTY_CONFIG;
+ }
+
+ // Prepare the tuning config for new configuration values
+ var tuningConfig = new Configuration();
+ // Update total memory according to new heap size
+ // Adjust the total container memory and the JVM heap size accordingly.
+ tuningConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemory);
+        // Framework and task heap memory configs add up to yield the max heap memory.
+        // To simplify the calculation, set the framework heap memory to zero.
+        tuningConfig.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, MemorySize.ZERO);
+ tuningConfig.set(TaskManagerOptions.TASK_HEAP_MEMORY, newHeapSize);
+
+        // All memory options which can be configured via fractions need to be set to their
+        // absolute values or, if there is no absolute setting, the fractions need to be
+        // re-calculated.
+ MemorySize managedMemory = memSpecs.getFlinkMemory().getManaged();
+ if (shouldTransferHeapToManagedMemory(config, heapDiffBytes)) {
+            // If RocksDB is configured, give back the heap memory as managed memory to RocksDB
+            MemorySize newManagedMemory =
+                    new MemorySize(managedMemory.getBytes() + Math.abs(heapDiffBytes));
+            LOG.info(
+                    "Increasing managed memory size from {} to {}",
+                    managedMemory,
+                    newManagedMemory);
+            tuningConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, newManagedMemory);
+        } else {
+            tuningConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemory);
+ }
+
+ tuningConfig.set(
+ TaskManagerOptions.NETWORK_MEMORY_FRACTION,
+ getFraction(
+ memSpecs.getFlinkMemory().getNetwork(),
+ new MemorySize(
+                                memSpecs.getTotalFlinkMemorySize().getBytes() + heapDiffBytes)));
+ tuningConfig.set(
+ TaskManagerOptions.JVM_OVERHEAD_FRACTION,
+ getFraction(memSpecs.getJvmOverheadSize(), totalMemory));
+
+ eventHandler.handleEvent(
+ context,
+ AutoScalerEventHandler.Type.Normal,
+ "Configuration recommendation",
+ String.format(
+                        "Memory tuning recommends the following configuration (automatic tuning is %s):\n%s",
+ config.get(AutoScalerOptions.MEMORY_TUNING_ENABLED)
+ ? "enabled"
+ : "disabled",
+ formatConfig(tuningConfig)),
+ "MemoryTuning",
+ config.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
+
+        if (!context.getConfiguration().get(AutoScalerOptions.MEMORY_TUNING_ENABLED)) {
+ return EMPTY_CONFIG;
+ }
+
+ return tuningConfig;
+ }
+
+ private static MemorySize determineNewHeapSize(
+            EvaluatedMetrics evaluatedMetrics, Configuration config, MemorySize maxHeapSize) {
+
+        double overheadFactor = 1 + config.get(AutoScalerOptions.MEMORY_TUNING_HEAP_OVERHEAD);
+        long heapTargetSizeBytes =
+                (long) (getHeapUsed(evaluatedMetrics).getBytes() * overheadFactor);
+
+ // Apply min/max heap size limits
+ heapTargetSizeBytes =
+ Math.min(
+ Math.max(
+                                // Lower limit is the minimum configured heap size
+                                config.get(AutoScalerOptions.MEMORY_TUNING_MIN_HEAP).getBytes(),
+                                heapTargetSizeBytes),
+                        // Upper limit is the original max heap size in the spec
+ maxHeapSize.getBytes());
+
+ return new MemorySize(heapTargetSizeBytes);
+ }
+
+ private static MemorySize getHeapUsed(EvaluatedMetrics evaluatedMetrics) {
+ var globalMetrics = evaluatedMetrics.getGlobalMetrics();
+ MemorySize heapUsed =
+                new MemorySize((long) globalMetrics.get(ScalingMetric.HEAP_USED).getAverage());
+ LOG.info("TM heap used size: {}", heapUsed);
+ return heapUsed;
+ }
+
+ private static boolean shouldTransferHeapToManagedMemory(
+ Configuration config, long heapDiffBytes) {
+        return config.get(AutoScalerOptions.MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED)
+                && heapDiffBytes < 0
+                && "rocksdb".equalsIgnoreCase(config.get(StateBackendOptions.STATE_BACKEND));
+ }
+
+    public static MemorySize getTotalMemory(Configuration config, JobAutoScalerContext<?> ctx) {
+        MemorySize overrideSize = config.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY);
+ if (overrideSize != null) {
+ return overrideSize;
+ }
+ return ctx.getTaskManagerMemory().orElse(MemorySize.ZERO);
+ }
+
+    private static MemorySize adjustTotalTmMemory(JobAutoScalerContext<?> ctx, long heapDiffBytes) {
+
+        var specTaskManagerMemory = ctx.getTaskManagerMemory().orElse(MemorySize.ZERO);
+ if (specTaskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
+ LOG.warn("Spec TaskManager memory size could not be determined.");
+ return MemorySize.ZERO;
+ }
+
+        if (shouldTransferHeapToManagedMemory(ctx.getConfiguration(), heapDiffBytes)) {
+            // Total size does not change
+            return specTaskManagerMemory;
+        }
+
+        long newTotalMemBytes = specTaskManagerMemory.getBytes() + heapDiffBytes;
+        // TM container memory can never grow beyond the user-specified max memory
+        newTotalMemBytes = Math.min(newTotalMemBytes, specTaskManagerMemory.getBytes());
+
+ MemorySize totalMemory = new MemorySize(newTotalMemBytes);
+ LOG.info("Setting new total TaskManager memory to {}", totalMemory);
+ return totalMemory;
+ }
+
+ private static ProcessMemoryOptions getMemoryOptions() {
+ return new ProcessMemoryOptions(
+ Arrays.asList(
+ TaskManagerOptions.TASK_HEAP_MEMORY,
+ TaskManagerOptions.MANAGED_MEMORY_SIZE),
+ TaskManagerOptions.TOTAL_FLINK_MEMORY,
+ TaskManagerOptions.TOTAL_PROCESS_MEMORY,
+ new JvmMetaspaceAndOverheadOptions(
+ TaskManagerOptions.JVM_METASPACE,
+ TaskManagerOptions.JVM_OVERHEAD_MIN,
+ TaskManagerOptions.JVM_OVERHEAD_MAX,
+ TaskManagerOptions.JVM_OVERHEAD_FRACTION));
+ }
+
+    private static float getFraction(MemorySize enumerator, MemorySize denominator) {
+        // Round to three decimal places
+        return (float)
+                (Math.round(enumerator.getBytes() / (double) denominator.getBytes() * 1000)
+ / 1000.);
+ }
+
+    /** Format config such that it can be directly used as a Flink configuration. */
+ private static String formatConfig(Configuration config) {
+ var sb = new StringBuilder();
+ for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+ sb.append(entry.getKey())
+ .append(": ")
+ .append(entry.getValue())
+ .append(System.lineSeparator());
+ }
+ return sb.toString();
+ }
+}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index a7d80ee2..bb1962f5 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -31,7 +31,10 @@ import
org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
@@ -44,6 +47,8 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
@@ -53,6 +58,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import static java.util.Map.entry;
import static
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
@@ -288,6 +294,33 @@ public class JobAutoScalerImplTest {
assertParallelismOverrides(Map.of(v1.toString(), "1", v2.toString(),
"4"));
}
+ @Test
+ void testApplyConfigOverrides() throws Exception {
+        context.getConfiguration().set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
+ var autoscaler =
+ new JobAutoScalerImpl<>(
+ null, null, null, eventCollector, scalingRealizer,
stateStore);
+
+        // Initially we should return empty overrides, do not create any state
+ assertThat(stateStore.getConfigOverrides(context).toMap()).isEmpty();
+
+ var config = new Configuration();
+ config.set(TaskManagerOptions.TASK_HEAP_MEMORY, new MemorySize(42));
+ stateStore.storeConfigOverrides(context, config);
+ stateStore.flush(context);
+
+ autoscaler.applyConfigOverrides(context);
+ assertThat(getEvent().getConfigOverrides().toMap())
+                .containsExactly(entry(TaskManagerOptions.TASK_HEAP_MEMORY.key(), "42 bytes"));
+ assertThat(stateStore.getConfigOverrides(context)).isEqualTo(config);
+
+ // Disabling autoscaler should clear overrides
+ context.getConfiguration().setString(AUTOSCALER_ENABLED.key(),
"false");
+ autoscaler.scale(context);
+ autoscaler.applyConfigOverrides(context);
+ assertThat(getEvent().getConfigOverrides().toMap()).isEmpty();
+ }
+
@Test
void testAutoscalerDisabled() throws Exception {
context.getConfiguration().setBoolean(AUTOSCALER_ENABLED, false);
@@ -320,12 +353,30 @@ public class JobAutoScalerImplTest {
private void assertParallelismOverrides(Map<String, String>
expectedOverrides) {
TestingScalingRealizer.Event<JobID, JobAutoScalerContext<JobID>>
scalingEvent;
- scalingEvent = scalingRealizer.events.poll();
- if (expectedOverrides == null) {
- assertThat(scalingEvent).isNull();
+ do {
+ scalingEvent = getEvent();
+ } while (scalingEvent != null &&
scalingEvent.getParallelismOverrides() == null);
+
+ if (scalingEvent == null) {
return;
}
assertThat(scalingEvent).isNotNull();
assertEquals(expectedOverrides,
scalingEvent.getParallelismOverrides());
}
+
+ @Nullable
+ private TestingScalingRealizer.Event<JobID, JobAutoScalerContext<JobID>>
getEvent(
+ Map<String, String> expectedOverrides) {
+ var scalingEvent = getEvent();
+ if (expectedOverrides == null) {
+ assertThat(scalingEvent).isNull();
+ return null;
+ }
+ return scalingEvent;
+ }
+
+ @Nullable
+ private TestingScalingRealizer.Event<JobID, JobAutoScalerContext<JobID>>
getEvent() {
+ return scalingRealizer.events.poll();
+ }
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 46f85489..456105d2 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -24,7 +24,6 @@ import
org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -72,8 +71,6 @@ public class JobVertexScalerTest {
JobStatus.RUNNING,
conf,
new UnregisteredMetricsGroup(),
- 0,
- MemorySize.ZERO,
null);
restartTime = conf.get(AutoScalerOptions.RESTART_TIME);
}
@@ -503,7 +500,7 @@ public class JobVertexScalerTest {
ScalingMetric.TRUE_PROCESSING_RATE,
new EvaluatedScalingMetric(trueProcessingRate,
trueProcessingRate));
metrics.put(ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(Double.NaN));
- metrics.put(ScalingMetric.HEAP_USAGE,
EvaluatedScalingMetric.of(Double.NaN));
+ metrics.put(ScalingMetric.HEAP_MAX_USAGE_RATIO,
EvaluatedScalingMetric.of(Double.NaN));
ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf,
false, restartTime);
return metrics;
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index cf866c01..9f10e19f 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
-import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
@@ -252,8 +251,6 @@ public class RecommendedParallelismTest {
JobStatus.CREATED,
context.getConfiguration(),
context.getMetricGroup(),
- 0,
- MemorySize.ZERO,
getRestClusterClientSupplier());
}
@@ -266,8 +263,6 @@ public class RecommendedParallelismTest {
JobStatus.RUNNING,
context.getConfiguration(),
context.getMetricGroup(),
- 0,
- MemorySize.ZERO,
getRestClusterClientSupplier());
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
index 462fd21c..863950af 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -115,8 +114,6 @@ public class RestApiMetricsCollectorTest {
JobStatus.RUNNING,
conf,
new UnregisteredMetricsGroup(),
- 0,
- MemorySize.ZERO,
() -> restClusterClient);
var jobVertexIDMapMap = collector.queryAllAggregatedMetrics(context,
metrics);
@@ -228,8 +225,6 @@ public class RestApiMetricsCollectorTest {
JobStatus.RUNNING,
conf,
new UnregisteredMetricsGroup(),
- 0,
- MemorySize.ZERO,
() -> client);
var collector = new RestApiMetricsCollector<JobID,
JobAutoScalerContext<JobID>>();
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index c3ef9296..c5591a76 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.BeforeEach;
@@ -68,7 +69,7 @@ public class ScalingExecutorTest {
private static final Map<ScalingMetric, EvaluatedScalingMetric>
dummyGlobalMetrics =
Map.of(
ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(Double.NaN),
- ScalingMetric.HEAP_USAGE,
EvaluatedScalingMetric.of(Double.NaN));
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
EvaluatedScalingMetric.of(Double.NaN));
@BeforeEach
public void setup() {
@@ -223,7 +224,7 @@ public class ScalingExecutorTest {
EvaluatedScalingMetric.of(9),
ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(Double.NaN),
- ScalingMetric.HEAP_USAGE,
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
EvaluatedScalingMetric.of(Double.NaN)));
// Would normally scale without resource usage check
@@ -256,6 +257,43 @@ public class ScalingExecutorTest {
now));
}
+ @Test
+ public void testMemoryTuning() throws Exception {
+ context = TestingAutoscalerUtils.createResourceAwareContext();
+ context.getConfiguration().set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
+ context.getConfiguration().set(TaskManagerOptions.NUM_TASK_SLOTS, 5);
+ context.getConfiguration()
+ .set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("30 gb"));
+
+ var source = new JobVertexID();
+ var sink = new JobVertexID();
+ var now = Instant.now();
+
+ var globalMetrics =
+ Map.of(
+ ScalingMetric.NUM_TASK_SLOTS_USED,
+ EvaluatedScalingMetric.of(9),
+ ScalingMetric.HEAP_USED,
+ EvaluatedScalingMetric.avg(MemorySize.parse("5 Gb").getBytes()),
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
+ EvaluatedScalingMetric.of(Double.NaN),
+ ScalingMetric.GC_PRESSURE,
+ EvaluatedScalingMetric.of(Double.NaN));
+ var vertexMetrics =
+ Map.of(source, evaluated(10, 50, 100, 50, 0), sink, evaluated(10, 50, 100, 50, 0));
+ var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
+
+ assertTrue(
+ scalingDecisionExecutor.scaleResource(
+ context, metrics, new HashMap<>(), new ScalingTracking(), now));
+ assertEquals(
+ "6.000gb (6442450944 bytes)",
+ stateStore
+ .getConfigOverrides(context)
+ .get(TaskManagerOptions.TASK_HEAP_MEMORY)
+ .toHumanReadableString());
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testScalingEventsWith0IntervalConfig(boolean scalingEnabled)
throws Exception {
@@ -350,7 +388,7 @@ public class ScalingExecutorTest {
Map.of(
ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(Double.NaN),
- ScalingMetric.HEAP_USAGE,
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
EvaluatedScalingMetric.of(Double.NaN)));
// Baseline, no GC/Heap metrics
@@ -365,7 +403,7 @@ public class ScalingExecutorTest {
Map.of(
ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(0.49),
- ScalingMetric.HEAP_USAGE,
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
new EvaluatedScalingMetric(0.9, 0.79)));
assertTrue(
scalingDecisionExecutor.scaleResource(
@@ -380,7 +418,7 @@ public class ScalingExecutorTest {
Map.of(
ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(0.51),
- ScalingMetric.HEAP_USAGE,
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
new EvaluatedScalingMetric(0.9, 0.79)));
assertFalse(
scalingDecisionExecutor.scaleResource(
@@ -395,7 +433,7 @@ public class ScalingExecutorTest {
Map.of(
ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(0.49),
- ScalingMetric.HEAP_USAGE,
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
new EvaluatedScalingMetric(0.6, 0.81)));
assertFalse(
scalingDecisionExecutor.scaleResource(
@@ -405,10 +443,11 @@ public class ScalingExecutorTest {
}
private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
- int parallelism, double target, double procRate, double catchupRate) {
+ int parallelism, double current, double target, double procRate, double catchupRate) {
var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
metrics.put(ScalingMetric.PARALLELISM,
EvaluatedScalingMetric.of(parallelism));
metrics.put(ScalingMetric.MAX_PARALLELISM,
EvaluatedScalingMetric.of(720));
+ metrics.put(ScalingMetric.CURRENT_PROCESSING_RATE,
EvaluatedScalingMetric.avg(current));
metrics.put(ScalingMetric.TARGET_DATA_RATE, new
EvaluatedScalingMetric(target, target));
metrics.put(ScalingMetric.CATCH_UP_DATA_RATE,
EvaluatedScalingMetric.of(catchupRate));
metrics.put(
@@ -420,6 +459,11 @@ public class ScalingExecutorTest {
return metrics;
}
+ private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+ int parallelism, double target, double procRate, double catchupRate) {
+ return evaluated(parallelism, Double.NaN, target, procRate, catchupRate);
+ }
+
private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
int parallelism, double target, double procRate) {
return evaluated(parallelism, target, procRate, 0.);
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 0bfbd989..cbbc2b0e 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -576,10 +576,12 @@ public class ScalingMetricEvaluatorTest {
assertEquals(
Map.of(
- ScalingMetric.HEAP_USAGE,
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
EvaluatedScalingMetric.of(Double.NaN),
ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(Double.NaN),
+ ScalingMetric.HEAP_USED,
+ EvaluatedScalingMetric.of(Double.NaN),
ScalingMetric.NUM_TASK_SLOTS_USED,
EvaluatedScalingMetric.of(Double.NaN)),
ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
@@ -589,13 +591,21 @@ public class ScalingMetricEvaluatorTest {
new CollectedMetrics(
Map.of(),
Map.of(),
- Map.of(ScalingMetric.HEAP_USAGE, 0.5,
ScalingMetric.GC_PRESSURE, 0.6)));
+ Map.of(
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
+ 0.5,
+ ScalingMetric.GC_PRESSURE,
+ 0.6,
+ ScalingMetric.HEAP_USED,
+ 512.)));
assertEquals(
Map.of(
- ScalingMetric.HEAP_USAGE,
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
new EvaluatedScalingMetric(0.5, 0.5),
ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(0.6),
+ ScalingMetric.HEAP_USED,
+ new EvaluatedScalingMetric(512, 512),
ScalingMetric.NUM_TASK_SLOTS_USED,
EvaluatedScalingMetric.of(Double.NaN)),
ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
@@ -606,18 +616,22 @@ public class ScalingMetricEvaluatorTest {
Map.of(),
Map.of(),
Map.of(
- ScalingMetric.HEAP_USAGE,
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
0.7,
ScalingMetric.GC_PRESSURE,
0.8,
+ ScalingMetric.HEAP_USED,
+ 1024.,
ScalingMetric.NUM_TASK_SLOTS_USED,
42.)));
assertEquals(
Map.of(
- ScalingMetric.HEAP_USAGE,
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
new EvaluatedScalingMetric(0.7, 0.6),
ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(0.8),
+ ScalingMetric.HEAP_USED,
+ new EvaluatedScalingMetric(1024., 768.),
ScalingMetric.NUM_TASK_SLOTS_USED,
EvaluatedScalingMetric.of(42.)),
ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java
index 163134d9..9869e4a3 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java
@@ -32,6 +32,7 @@ import org.apache.flink.util.function.SupplierWithException;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/** The utils for test autoscaler. */
@@ -47,8 +48,6 @@ public class TestingAutoscalerUtils {
JobStatus.RUNNING,
new Configuration(),
metricGroup,
- 0,
- MemorySize.ZERO,
getRestClusterClientSupplier());
}
@@ -62,17 +61,15 @@ public class TestingAutoscalerUtils {
JobStatus.RUNNING,
new Configuration(),
metricGroup,
- 0,
- MemorySize.ZERO,
TestingAutoscalerUtils.getRestClusterClientSupplier()) {
@Override
- public double getTaskManagerCpu() {
- return 100;
+ public Optional<Double> getTaskManagerCpu() {
+ return Optional.of(100.);
}
@Override
- public MemorySize getTaskManagerMemory() {
- return MemorySize.parse("65536 bytes");
+ public Optional<MemorySize> getTaskManagerMemory() {
+ return Optional.of(MemorySize.parse("30 gb"));
}
};
}
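
Note on the context change above: getTaskManagerCpu() and getTaskManagerMemory() now return Optionals, so callers pick their own fallback instead of relying on defaults baked into the constructor. A minimal sketch of a defensive caller (the helper class and fallback values are illustrative only, not part of this commit):

    import org.apache.flink.autoscaler.JobAutoScalerContext;
    import org.apache.flink.configuration.MemorySize;

    /** Hypothetical helper showing how the Optional-based resource getters can be consumed. */
    class TaskManagerResourceReader {
        static String describe(JobAutoScalerContext<?> context) {
            // Fall back to neutral defaults when the context cannot report the resources.
            double cpu = context.getTaskManagerCpu().orElse(0.);
            MemorySize memory = context.getTaskManagerMemory().orElse(MemorySize.ZERO);
            return "cpu=" + cpu + ", memory=" + memory.toHumanReadableString();
        }
    }
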
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
index 9fb8b8b9..fe640711 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.autoscaler.metrics;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
@@ -482,22 +483,43 @@ public class ScalingMetricsTest {
@Test
public void testGlobalMetrics() {
- assertEquals(Map.of(), ScalingMetrics.computeGlobalMetrics(Map.of(),
Map.of()));
+ Configuration conf = new Configuration();
+ conf.set(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET,
MemoryTuning.HeapUsageTarget.AVG);
+ assertEquals(Map.of(), ScalingMetrics.computeGlobalMetrics(Map.of(),
Map.of(), conf));
assertEquals(
Map.of(),
ScalingMetrics.computeGlobalMetrics(
- Map.of(), Map.of(FlinkMetric.HEAP_USED, aggMax(100))));
+ Map.of(), Map.of(FlinkMetric.HEAP_USED, aggMax(100)),
conf));
assertEquals(
- Map.of(ScalingMetric.HEAP_USAGE, 0.5,
ScalingMetric.GC_PRESSURE, 0.25),
+ Map.of(
+ ScalingMetric.HEAP_MAX_USAGE_RATIO,
+ 0.5,
+ ScalingMetric.GC_PRESSURE,
+ 0.25,
+ ScalingMetric.HEAP_USED,
+ 75.),
ScalingMetrics.computeGlobalMetrics(
Map.of(),
Map.of(
FlinkMetric.HEAP_USED,
- aggMax(100),
+ aggAvgMax(75, 100),
FlinkMetric.HEAP_MAX,
aggMax(200.),
FlinkMetric.TOTAL_GC_TIME_PER_SEC,
- aggMax(250.))));
+ aggMax(250.)),
+ conf));
+
+ conf.set(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET,
MemoryTuning.HeapUsageTarget.MAX);
+ assertEquals(
+ Map.of(ScalingMetric.HEAP_MAX_USAGE_RATIO, 0.5,
ScalingMetric.HEAP_USED, 100.),
+ ScalingMetrics.computeGlobalMetrics(
+ Map.of(),
+ Map.of(
+ FlinkMetric.HEAP_USED,
+ aggAvgMax(75, 100),
+ FlinkMetric.HEAP_MAX,
+ aggMax(200.)),
+ conf));
}
private static AggregatedMetric aggSum(double sum) {
@@ -507,4 +529,8 @@ public class ScalingMetricsTest {
private static AggregatedMetric aggMax(double max) {
return new AggregatedMetric("", Double.NaN, max, Double.NaN,
Double.NaN);
}
+
+ private static AggregatedMetric aggAvgMax(double avg, double max) {
+ return new AggregatedMetric("", Double.NaN, max, avg, Double.NaN);
+ }
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
index 1e5031c1..d46c5bba 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
@@ -18,6 +18,9 @@
package org.apache.flink.autoscaler.realizer;
import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.configuration.Configuration;
+
+import lombok.Getter;
import java.util.LinkedList;
import java.util.Map;
@@ -29,28 +32,45 @@ public class TestingScalingRealizer<KEY, Context extends
JobAutoScalerContext<KE
public final LinkedList<Event<KEY, Context>> events = new LinkedList<>();
@Override
- public void realize(Context context, Map<String, String>
parallelismOverrides) {
+ public void realizeParallelismOverrides(
+ Context context, Map<String, String> parallelismOverrides) {
events.add(new Event<>(context, parallelismOverrides));
}
+ @Override
+ public void realizeConfigOverrides(Context context, Configuration
configOverrides) {
+ events.add(new Event<>(context, configOverrides));
+ }
+
/** The collected event. */
public static class Event<KEY, Context extends JobAutoScalerContext<KEY>> {
- private final Context context;
+ @Getter private final Context context;
+
+ @Getter private Map<String, String> parallelismOverrides;
- private final Map<String, String> parallelismOverrides;
+ @Getter private Configuration configOverrides;
public Event(Context context, Map<String, String>
parallelismOverrides) {
this.context = context;
this.parallelismOverrides = parallelismOverrides;
}
- public Context getContext() {
- return context;
+ public Event(Context context, Configuration configOverrides) {
+ this.context = context;
+ this.configOverrides = configOverrides;
}
- public Map<String, String> getParallelismOverrides() {
- return parallelismOverrides;
+ @Override
+ public String toString() {
+ return "Event{"
+ + "context="
+ + context
+ + ", parallelismOverrides="
+ + parallelismOverrides
+ + ", configOverrides="
+ + configOverrides
+ + '}';
}
}
}
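
For context, the change above tracks the split of the former realize() callback into realizeParallelismOverrides() and realizeConfigOverrides(). A custom realizer outside this repository would now implement both; a purely illustrative sketch, assuming the interface carries exactly these two callbacks after this commit:

    import org.apache.flink.autoscaler.JobAutoScalerContext;
    import org.apache.flink.autoscaler.realizer.ScalingRealizer;
    import org.apache.flink.configuration.Configuration;

    import java.util.Map;

    /** Illustrative realizer that only logs what it receives; not part of this commit. */
    class LoggingScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>>
            implements ScalingRealizer<KEY, Context> {

        @Override
        public void realizeParallelismOverrides(
                Context context, Map<String, String> parallelismOverrides) {
            System.out.println("Would apply parallelism overrides: " + parallelismOverrides);
        }

        @Override
        public void realizeConfigOverrides(Context context, Configuration configOverrides) {
            System.out.println("Would apply config overrides: " + configOverrides);
        }
    }
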
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
index dcab0c4c..11db27c0 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.Assertions;
@@ -183,21 +184,26 @@ public abstract class AbstractAutoScalerStateStoreTest<
new JobVertexID(),
new TreeMap<>(Map.of(Instant.now(), new
ScalingSummary()))));
stateStore.storeParallelismOverrides(ctx, Map.of(new
JobVertexID().toHexString(), "23"));
+ stateStore.storeConfigOverrides(
+ ctx, Configuration.fromMap(Map.of("config.value", "value")));
assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
+ assertThat(stateStore.getConfigOverrides(ctx).toMap()).isNotEmpty();
stateStore.flush(ctx);
assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
+ assertThat(stateStore.getConfigOverrides(ctx).toMap()).isNotEmpty();
stateStore.clearAll(ctx);
assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
assertThat(stateStore.getScalingHistory(ctx)).isEmpty();
assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
+ assertThat(stateStore.getConfigOverrides(ctx).toMap()).isEmpty();
}
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
new file mode 100644
index 00000000..a94b4902
--- /dev/null
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.TestingAutoscalerUtils;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.tuning.MemoryTuning;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Map;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+/** Tests for {@link MemoryTuning}. */
+public class MemoryTuningTest {
+
+ TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventHandler =
+ new TestingEventCollector<>();
+
+ @Test
+ void testMemoryTuning() {
+ var context = TestingAutoscalerUtils.createResourceAwareContext();
+ var config = context.getConfiguration();
+ config.set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
+ config.set(TaskManagerOptions.NUM_TASK_SLOTS, 5);
+ config.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
+ MemorySize totalMemory = MemorySize.parse("30 gb");
+ config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemory);
+
+ var jobVertex1 = new JobVertexID();
+ var jobVertex2 = new JobVertexID();
+ var vertexMetrics =
+ Map.of(
+ jobVertex1,
+ Map.of(
+ ScalingMetric.CURRENT_PROCESSING_RATE,
+ EvaluatedScalingMetric.avg(100),
+ ScalingMetric.EXPECTED_PROCESSING_RATE,
+ EvaluatedScalingMetric.of(50),
+ ScalingMetric.PARALLELISM,
EvaluatedScalingMetric.of(50)),
+ jobVertex2,
+ Map.of(
+ ScalingMetric.CURRENT_PROCESSING_RATE,
+ EvaluatedScalingMetric.avg(100),
+ ScalingMetric.EXPECTED_PROCESSING_RATE,
+ EvaluatedScalingMetric.of(50),
+ ScalingMetric.PARALLELISM,
EvaluatedScalingMetric.of(50)));
+
+ var globalMetrics =
+ Map.of(
+ ScalingMetric.HEAP_USED,
+ EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(5096).getBytes()));
+
+ var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
+
+ Map<String, String> overrides =
+ MemoryTuning.tuneTaskManagerHeapMemory(context, metrics,
eventHandler).toMap();
+ // Test reducing overall memory
+ assertThat(overrides)
+ .containsExactlyInAnyOrderEntriesOf(
+ Map.of(
+ TaskManagerOptions.TASK_HEAP_MEMORY.key(),
+ "6412251955 bytes",
+ TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+ "0 bytes",
+ TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
+ "12348031160 bytes",
+ TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
+ "0.046",
+ TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
+ "0.14",
+ TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
+ "23323685913 bytes"));
+
+ assertThat(eventHandler.events.poll().getMessage())
+ .startsWith(
+ "Memory tuning recommends the following configuration
(automatic tuning is enabled):");
+
+ // Test giving back memory to RocksDB
+ config.set(AutoScalerOptions.MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED,
true);
+ config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+ overrides = MemoryTuning.tuneTaskManagerHeapMemory(context, metrics,
eventHandler).toMap();
+ assertThat(overrides)
+ .containsExactlyInAnyOrderEntriesOf(
+ Map.of(
+ TaskManagerOptions.TASK_HEAP_MEMORY.key(),
+ "6412251955 bytes",
+ TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+ "0 bytes",
+ TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
+ "21236599967 bytes",
+ TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
+ "0.033",
+ TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
+ "0.14",
+ TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
+ totalMemory.toString()));
+
+ assertThat(eventHandler.events.poll().getMessage())
+ .startsWith(
+ "Memory tuning recommends the following configuration
(automatic tuning is enabled):");
+
+ // Test tuning disabled
+ config.set(AutoScalerOptions.MEMORY_TUNING_ENABLED, false);
+ assertThat(MemoryTuning.tuneTaskManagerHeapMemory(context, metrics,
eventHandler).toMap())
+ .isEmpty();
+
+ assertThat(eventHandler.events.poll().getMessage())
+ .startsWith(
+ "Memory tuning recommends the following configuration
(automatic tuning is disabled):");
+ }
+}
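
The test above pins exact byte values; for readers who only care about the entry point, the following sketch shows how the tuning result can be consumed. It relies only on what the test itself exercises (toMap() on the returned overrides); context, metrics and eventHandler are assumed to be set up exactly as in MemoryTuningTest:

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.autoscaler.JobAutoScalerContext;
    import org.apache.flink.autoscaler.event.TestingEventCollector;
    import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
    import org.apache.flink.autoscaler.tuning.MemoryTuning;
    import org.apache.flink.configuration.TaskManagerOptions;

    import java.util.Map;

    /** Illustrative consumer of the tuning result; not part of this commit. */
    class MemoryTuningUsageSketch {
        static void printRecommendation(
                JobAutoScalerContext<JobID> context,
                EvaluatedMetrics metrics,
                TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventHandler) {
            Map<String, String> overrides =
                    MemoryTuning.tuneTaskManagerHeapMemory(context, metrics, eventHandler).toMap();
            if (overrides.isEmpty()) {
                // Tuning disabled or no adjustment recommended.
                System.out.println("No memory overrides recommended");
            } else {
                System.out.println(
                        "Recommended task heap: "
                                + overrides.get(TaskManagerOptions.TASK_HEAP_MEMORY.key()));
            }
        }
    }
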
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
index 45a11c3e..797436ad 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
@@ -56,14 +54,16 @@ public class KubernetesJobAutoScalerContext extends
JobAutoScalerContext<Resourc
jobStatus,
configuration,
metricGroup,
- Optional.ofNullable(configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU))
- .orElse(0.),
- Optional.ofNullable(configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY))
- .orElse(MemorySize.ZERO),
restClientSupplier);
this.resourceContext = resourceContext;
}
+ @Override
+ public Optional<Double> getTaskManagerCpu() {
+ return Optional.ofNullable(
+ getConfiguration().get(KubernetesConfigOptions.TASK_MANAGER_CPU));
+ }
+
public AbstractFlinkResource<?, ?> getResource() {
return resourceContext.getResource();
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
index d20f783d..8eb6ebdc 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
@@ -18,10 +18,18 @@
package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.tuning.MemoryTuning;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.Resource;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -31,8 +39,10 @@ import java.util.Map;
public class KubernetesScalingRealizer
implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext>
{
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesScalingRealizer.class);
+
@Override
- public void realize(
+ public void realizeParallelismOverrides(
KubernetesJobAutoScalerContext context, Map<String, String>
parallelismOverrides) {
context.getResource()
@@ -43,6 +53,34 @@ public class KubernetesScalingRealizer
getOverrideString(context, parallelismOverrides));
}
+ @Override
+ public void realizeConfigOverrides(
+ KubernetesJobAutoScalerContext context, Configuration
configOverrides) {
+ if (!(context.getResource() instanceof FlinkDeployment)) {
+ // We can't adjust the configuration of non-job deployments.
+ return;
+ }
+ FlinkDeployment flinkDeployment = ((FlinkDeployment)
context.getResource());
+ // Apply config overrides
+ flinkDeployment.getSpec().getFlinkConfiguration().putAll(configOverrides.toMap());
+
+ // Update total memory in spec
+ var totalMemoryOverride = MemoryTuning.getTotalMemory(configOverrides,
context);
+ if (totalMemoryOverride.compareTo(MemorySize.ZERO) <= 0) {
+ LOG.warn("Total memory override {} is not valid",
totalMemoryOverride);
+ return;
+ }
+ Resource tmResource = flinkDeployment.getSpec().getTaskManager().getResource();
+ // Make sure to parse in the same way as the original deploy code path.
+ var currentMemory =
+ MemorySize.parse(
+ FlinkConfigBuilder.parseResourceMemoryString(tmResource.getMemory()));
+ if (!totalMemoryOverride.equals(currentMemory)) {
+ // Adjust the resource memory to change the total TM memory
+ tmResource.setMemory(String.valueOf(totalMemoryOverride.getBytes()));
+ }
+ }
+
@Nullable
private static String getOverrideString(
KubernetesJobAutoScalerContext context, Map<String, String>
newOverrides) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index 2e516ae6..53ba7eb4 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -38,6 +39,7 @@ import
org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.LoaderOptions;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import lombok.SneakyThrows;
import org.apache.commons.io.IOUtils;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,8 +68,11 @@ public class KubernetesAutoScalerStateStore
@VisibleForTesting protected static final String COLLECTED_METRICS_KEY =
"collectedMetrics";
@VisibleForTesting
+ /* Be careful with changing this field name or the internal structure. Otherwise the parallelism of all autoscaled pipelines might get reset! */
protected static final String PARALLELISM_OVERRIDES_KEY =
"parallelismOverrides";
+ protected static final String CONFIG_OVERRIDES_KEY = "configOverrides";
+
@VisibleForTesting protected static final int MAX_CM_BYTES = 1000000;
protected static final ObjectMapper YAML_MAPPER =
@@ -188,6 +193,27 @@ public class KubernetesAutoScalerStateStore
.orElse(new HashMap<>());
}
+ @NotNull
+ @Override
+ public Configuration getConfigOverrides(KubernetesJobAutoScalerContext
jobContext) {
+ return configMapStore
+ .getSerializedState(jobContext, CONFIG_OVERRIDES_KEY)
+ .map(KubernetesAutoScalerStateStore::deserializeConfigOverrides)
+ .orElse(new Configuration());
+ }
+
+ @Override
+ public void storeConfigOverrides(
+ KubernetesJobAutoScalerContext jobContext, Configuration
overrides) {
+ configMapStore.putSerializedState(
+ jobContext, CONFIG_OVERRIDES_KEY,
serializeConfigOverrides(overrides));
+ }
+
+ @Override
+ public void removeConfigOverrides(KubernetesJobAutoScalerContext
jobContext) {
+ configMapStore.removeSerializedState(jobContext, CONFIG_OVERRIDES_KEY);
+ }
+
@Override
public void removeParallelismOverrides(KubernetesJobAutoScalerContext
jobContext) {
configMapStore.removeSerializedState(jobContext,
PARALLELISM_OVERRIDES_KEY);
@@ -250,6 +276,14 @@ public class KubernetesAutoScalerStateStore
return ConfigurationUtils.convertValue(overrides, Map.class);
}
+ private static String serializeConfigOverrides(Configuration overrides) {
+ return ConfigurationUtils.convertValue(overrides.toMap(), String.class);
+ }
+
+ private static Configuration deserializeConfigOverrides(String overrides) {
+ return Configuration.fromMap(ConfigurationUtils.convertValue(overrides, Map.class));
+ }
+
@VisibleForTesting
protected void trimHistoryToMaxCmSize(KubernetesJobAutoScalerContext
context) {
int scalingHistorySize =
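
The new configOverrides entry deliberately mirrors the existing parallelismOverrides handling, so the state-store contract stays symmetric across the Kubernetes, JDBC and in-memory implementations. A sketch of the expected round trip against the in-memory store (names taken from this commit; the "6 gb" value is an arbitrary example):

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.autoscaler.JobAutoScalerContext;
    import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.configuration.TaskManagerOptions;

    /** Illustrative round trip of config overrides through the in-memory state store. */
    class ConfigOverridesRoundTripSketch {
        static void roundTrip(JobAutoScalerContext<JobID> ctx) throws Exception {
            var stateStore = new InMemoryAutoScalerStateStore<JobID, JobAutoScalerContext<JobID>>();
            var overrides = new Configuration();
            overrides.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("6 gb"));
            stateStore.storeConfigOverrides(ctx, overrides);
            // getConfigOverrides is expected to return a (possibly empty) Configuration, never null.
            System.out.println(stateStore.getConfigOverrides(ctx).toMap());
        }
    }
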
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 339dfaa3..835b2200 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -443,7 +443,6 @@ public class FlinkConfigBuilder {
}
}
- // Using the K8s units specification for the JM and TM memory settings
public static String parseResourceMemoryString(String memory) {
try {
return MemorySize.parse(memory).toString();
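
parseResourceMemoryString() normalizes arbitrary memory strings through MemorySize, which is what allows the realizer change above to compare the spec value against the computed override by size rather than by raw string. A small illustrative check (values chosen arbitrarily, not taken from this commit):

    import org.apache.flink.configuration.MemorySize;

    /** Illustrative only: equal sizes written differently normalize to the same MemorySize. */
    class MemoryNormalizationSketch {
        public static void main(String[] args) {
            MemorySize fromSpec = MemorySize.parse("4096m");
            MemorySize override = MemorySize.parse("4 gb");
            // Prints true: the comparison happens on parsed byte counts, not on the raw strings.
            System.out.println(fromSpec.equals(override));
        }
    }
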
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
index 41b029ea..0d5c5510 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
@@ -65,7 +65,7 @@ public abstract class FlinkResourceContext<CR extends
AbstractFlinkResource<?, ?
}
private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
- Configuration conf = new Configuration(getObserveConfig());
+ Configuration conf = new
Configuration(getDeployConfig(resource.getSpec()));
conf.set(
AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
getOperatorConfig().getFlinkClientTimeout());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
index 09d19299..a1416385 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
@@ -33,6 +33,8 @@ import
org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.junit.jupiter.api.Test;
+import java.util.Optional;
+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
/** Tests for {@link KubernetesJobAutoScalerContext}. */
@@ -63,7 +65,8 @@ public class KubernetesJobAutoScalerContextTest {
new FlinkConfigManager(new Configuration()),
null));
- assertThat(context.getTaskManagerCpu()).isEqualTo(23.);
- assertThat(context.getTaskManagerMemory()).isEqualTo(MemorySize.parse("1024mb"));
+ assertThat(context.getTaskManagerCpu()).isEqualTo(Optional.of(23.));
+ assertThat(context.getTaskManagerMemory())
+ .isEqualTo(Optional.of(MemorySize.parse("1024mb")));
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
index be6f38c3..9a448304 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
@@ -17,7 +17,10 @@
package org.apache.flink.kubernetes.operator.autoscaler;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.junit.jupiter.api.Test;
@@ -35,7 +38,8 @@ public class KubernetesScalingRealizerTest {
KubernetesJobAutoScalerContext ctx =
TestingKubernetesAutoscalerUtils.createContext("test", null);
- new KubernetesScalingRealizer().realize(ctx, Map.of("a", "1", "b",
"2"));
+ new KubernetesScalingRealizer()
+ .realizeParallelismOverrides(ctx, Map.of("a", "1", "b", "2"));
assertThat(
ctx.getResource()
@@ -59,6 +63,26 @@ public class KubernetesScalingRealizerTest {
assertOverridesDoNotChange("b:2,a:1", newOverrides);
}
+ @Test
+ public void testApplyMemoryOverrides() {
+ KubernetesJobAutoScalerContext ctx =
+ TestingKubernetesAutoscalerUtils.createContext("test", null);
+
+ var overrides = new Configuration();
+ MemorySize memoryOverride = MemorySize.ofMebiBytes(4096);
+ overrides.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, memoryOverride);
+ new KubernetesScalingRealizer().realizeConfigOverrides(ctx, overrides);
+
+ assertThat(ctx.getResource()).isInstanceOf(FlinkDeployment.class);
+ assertThat(
+ ((FlinkDeployment) ctx.getResource())
+ .getSpec()
+ .getTaskManager()
+ .getResource()
+ .getMemory())
+ .isEqualTo(String.valueOf(memoryOverride.getBytes()));
+ }
+
private void assertOverridesDoNotChange(
String currentOverrides, LinkedHashMap<String, String>
newOverrides) {
@@ -77,7 +101,7 @@ public class KubernetesScalingRealizerTest {
.getFlinkConfiguration()
.remove(PipelineOptions.PARALLELISM_OVERRIDES.key());
- new KubernetesScalingRealizer().realize(ctx, newOverrides);
+ new KubernetesScalingRealizer().realizeParallelismOverrides(ctx,
newOverrides);
assertThat(
ctx.getResource()