This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 3ead906f [FLINK-34152] Tune all directly observable memory types (except network memory) (#778)
3ead906f is described below
commit 3ead906f33a4b3790fa5c12f2018e09db9443a09
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Feb 22 13:42:40 2024 +0100
[FLINK-34152] Tune all directly observable memory types (except network memory) (#778)
---
.../generated/auto_scaler_configuration.html | 26 +--
.../jdbc/state/JdbcAutoScalerStateStore.java | 33 ++-
.../realizer/RescaleApiScalingRealizer.java | 5 +-
.../apache/flink/autoscaler/JobAutoScalerImpl.java | 8 +-
.../flink/autoscaler/RestApiMetricsCollector.java | 22 +-
.../apache/flink/autoscaler/ScalingExecutor.java | 13 +-
.../flink/autoscaler/ScalingMetricEvaluator.java | 30 ++-
.../flink/autoscaler/config/AutoScalerOptions.java | 37 +---
.../flink/autoscaler/metrics/FlinkMetric.java | 8 +-
.../flink/autoscaler/metrics/ScalingMetric.java | 10 +-
.../flink/autoscaler/metrics/ScalingMetrics.java | 29 ++-
.../flink/autoscaler/realizer/ScalingRealizer.java | 4 +-
.../autoscaler/state/AutoScalerStateStore.java | 8 +-
.../state/InMemoryAutoScalerStateStore.java | 17 +-
.../flink/autoscaler/tuning/ConfigChanges.java | 67 ++++++
.../flink/autoscaler/tuning/MemoryBudget.java | 42 ++++
.../flink/autoscaler/tuning/MemoryTuning.java | 235 ++++++++++++---------
.../flink/autoscaler/JobAutoScalerImplTest.java | 34 ++-
.../autoscaler/RestApiMetricsCollectorTest.java | 24 ++-
.../flink/autoscaler/ScalingExecutorTest.java | 27 ++-
.../autoscaler/ScalingMetricEvaluatorTest.java | 32 ++-
.../autoscaler/metrics/ScalingMetricsTest.java | 37 ++--
.../realizer/TestingScalingRealizer.java | 14 +-
.../state/AbstractAutoScalerStateStoreTest.java | 12 +-
.../flink/autoscaler/utils/MemoryTuningTest.java | 97 ++++++---
.../autoscaler/KubernetesScalingRealizer.java | 12 +-
.../state/KubernetesAutoScalerStateStore.java | 34 ++-
.../autoscaler/KubernetesScalingRealizerTest.java | 6 +-
28 files changed, 582 insertions(+), 341 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index ee6fb731..65fb594d 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -66,31 +66,19 @@
<td><h5>job.autoscaler.memory.tuning.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
- <td>If enabled, the initial amount of memory specified for
TaskManagers will be reduced according to the observed needs.</td>
+ <td>If enabled, the initial amount of memory specified for
TaskManagers will be reduced/increased according to the observed needs.</td>
</tr>
<tr>
- <td><h5>job.autoscaler.memory.tuning.heap.min</h5></td>
- <td style="word-wrap: break-word;">512 mb</td>
- <td>MemorySize</td>
- <td>The minimum amount of TaskManager heap memory, if memory
tuning is enabled.</td>
+
<td><h5>job.autoscaler.memory.tuning.maximize-managed-memory</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If enabled and managed memory is used (e.g. RocksDB turned
on), any reduction of heap, network, or metaspace memory will increase the
managed memory.</td>
</tr>
<tr>
- <td><h5>job.autoscaler.memory.tuning.heap.overhead</h5></td>
+ <td><h5>job.autoscaler.memory.tuning.overhead</h5></td>
<td style="word-wrap: break-word;">0.2</td>
<td>Double</td>
- <td>Overhead to add to heap tuning decisions (0-1). This ensures
spare capacity and allows the memory to grow beyond the dynamically computed
limits, but never beyond the original memory limits.</td>
- </tr>
- <tr>
- <td><h5>job.autoscaler.memory.tuning.heap.target-usage</h5></td>
- <td style="word-wrap: break-word;">MAX</td>
- <td><p>Enum</p></td>
- <td>The heap size to target. Average usage (AVG) will yield better
savings. Max usage will yield more conservative savings.<br /><br />Possible
values:<ul><li>"AVG"</li><li>"MAX"</li></ul></td>
- </tr>
- <tr>
-
<td><h5>job.autoscaler.memory.tuning.heap.transfer-to-managed</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>If enabled, any reduction of heap memory will increase the
managed memory used by RocksDB.</td>
+ <td>Overhead to add to tuning decisions (0-1). This ensures spare
capacity and allows the memory to grow beyond the dynamically computed limits,
but never beyond the original memory limits.</td>
</tr>
<tr>
<td><h5>job.autoscaler.metrics.busy-time.aggregator</h5></td>
diff --git
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
index 3f2af617..1224e0d4 100644
---
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
+++
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
@@ -24,8 +24,8 @@ import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.time.Instant;
import java.util.HashMap;
@@ -192,24 +193,24 @@ public class JdbcAutoScalerStateStore<KEY, Context
extends JobAutoScalerContext<
}
@Override
- public void storeConfigOverrides(Context jobContext, Configuration
configOverrides) {
+ public void storeConfigChanges(Context jobContext, ConfigChanges
configChanges) {
jdbcStateStore.putSerializedState(
getSerializeKey(jobContext),
CONFIG_OVERRIDES,
- serializeConfigOverrides(configOverrides));
+ serializeConfigOverrides(configChanges));
}
@Nonnull
@Override
- public Configuration getConfigOverrides(Context jobContext) {
+ public ConfigChanges getConfigChanges(Context jobContext) {
return jdbcStateStore
.getSerializedState(getSerializeKey(jobContext),
CONFIG_OVERRIDES)
.map(JdbcAutoScalerStateStore::deserializeConfigOverrides)
- .orElse(new Configuration());
+ .orElse(new ConfigChanges());
}
@Override
- public void removeConfigOverrides(Context jobContext) {
+ public void removeConfigChanges(Context jobContext) {
jdbcStateStore.removeSerializedState(getSerializeKey(jobContext),
CONFIG_OVERRIDES);
}
@@ -276,11 +277,23 @@ public class JdbcAutoScalerStateStore<KEY, Context
extends JobAutoScalerContext<
return ConfigurationUtils.convertValue(overrides, Map.class);
}
- private static String serializeConfigOverrides(Configuration overrides) {
- return ConfigurationUtils.convertValue(overrides.toMap(),
String.class);
+ @Nullable
+ private static String serializeConfigOverrides(ConfigChanges
configChanges) {
+ try {
+ return YAML_MAPPER.writeValueAsString(configChanges);
+ } catch (Exception e) {
+ LOG.error("Failed to serialize ConfigOverrides", e);
+ return null;
+ }
}
- private static Configuration deserializeConfigOverrides(String overrides) {
- return
Configuration.fromMap(ConfigurationUtils.convertValue(overrides, Map.class));
+ @Nullable
+ private static ConfigChanges deserializeConfigOverrides(String
configOverrides) {
+ try {
+ return YAML_MAPPER.readValue(configOverrides, new
TypeReference<>() {});
+ } catch (Exception e) {
+ LOG.error("Failed to deserialize ConfigOverrides", e);
+ return null;
+ }
}
}
diff --git
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
index 8cba81d5..abf0f95b 100644
---
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
+++
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
@@ -24,6 +24,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
@@ -126,12 +127,12 @@ public class RescaleApiScalingRealizer<KEY, Context
extends JobAutoScalerContext
}
@Override
- public void realizeConfigOverrides(Context context, Configuration
configOverrides) {
+ public void realizeConfigOverrides(Context context, ConfigChanges
configChanges) {
// Not currently supported
LOG.warn(
"{} does not support updating the TaskManager configuration
({})",
getClass().getSimpleName(),
- configOverrides);
+ configChanges);
}
private Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 6f3d4937..5bb7b791 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -26,7 +26,7 @@ import
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.util.Preconditions;
@@ -167,9 +167,9 @@ public class JobAutoScalerImpl<KEY, Context extends
JobAutoScalerContext<KEY>>
return;
}
- Configuration configOverrides = stateStore.getConfigOverrides(ctx);
- LOG.info("Applying config overrides: {}", configOverrides);
- scalingRealizer.realizeConfigOverrides(ctx, configOverrides);
+ ConfigChanges configChanges = stateStore.getConfigChanges(ctx);
+ LOG.debug("Applying config overrides: {}", configChanges);
+ scalingRealizer.realizeConfigOverrides(ctx, configChanges);
}
private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics
autoscalerMetrics)
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
index a5b2cf02..09b9bb65 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
@@ -35,6 +35,8 @@ import
org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
import
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -44,8 +46,10 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.flink.autoscaler.metrics.FlinkMetric.HEAP_MAX;
-import static org.apache.flink.autoscaler.metrics.FlinkMetric.HEAP_USED;
+import static org.apache.flink.autoscaler.metrics.FlinkMetric.HEAP_MEMORY_MAX;
+import static org.apache.flink.autoscaler.metrics.FlinkMetric.HEAP_MEMORY_USED;
+import static
org.apache.flink.autoscaler.metrics.FlinkMetric.MANAGED_MEMORY_USED;
+import static
org.apache.flink.autoscaler.metrics.FlinkMetric.METASPACE_MEMORY_USED;
import static
org.apache.flink.autoscaler.metrics.FlinkMetric.TOTAL_GC_TIME_PER_SEC;
/** Metric collector using flink rest api. */
@@ -55,13 +59,15 @@ public class RestApiMetricsCollector<KEY, Context extends
JobAutoScalerContext<K
private static final Map<String, FlinkMetric> COMMON_TM_METRIC_NAMES =
Map.of(
- "Status.JVM.Memory.Heap.Max", HEAP_MAX,
- "Status.JVM.Memory.Heap.Used", HEAP_USED);
+ "Status.JVM.Memory.Heap.Max", HEAP_MEMORY_MAX,
+ "Status.JVM.Memory.Heap.Used", HEAP_MEMORY_USED,
+ "Status.Flink.Memory.Managed.Used", MANAGED_MEMORY_USED,
+ "Status.JVM.Memory.Metaspace.Used", METASPACE_MEMORY_USED);
private static final Map<String, FlinkMetric> TM_METRIC_NAMES_WITH_GC =
- Map.of(
- "Status.JVM.Memory.Heap.Max", HEAP_MAX,
- "Status.JVM.Memory.Heap.Used", HEAP_USED,
- "Status.JVM.GarbageCollector.All.TimeMsPerSecond",
TOTAL_GC_TIME_PER_SEC);
+ ImmutableMap.<String, FlinkMetric>builder()
+ .putAll(COMMON_TM_METRIC_NAMES)
+ .put("Status.JVM.GarbageCollector.All.TimeMsPerSecond",
TOTAL_GC_TIME_PER_SEC)
+ .build();
@Override
protected Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>>
queryAllAggregatedMetrics(
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index fdbb3c17..4ff9ae36 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -118,12 +118,15 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
return false;
}
- var tuningConfig =
+ var configOverrides =
MemoryTuning.tuneTaskManagerHeapMemory(
context, evaluatedMetrics, autoScalerEventHandler);
if (scalingWouldExceedClusterResources(
- tuningConfig, evaluatedMetrics, scalingSummaries, context)) {
+ configOverrides.applyOverrides(conf),
+ evaluatedMetrics,
+ scalingSummaries,
+ context)) {
return false;
}
@@ -138,7 +141,7 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
getVertexParallelismOverrides(
evaluatedMetrics.getVertexMetrics(),
scalingSummaries));
- autoScalerStateStore.storeConfigOverrides(context, tuningConfig);
+ autoScalerStateStore.storeConfigChanges(context, configOverrides);
return true;
}
@@ -267,13 +270,13 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
}
private boolean scalingWouldExceedClusterResources(
- Configuration tuningConfig,
+ Configuration tunedConfig,
EvaluatedMetrics evaluatedMetrics,
Map<JobVertexID, ScalingSummary> scalingSummaries,
JobAutoScalerContext<?> ctx) {
final double taskManagerCpu = ctx.getTaskManagerCpu().orElse(0.);
- final MemorySize taskManagerMemory =
MemoryTuning.getTotalMemory(tuningConfig, ctx);
+ final MemorySize taskManagerMemory =
MemoryTuning.getTotalMemory(tunedConfig, ctx);
if (taskManagerCpu <= 0 ||
taskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
// We can't extract the requirements, we can't make any assumptions
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index b2a79092..6512758b 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -49,10 +49,12 @@ import static
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZ
import static
org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.GC_PRESSURE;
import static
org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MAX_USAGE_RATIO;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_USED;
+import static
org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MEMORY_USED;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
+import static
org.apache.flink.autoscaler.metrics.ScalingMetric.MANAGED_MEMORY_USED;
import static
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static
org.apache.flink.autoscaler.metrics.ScalingMetric.METASPACE_MEMORY_USED;
import static
org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_TASK_SLOTS_USED;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.OBSERVED_TPR;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
@@ -365,18 +367,12 @@ public class ScalingMetricEvaluator {
var out = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
var gcPressure = latest.getOrDefault(GC_PRESSURE, Double.NaN);
- var lastHeapUsage = latest.getOrDefault(HEAP_MAX_USAGE_RATIO,
Double.NaN);
-
out.put(GC_PRESSURE, EvaluatedScalingMetric.of(gcPressure));
- out.put(
- HEAP_MAX_USAGE_RATIO,
- new EvaluatedScalingMetric(
- lastHeapUsage,
- getAverageGlobalMetric(HEAP_MAX_USAGE_RATIO,
metricHistory)));
- var latestObservation = latest.getOrDefault(HEAP_USED, Double.NaN);
- double heapSizeAverage = getAverageGlobalMetric(HEAP_USED,
metricHistory);
- out.put(HEAP_USED, new EvaluatedScalingMetric(latestObservation,
heapSizeAverage));
+ populateMetric(HEAP_MAX_USAGE_RATIO, metricHistory, out);
+ populateMetric(HEAP_MEMORY_USED, metricHistory, out);
+ populateMetric(MANAGED_MEMORY_USED, metricHistory, out);
+ populateMetric(METASPACE_MEMORY_USED, metricHistory, out);
out.put(
NUM_TASK_SLOTS_USED,
@@ -385,6 +381,18 @@ public class ScalingMetricEvaluator {
return out;
}
+ private static void populateMetric(
+ ScalingMetric scalingMetric,
+ SortedMap<Instant, CollectedMetrics> metricHistory,
+ Map<ScalingMetric, EvaluatedScalingMetric> out) {
+ var latestMetrics =
metricHistory.get(metricHistory.lastKey()).getGlobalMetrics();
+
+ var latestObservation = latestMetrics.getOrDefault(scalingMetric,
Double.NaN);
+ double value = getAverageGlobalMetric(scalingMetric, metricHistory);
+
+ out.put(scalingMetric, new EvaluatedScalingMetric(latestObservation,
value));
+ }
+
private static double getAverageGlobalMetric(
ScalingMetric metric, SortedMap<Instant, CollectedMetrics>
metricsHistory) {
return getAverage(metric, null, metricsHistory);
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 58bdfce0..52c0e2a0 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -18,10 +18,8 @@
package org.apache.flink.autoscaler.config;
import org.apache.flink.autoscaler.metrics.MetricAggregator;
-import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.MemorySize;
import java.time.Duration;
import java.util.List;
@@ -258,40 +256,23 @@ public class AutoScalerOptions {
.defaultValue(false)
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
.withDescription(
- "If enabled, the initial amount of memory
specified for TaskManagers will be reduced according to the observed needs.");
+ "If enabled, the initial amount of memory
specified for TaskManagers will be reduced/increased according to the observed
needs.");
- public static final ConfigOption<MemoryTuning.HeapUsageTarget>
MEMORY_TUNING_HEAP_TARGET =
- autoScalerConfig("memory.tuning.heap.target-usage")
- .enumType(MemoryTuning.HeapUsageTarget.class)
- .defaultValue(MemoryTuning.HeapUsageTarget.MAX)
-
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.target-usage"))
- .withDescription(
- "The heap size to target. Average usage (AVG) will
yield better savings. Max usage will yield more conservative savings.");
-
- public static final ConfigOption<Double> MEMORY_TUNING_HEAP_OVERHEAD =
- autoScalerConfig("memory.tuning.heap.overhead")
+ public static final ConfigOption<Double> MEMORY_TUNING_OVERHEAD =
+ autoScalerConfig("memory.tuning.overhead")
.doubleType()
.defaultValue(0.2)
-
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.overhead"))
+
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.overhead"))
.withDescription(
- "Overhead to add to heap tuning decisions (0-1).
This ensures spare capacity and allows the memory to grow beyond the
dynamically computed limits, but never beyond the original memory limits.");
+ "Overhead to add to tuning decisions (0-1). This
ensures spare capacity and allows the memory to grow beyond the dynamically
computed limits, but never beyond the original memory limits.");
- public static final ConfigOption<MemorySize> MEMORY_TUNING_MIN_HEAP =
- autoScalerConfig("memory.tuning.heap.min")
- .memoryType()
- .defaultValue(MemorySize.ofMebiBytes(512L))
-
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.min"))
- .withDescription(
- "The minimum amount of TaskManager heap memory, if
memory tuning is enabled.");
-
- public static final ConfigOption<Boolean>
MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED =
- autoScalerConfig("memory.tuning.heap.transfer-to-managed")
+ public static final ConfigOption<Boolean>
MEMORY_TUNING_MAXIMIZE_MANAGED_MEMORY =
+ autoScalerConfig("memory.tuning.maximize-managed-memory")
.booleanType()
.defaultValue(false)
- .withFallbackKeys(
-
oldOperatorConfigKey("memory.tuning.heap.transfer-to-managed"))
+
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.maximize-managed-memory"))
.withDescription(
- "If enabled, any reduction of heap memory will
increase the managed memory used by RocksDB.");
+ "If enabled and managed memory is used (e.g.
RocksDB turned on), any reduction of heap, network, or metaspace memory will
increase the managed memory.");
public static final ConfigOption<Integer> VERTEX_SCALING_HISTORY_COUNT =
autoScalerConfig("history.max.count")
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
index 5cf84c91..32abd946 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
@@ -41,9 +41,13 @@ public enum FlinkMetric {
PENDING_RECORDS(s -> s.endsWith(".pendingRecords")),
BACKPRESSURE_TIME_PER_SEC(s -> s.equals("backPressuredTimeMsPerSecond")),
- HEAP_MAX(s -> s.equals("Status.JVM.Memory.Heap.Max")),
- HEAP_USED(s -> s.equals("Status.JVM.Memory.Heap.Used")),
+ HEAP_MEMORY_MAX(s -> s.equals("Status.JVM.Memory.Heap.Max")),
+ HEAP_MEMORY_USED(s -> s.equals("Status.JVM.Memory.Heap.Used")),
+ MANAGED_MEMORY_USED(s -> s.equals("Status.Flink.Memory.Managed.Used")),
+ METASPACE_MEMORY_USED(s -> s.equals("Status.JVM.Memory.Metaspace.Used")),
+
TOTAL_GC_TIME_PER_SEC(s ->
s.equals("Status.JVM.GarbageCollector.All.TimeMsPerSecond")),
+
NUM_TASK_SLOTS_TOTAL(s -> s.equals("taskSlotsTotal")),
NUM_TASK_SLOTS_AVAILABLE(s -> s.equals("taskSlotsAvailable"));
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index 4f9e3083..5466f691 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -74,8 +74,14 @@ public enum ScalingMetric {
*/
GC_PRESSURE(false),
- /** Measured used heap size in bytes. */
- HEAP_USED(true),
+ /** Measured max used heap size in bytes. */
+ HEAP_MEMORY_USED(true),
+
+ /** Measured max managed memory size in bytes. */
+ MANAGED_MEMORY_USED(true),
+
+ /** Measured max metaspace memory size in bytes. */
+ METASPACE_MEMORY_USED(true),
/** Percentage of max heap used (between 0 and 1). */
HEAP_MAX_USAGE_RATIO(true),
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index 0fbcc64b..84996235 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -20,7 +20,6 @@ package org.apache.flink.autoscaler.metrics;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.topology.IOMetrics;
import org.apache.flink.autoscaler.topology.JobTopology;
-import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -170,24 +169,24 @@ public class ScalingMetrics {
out.put(ScalingMetric.GC_PRESSURE, gcTime.getMax() / 1000);
}
- var heapMax = collectedTmMetrics.get(FlinkMetric.HEAP_MAX);
- var heapUsed = collectedTmMetrics.get(FlinkMetric.HEAP_USED);
+ var heapMax = collectedTmMetrics.get(FlinkMetric.HEAP_MEMORY_MAX);
+ var heapUsed = collectedTmMetrics.get(FlinkMetric.HEAP_MEMORY_USED);
+
if (heapMax != null && heapUsed != null) {
- MemoryTuning.HeapUsageTarget heapTarget =
- conf.get(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET);
- switch (heapTarget) {
- case AVG:
- out.put(ScalingMetric.HEAP_USED, heapUsed.getAvg());
- break;
- case MAX:
- out.put(ScalingMetric.HEAP_USED, heapUsed.getMax());
- break;
- default:
- LOG.warn("Unknown value {} for heap target", heapTarget);
- }
+ out.put(ScalingMetric.HEAP_MEMORY_USED, heapUsed.getMax());
out.put(ScalingMetric.HEAP_MAX_USAGE_RATIO, heapUsed.getMax() /
heapMax.getMax());
}
+ var managedMemory =
collectedTmMetrics.get(FlinkMetric.MANAGED_MEMORY_USED);
+ if (managedMemory != null) {
+ out.put(ScalingMetric.MANAGED_MEMORY_USED, managedMemory.getMax());
+ }
+
+ var metaspaceMemory =
collectedTmMetrics.get(FlinkMetric.METASPACE_MEMORY_USED);
+ if (metaspaceMemory != null) {
+ out.put(ScalingMetric.METASPACE_MEMORY_USED,
metaspaceMemory.getMax());
+ }
+
return out;
}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
index aba94157..12b7ca36 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
@@ -19,7 +19,7 @@ package org.apache.flink.autoscaler.realizer;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.autoscaler.JobAutoScalerContext;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import java.util.Map;
@@ -37,5 +37,5 @@ public interface ScalingRealizer<KEY, Context extends
JobAutoScalerContext<KEY>>
void realizeParallelismOverrides(Context context, Map<String, String>
parallelismOverrides);
/** Updates the TaskManager memory configuration. */
- void realizeConfigOverrides(Context context, Configuration
configOverrides);
+ void realizeConfigOverrides(Context context, ConfigChanges configChanges);
}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index 30be323f..7fb369cf 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -22,7 +22,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import javax.annotation.Nonnull;
@@ -70,12 +70,12 @@ public interface AutoScalerStateStore<KEY, Context extends
JobAutoScalerContext<
void removeParallelismOverrides(Context jobContext) throws Exception;
- void storeConfigOverrides(Context jobContext, Configuration
configOverrides) throws Exception;
+ void storeConfigChanges(Context jobContext, ConfigChanges configChanges)
throws Exception;
@Nonnull
- Configuration getConfigOverrides(Context jobContext) throws Exception;
+ ConfigChanges getConfigChanges(Context jobContext) throws Exception;
- void removeConfigOverrides(Context jobContext) throws Exception;
+ void removeConfigChanges(Context jobContext) throws Exception;
/** Removes all data from this context. Flush stil needs to be called. */
void clearAll(Context jobContext) throws Exception;
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index 60a9342e..35084db3 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -21,7 +21,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import javax.annotation.Nonnull;
@@ -50,7 +50,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
private final Map<KEY, Map<String, String>> parallelismOverridesStore;
- private final Map<KEY, Configuration> tmConfigOverrides;
+ private final Map<KEY, ConfigChanges> tmConfigOverrides;
private final Map<KEY, ScalingTracking> scalingTrackingStore;
@@ -59,7 +59,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
collectedMetricsStore = new ConcurrentHashMap<>();
parallelismOverridesStore = new ConcurrentHashMap<>();
scalingTrackingStore = new ConcurrentHashMap<>();
- tmConfigOverrides = new ConcurrentHashMap<>();
+ tmConfigOverrides = new ConcurrentHashMap<KEY, ConfigChanges>();
}
@Override
@@ -122,20 +122,19 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
}
@Override
- public void storeConfigOverrides(Context jobContext, Configuration
configOverrides)
- throws Exception {
- tmConfigOverrides.put(jobContext.getJobKey(), configOverrides);
+ public void storeConfigChanges(Context jobContext, ConfigChanges
configChanges) {
+ tmConfigOverrides.put(jobContext.getJobKey(), configChanges);
}
@Nonnull
@Override
- public Configuration getConfigOverrides(Context jobContext) {
+ public ConfigChanges getConfigChanges(Context jobContext) {
return
Optional.ofNullable(tmConfigOverrides.get(jobContext.getJobKey()))
- .orElse(new Configuration());
+ .orElse(new ConfigChanges());
}
@Override
- public void removeConfigOverrides(Context jobContext) {
+ public void removeConfigChanges(Context jobContext) {
tmConfigOverrides.remove(jobContext.getJobKey());
}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/ConfigChanges.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/ConfigChanges.java
new file mode 100644
index 00000000..f76a22fe
--- /dev/null
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/ConfigChanges.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.tuning;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.FallbackKey;
+
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Holds the configuration overrides and removals for a Flink {@link Configuration}.
+ *
+ * <p>Entries are keyed by configuration key. For a given key the most recent {@code addOverride}
+ * or {@code addRemoval} call wins, so a key is never both overridden and removed. When applied,
+ * removals are processed first and overrides second.
+ *
+ * <p>This class is not thread-safe; it is backed by a plain {@link HashMap} and {@link HashSet}.
+ */
+public class ConfigChanges {
+
+    /** Overrides which will be applied on top of the existing config. */
+    @Getter private final Map<String, String> overrides = new HashMap<>();
+
+    /** Keys which will be removed from the existing config before overrides are applied. */
+    @Getter private final Set<String> removals = new HashSet<>();
+
+    /**
+     * Adds an override for a typed config option, storing the value in its string form.
+     *
+     * @param configOption the option whose key is overridden
+     * @param value the new value, converted via {@link ConfigurationUtils#convertValue}
+     * @return this instance, for chaining
+     */
+    public <T> ConfigChanges addOverride(ConfigOption<T> configOption, T value) {
+        return addOverride(
+                configOption.key(), ConfigurationUtils.convertValue(value, String.class));
+    }
+
+    /**
+     * Adds a raw string override for the given key.
+     *
+     * @return this instance, for chaining
+     */
+    public ConfigChanges addOverride(String key, String value) {
+        overrides.put(key, value);
+        // Latest call wins: an explicit override supersedes an earlier removal of the same key.
+        removals.remove(key);
+        return this;
+    }
+
+    /**
+     * Marks a config option, including all of its fallback keys, for removal. Any pending
+     * overrides for those keys are dropped.
+     *
+     * @return this instance, for chaining
+     */
+    public ConfigChanges addRemoval(ConfigOption<?> configOption) {
+        removeKey(configOption.key());
+        for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
+            removeKey(fallbackKey.getKey());
+        }
+        return this;
+    }
+
+    /** Records a removal for a single key and drops any pending override for it. */
+    private void removeKey(String key) {
+        removals.add(key);
+        overrides.remove(key);
+    }
+
+    /**
+     * Returns a copy of {@code existingConfig} with all removals and then all overrides applied.
+     * The passed configuration itself is not modified.
+     */
+    public Configuration applyOverrides(Configuration existingConfig) {
+        Configuration config = new Configuration(existingConfig);
+        for (String key : removals) {
+            config.removeKey(key);
+        }
+        config.addAll(Configuration.fromMap(overrides));
+        return config;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ConfigChanges that = (ConfigChanges) o;
+        return overrides.equals(that.overrides) && removals.equals(that.removals);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * overrides.hashCode() + removals.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "ConfigChanges{overrides=" + overrides + ", removals=" + removals + '}';
+    }
+}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryBudget.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryBudget.java
new file mode 100644
index 00000000..4e18c74c
--- /dev/null
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryBudget.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.tuning;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Accounting for distributing a fixed amount of memory (in bytes) over the available pools.
+ *
+ * <p>Each call to {@link #budget(long)} grants at most the remaining amount and deducts what it
+ * granted, so the sum of all grants never exceeds the initial amount.
+ */
+public class MemoryBudget {
+
+    private long remaining;
+
+    /**
+     * Creates a budget.
+     *
+     * @param remaining total amount available for distribution; must not be negative
+     */
+    public MemoryBudget(long remaining) {
+        Preconditions.checkArgument(
+                remaining >= 0, "Memory budget must not be negative, was: %s", remaining);
+        this.remaining = remaining;
+    }
+
+    /**
+     * Grants up to {@code amount} from the remaining budget and deducts the granted amount.
+     *
+     * @param amount requested amount; must not be negative
+     * @return the granted amount, i.e. {@code min(amount, remaining)}
+     */
+    public long budget(long amount) {
+        Preconditions.checkArgument(
+                amount >= 0, "Requested memory amount must not be negative, was: %s", amount);
+        long granted = Math.min(amount, remaining);
+        remaining -= granted;
+        return granted;
+    }
+
+    /** Returns the amount that has not been granted yet. */
+    public long getRemaining() {
+        return remaining;
+    }
+}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
index b4cd00e5..8ca0dcc3 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
@@ -21,11 +21,11 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.util.config.memory.CommonProcessMemorySpec;
@@ -41,6 +41,10 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Map;
+import static
org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MEMORY_USED;
+import static
org.apache.flink.autoscaler.metrics.ScalingMetric.MANAGED_MEMORY_USED;
+import static
org.apache.flink.autoscaler.metrics.ScalingMetric.METASPACE_MEMORY_USED;
+
/** Tunes the TaskManager memory. */
public class MemoryTuning {
@@ -48,13 +52,7 @@ public class MemoryTuning {
public static final ProcessMemoryUtils<TaskExecutorFlinkMemory>
FLINK_MEMORY_UTILS =
new ProcessMemoryUtils<>(getMemoryOptions(), new
TaskExecutorFlinkMemoryUtils());
- private static final Configuration EMPTY_CONFIG = new Configuration();
-
- /** What memory usage to target. */
- public enum HeapUsageTarget {
- AVG,
- MAX
- }
+ private static final ConfigChanges EMPTY_CONFIG = new ConfigChanges();
/**
* Emits a Configuration which contains overrides for the current
configuration. We are not
@@ -62,7 +60,7 @@ public class MemoryTuning {
* overrides. This config is persisted separately and applied by the
autoscaler. That way we can
* clear any applied overrides if auto-tuning is disabled.
*/
- public static Configuration tuneTaskManagerHeapMemory(
+ public static ConfigChanges tuneTaskManagerHeapMemory(
JobAutoScalerContext<?> context,
EvaluatedMetrics evaluatedMetrics,
AutoScalerEventHandler eventHandler) {
@@ -80,57 +78,97 @@ public class MemoryTuning {
return EMPTY_CONFIG;
}
- var maxHeapSize = memSpecs.getFlinkMemory().getJvmHeapMemorySize();
- LOG.debug("Current configured heap size: {}", maxHeapSize);
+ MemorySize specHeapSize =
memSpecs.getFlinkMemory().getJvmHeapMemorySize();
+ MemorySize specManagedSize = memSpecs.getFlinkMemory().getManaged();
+ MemorySize specNetworkSize = memSpecs.getFlinkMemory().getNetwork();
+ MemorySize specMetaspaceSize = memSpecs.getJvmMetaspaceSize();
+ LOG.info(
+ "Spec memory - heap: {}, managed: {}, network: {}, meta: {}",
+ specHeapSize.toHumanReadableString(),
+ specManagedSize.toHumanReadableString(),
+ specNetworkSize.toHumanReadableString(),
+ specMetaspaceSize.toHumanReadableString());
+
+ MemorySize maxMemoryBySpec =
context.getTaskManagerMemory().orElse(MemorySize.ZERO);
+ if (maxMemoryBySpec.compareTo(MemorySize.ZERO) <= 0) {
+ LOG.warn("Spec TaskManager memory size could not be determined.");
+ return EMPTY_CONFIG;
+ }
+ MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
+ // Reserve the currently configured sizes of these fixed memory pools in the budget
+
memBudget.budget(memSpecs.getFlinkMemory().getFrameworkOffHeap().getBytes());
+
memBudget.budget(memSpecs.getFlinkMemory().getTaskOffHeap().getBytes());
+ memBudget.budget(memSpecs.getJvmOverheadSize().getBytes());
- MemorySize newHeapSize = determineNewHeapSize(evaluatedMetrics,
config, maxHeapSize);
- LOG.info("New TM heap memory {}", newHeapSize.toHumanReadableString());
+ var globalMetrics = evaluatedMetrics.getGlobalMetrics();
+ // The order matters in case the memory usage is higher than the
maximum available memory.
+ // Managed memory comes last because it can grow arbitrarily for RocksDB jobs.
+ MemorySize newNetworkSize = adjustNetworkMemory(specNetworkSize,
memBudget);
+ MemorySize newHeapSize =
+ determineNewSize(getUsage(HEAP_MEMORY_USED, globalMetrics),
config, memBudget);
+ MemorySize newMetaspaceSize =
+ determineNewSize(getUsage(METASPACE_MEMORY_USED,
globalMetrics), config, memBudget);
+ MemorySize newManagedSize =
+ adjustManagedMemory(
+ getUsage(MANAGED_MEMORY_USED, globalMetrics),
+ specManagedSize,
+ config,
+ memBudget);
+ LOG.info(
+ "Optimized memory sizes: heap: {} managed: {}, network: {},
meta: {}",
+ newHeapSize.toHumanReadableString(),
+ newManagedSize.toHumanReadableString(),
+ newNetworkSize.toHumanReadableString(),
+ newMetaspaceSize.toHumanReadableString());
// Diff can be negative (memory shrinks) or positive (memory grows)
- final long heapDiffBytes = newHeapSize.getBytes() -
maxHeapSize.getBytes();
-
- final MemorySize totalMemory = adjustTotalTmMemory(context,
heapDiffBytes);
- if (totalMemory.equals(MemorySize.ZERO)) {
+ final long heapDiffBytes = newHeapSize.getBytes() -
specHeapSize.getBytes();
+ final long managedDiffBytes = newManagedSize.getBytes() -
specManagedSize.getBytes();
+ final long networkDiffBytes = newNetworkSize.getBytes() -
specNetworkSize.getBytes();
+ final long flinkMemoryDiffBytes = heapDiffBytes + managedDiffBytes +
networkDiffBytes;
+
+ // Update total memory according to memory diffs
+ final MemorySize totalMemory =
+ new MemorySize(maxMemoryBySpec.getBytes() -
memBudget.getRemaining());
+ if (totalMemory.compareTo(MemorySize.ZERO) <= 0) {
+ LOG.warn("Invalid total memory configuration: {}", totalMemory);
return EMPTY_CONFIG;
}
// Prepare the tuning config for new configuration values
- var tuningConfig = new Configuration();
- // Update total memory according to new heap size
- // Adjust the total container memory and the JVM heap size accordingly.
- tuningConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemory);
- // Framework and Task heap memory configs add up together yield the
max heap memory.
- // To simplify the calculation, set the framework heap memory to zero.
- tuningConfig.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY,
MemorySize.ZERO);
- tuningConfig.set(TaskManagerOptions.TASK_HEAP_MEMORY, newHeapSize);
-
- // All memory options which can be configured via fractions need to be
set to their
- // absolute values or, if there is no absolute setting, the fractions
need to be
- // re-calculated.
- MemorySize managedMemory = memSpecs.getFlinkMemory().getManaged();
- if (shouldTransferHeapToManagedMemory(config, heapDiffBytes)) {
- // If RocksDB is configured, give back the heap memory as managed
memory to RocksDB
- MemorySize newManagedMemory =
- new MemorySize(managedMemory.getBytes() +
Math.abs(heapDiffBytes));
- LOG.info(
- "Increasing managed memory size from {} to {}",
- managedMemory,
- newManagedMemory);
- tuningConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE,
newManagedMemory);
- } else {
- tuningConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE,
managedMemory);
- }
-
- tuningConfig.set(
+ var tuningConfig = new ConfigChanges();
+ // Adjust the total container memory
+ tuningConfig.addOverride(TaskManagerOptions.TOTAL_PROCESS_MEMORY,
totalMemory);
+ // We do not set the framework/task heap memory because those are
automatically derived from
+ // setting the other mandatory memory options for managed memory,
network, metaspace and jvm
+ // overhead. However, we do precise accounting for heap memory above.
In contrast to other
+ // memory pools, there are no fractional variants for heap memory.
Setting the absolute heap
+ // memory options could cause invalid configuration states when users
adapt the total amount
+ // of memory. We also need to take care to remove any user-provided
overrides for those.
+ tuningConfig.addRemoval(TaskManagerOptions.TASK_HEAP_MEMORY);
+ // Set default to zero because we already account for heap via task
heap.
+ tuningConfig.addOverride(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY,
MemorySize.ZERO);
+
+ MemorySize flinkMemorySize =
+ new MemorySize(
+ memSpecs.getTotalFlinkMemorySize().getBytes() +
flinkMemoryDiffBytes);
+
+ // All memory options which can be configured via fractions need to be
re-calculated.
+ tuningConfig.addOverride(
+ TaskManagerOptions.MANAGED_MEMORY_FRACTION,
+ getFraction(newManagedSize, flinkMemorySize));
+ tuningConfig.addRemoval(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+
+ tuningConfig.addOverride(
TaskManagerOptions.NETWORK_MEMORY_FRACTION,
- getFraction(
- memSpecs.getFlinkMemory().getNetwork(),
- new MemorySize(
- memSpecs.getTotalFlinkMemorySize().getBytes()
+ heapDiffBytes)));
- tuningConfig.set(
+ getFraction(newNetworkSize, flinkMemorySize));
+
+ tuningConfig.addOverride(
TaskManagerOptions.JVM_OVERHEAD_FRACTION,
getFraction(memSpecs.getJvmOverheadSize(), totalMemory));
+ tuningConfig.addOverride(TaskManagerOptions.JVM_METASPACE,
newMetaspaceSize);
+
eventHandler.handleEvent(
context,
AutoScalerEventHandler.Type.Normal,
@@ -151,39 +189,47 @@ public class MemoryTuning {
return tuningConfig;
}
- private static MemorySize determineNewHeapSize(
- EvaluatedMetrics evaluatedMetrics, Configuration config,
MemorySize maxHeapSize) {
+ private static MemorySize determineNewSize(
+ MemorySize usage, Configuration config, MemoryBudget memoryBudget)
{
- double overheadFactor = 1 +
config.get(AutoScalerOptions.MEMORY_TUNING_HEAP_OVERHEAD);
- long heapTargetSizeBytes =
- (long) (getHeapUsed(evaluatedMetrics).getBytes() *
overheadFactor);
+ double overheadFactor = 1 +
config.get(AutoScalerOptions.MEMORY_TUNING_OVERHEAD);
+ long targetSizeBytes = (long) (usage.getBytes() * overheadFactor);
- // Apply min/max heap size limits
- heapTargetSizeBytes =
- Math.min(
- Math.max(
- // Lower limit is the minimum configured heap
size
-
config.get(AutoScalerOptions.MEMORY_TUNING_MIN_HEAP).getBytes(),
- heapTargetSizeBytes),
- // Upper limit is the original max heap size in the
spec
- maxHeapSize.getBytes());
+ // Upper limit is the available memory budget
+ targetSizeBytes = memoryBudget.budget(targetSizeBytes);
- return new MemorySize(heapTargetSizeBytes);
+ return new MemorySize(targetSizeBytes);
}
- private static MemorySize getHeapUsed(EvaluatedMetrics evaluatedMetrics) {
- var globalMetrics = evaluatedMetrics.getGlobalMetrics();
- MemorySize heapUsed =
- new MemorySize((long)
globalMetrics.get(ScalingMetric.HEAP_USED).getAverage());
- LOG.info("TM heap used size: {}", heapUsed);
- return heapUsed;
+    /**
+     * Determines the new managed memory size and charges it against the memory budget.
+     *
+     * <p>Managed memory usage can't accurately be measured yet. It is either zero (no usage) or an
+     * opaque amount of memory (RocksDB). Therefore:
+     *
+     * <ul>
+     *   <li>no usage: managed memory is set to zero,
+     *   <li>maximize enabled: all of the remaining budget goes to managed memory,
+     *   <li>otherwise: the configured size is kept, capped by the remaining budget.
+     * </ul>
+     *
+     * <p>Every returned amount must be deducted from {@code memBudget}, because the total process
+     * memory is derived from the consumed budget.
+     */
+    private static MemorySize adjustManagedMemory(
+            MemorySize managedMemoryUsage,
+            MemorySize managedMemoryConfigured,
+            Configuration config,
+            MemoryBudget memBudget) {
+        if (managedMemoryUsage.compareTo(MemorySize.ZERO) <= 0) {
+            return MemorySize.ZERO;
+        } else if (config.get(AutoScalerOptions.MEMORY_TUNING_MAXIMIZE_MANAGED_MEMORY)) {
+            long maxManagedMemorySize = memBudget.budget(Long.MAX_VALUE);
+            return new MemorySize(maxManagedMemorySize);
+        } else {
+            // Keeping the configured size must still consume budget; otherwise the total process
+            // memory (spec memory minus remaining budget) would not account for managed memory.
+            long managedMemoryBytes = memBudget.budget(managedMemoryConfigured.getBytes());
+            return new MemorySize(managedMemoryBytes);
+        }
+    }
+
+    /**
+     * Keeps the network memory at its spec size for now, reserving it in the memory budget. The
+     * result is capped by whatever budget remains.
+     */
+    private static MemorySize adjustNetworkMemory(
+            MemorySize specNetworkSize, MemoryBudget memBudget) {
+        // TODO mxm: Follow-up to tune network memory via
+        // https://issues.apache.org/jira/browse/FLINK-34471
+        return new MemorySize(memBudget.budget(specNetworkSize.getBytes()));
     }
- private static boolean shouldTransferHeapToManagedMemory(
- Configuration config, long heapDiffBytes) {
- return
config.get(AutoScalerOptions.MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED)
- && heapDiffBytes < 0
- &&
"rocksdb".equalsIgnoreCase(config.get(StateBackendOptions.STATE_BACKEND));
+    /**
+     * Returns the average value of the given global scaling metric as a memory size. The
+     * fractional part is truncated; a NaN average yields zero bytes.
+     */
+    private static MemorySize getUsage(
+            ScalingMetric scalingMetric, Map<ScalingMetric, EvaluatedScalingMetric> globalMetrics) {
+        double averageBytes = globalMetrics.get(scalingMetric).getAverage();
+        MemorySize usage = new MemorySize((long) averageBytes);
+        LOG.debug("{}: {}", scalingMetric, usage);
+        return usage;
     }
public static MemorySize getTotalMemory(Configuration config,
JobAutoScalerContext<?> ctx) {
@@ -194,28 +240,6 @@ public class MemoryTuning {
return ctx.getTaskManagerMemory().orElse(MemorySize.ZERO);
}
- private static MemorySize adjustTotalTmMemory(JobAutoScalerContext<?> ctx,
long heapDiffBytes) {
-
- var specTaskManagerMemory =
ctx.getTaskManagerMemory().orElse(MemorySize.ZERO);
- if (specTaskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
- LOG.warn("Spec TaskManager memory size could not be determined.");
- return MemorySize.ZERO;
- }
-
- if (shouldTransferHeapToManagedMemory(ctx.getConfiguration(),
heapDiffBytes)) {
- // Total size does not change
- return specTaskManagerMemory;
- }
-
- long newTotalMemBytes = specTaskManagerMemory.getBytes() +
heapDiffBytes;
- // TM container memory can never grow beyond the user-specified max
memory
- newTotalMemBytes = Math.min(newTotalMemBytes,
specTaskManagerMemory.getBytes());
-
- MemorySize totalMemory = new MemorySize(newTotalMemBytes);
- LOG.info("Setting new total TaskManager memory to {}", totalMemory);
- return totalMemory;
- }
-
private static ProcessMemoryOptions getMemoryOptions() {
return new ProcessMemoryOptions(
Arrays.asList(
@@ -238,14 +262,27 @@ public class MemoryTuning {
}
/** Format config such that it can be directly used as a Flink
configuration. */
- private static String formatConfig(Configuration config) {
+ private static String formatConfig(ConfigChanges config) {
var sb = new StringBuilder();
- for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+ for (Map.Entry<String, String> entry :
config.getOverrides().entrySet()) {
sb.append(entry.getKey())
.append(": ")
.append(entry.getValue())
.append(System.lineSeparator());
}
+ if (!config.getRemovals().isEmpty()) {
+ sb.append("Remove the following config entries if present: [");
+ boolean first = true;
+ for (String toRemove : config.getRemovals()) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(toRemove);
+ }
+ sb.append("]");
+ }
return sb.toString();
}
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index 80bbe75a..e3ed6625 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -30,9 +30,8 @@ import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.Gauge;
@@ -306,23 +305,27 @@ public class JobAutoScalerImplTest {
null, null, null, eventCollector, scalingRealizer,
stateStore);
// Initially we should return empty overrides, do not crate any state
- assertThat(stateStore.getConfigOverrides(context).toMap()).isEmpty();
+
assertThat(stateStore.getConfigChanges(context).getOverrides()).isEmpty();
- var config = new Configuration();
- config.set(TaskManagerOptions.TASK_HEAP_MEMORY, new MemorySize(42));
- stateStore.storeConfigOverrides(context, config);
+ ConfigChanges config = new ConfigChanges();
+ config.addOverride(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.42f);
+ config.addRemoval(TaskManagerOptions.TASK_HEAP_MEMORY);
+ stateStore.storeConfigChanges(context, config);
stateStore.flush(context);
autoscaler.applyConfigOverrides(context);
- assertThat(getEvent().getConfigOverrides().toMap())
-
.containsExactly(entry(TaskManagerOptions.TASK_HEAP_MEMORY.key(), "42 bytes"));
- assertThat(stateStore.getConfigOverrides(context)).isEqualTo(config);
+ var event = getEvent();
+ assertThat(event.getConfigChanges().getOverrides())
+
.containsExactly(entry(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
"0.42"));
+ assertThat(event.getConfigChanges().getRemovals())
+ .containsExactly(TaskManagerOptions.TASK_HEAP_MEMORY.key());
+ assertThat(stateStore.getConfigChanges(context)).isEqualTo(config);
// Disabling autoscaler should clear overrides
context.getConfiguration().setString(AUTOSCALER_ENABLED.key(),
"false");
autoscaler.scale(context);
autoscaler.applyConfigOverrides(context);
- assertThat(getEvent().getConfigOverrides().toMap()).isEmpty();
+ assertThat(getEvent().getConfigChanges().getOverrides()).isEmpty();
}
@Test
@@ -368,17 +371,6 @@ public class JobAutoScalerImplTest {
assertEquals(expectedOverrides,
scalingEvent.getParallelismOverrides());
}
- @Nullable
- private TestingScalingRealizer.Event<JobID, JobAutoScalerContext<JobID>>
getEvent(
- Map<String, String> expectedOverrides) {
- var scalingEvent = getEvent();
- if (expectedOverrides == null) {
- assertThat(scalingEvent).isNull();
- return null;
- }
- return scalingEvent;
- }
-
@Nullable
private TestingScalingRealizer.Event<JobID, JobAutoScalerContext<JobID>>
getEvent() {
return scalingRealizer.events.poll();
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
index 863950af..0cf5b15a 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
@@ -60,6 +60,8 @@ public class RestApiMetricsCollectorTest {
private static final String GC_METRIC_NAME =
"Status.JVM.GarbageCollector.All.TimeMsPerSecond";
private static final String HEAP_MAX_NAME = "Status.JVM.Memory.Heap.Max";
private static final String HEAP_USED_NAME = "Status.JVM.Memory.Heap.Used";
+ private static final String MANAGED_MEMORY_NAME =
"Status.Flink.Memory.Managed.Used";
+ private static final String METASPACE_MEMORY_NAME =
"Status.JVM.Memory.Metaspace.Used";
@Test
public void testAggregateMultiplePendingRecordsMetricsPerSource() throws
Exception {
@@ -233,11 +235,23 @@ public class RestApiMetricsCollectorTest {
// Test only heap metrics available
var heapMax = new AggregatedMetric(HEAP_MAX_NAME, null, 100., null,
null);
var heapUsed = new AggregatedMetric(HEAP_USED_NAME, null, 50., null,
null);
+ var managedUsed = new AggregatedMetric(MANAGED_MEMORY_NAME, null, 42.,
null, null);
+ var metaspaceUsed = new AggregatedMetric(METASPACE_MEMORY_NAME, null,
11., null, null);
metricValues.put(HEAP_MAX_NAME, heapMax);
metricValues.put(HEAP_USED_NAME, heapUsed);
+ metricValues.put(MANAGED_MEMORY_NAME, managedUsed);
+ metricValues.put(METASPACE_MEMORY_NAME, metaspaceUsed);
assertMetricsEquals(
- Map.of(FlinkMetric.HEAP_MAX, heapMax, FlinkMetric.HEAP_USED,
heapUsed),
+ Map.of(
+ FlinkMetric.HEAP_MEMORY_MAX,
+ heapMax,
+ FlinkMetric.HEAP_MEMORY_USED,
+ heapUsed,
+ FlinkMetric.MANAGED_MEMORY_USED,
+ managedUsed,
+ FlinkMetric.METASPACE_MEMORY_USED,
+ metaspaceUsed),
collector.queryTmMetrics(context));
collector.cleanup(context.getJobKey());
@@ -247,10 +261,14 @@ public class RestApiMetricsCollectorTest {
assertMetricsEquals(
Map.of(
- FlinkMetric.HEAP_MAX,
+ FlinkMetric.HEAP_MEMORY_MAX,
heapMax,
- FlinkMetric.HEAP_USED,
+ FlinkMetric.HEAP_MEMORY_USED,
heapUsed,
+ FlinkMetric.MANAGED_MEMORY_USED,
+ managedUsed,
+ FlinkMetric.METASPACE_MEMORY_USED,
+ metaspaceUsed,
FlinkMetric.TOTAL_GC_TIME_PER_SEC,
gcTime),
collector.queryTmMetrics(context));
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index da116223..ef5d99f2 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -273,8 +273,12 @@ public class ScalingExecutorTest {
Map.of(
ScalingMetric.NUM_TASK_SLOTS_USED,
EvaluatedScalingMetric.of(9),
- ScalingMetric.HEAP_USED,
+ ScalingMetric.HEAP_MEMORY_USED,
EvaluatedScalingMetric.avg(MemorySize.parse("5
Gb").getBytes()),
+ ScalingMetric.MANAGED_MEMORY_USED,
+ EvaluatedScalingMetric.avg(MemorySize.parse("2
Gb").getBytes()),
+ ScalingMetric.METASPACE_MEMORY_USED,
+ EvaluatedScalingMetric.avg(MemorySize.parse("300
mb").getBytes()),
ScalingMetric.HEAP_MAX_USAGE_RATIO,
EvaluatedScalingMetric.of(Double.NaN),
ScalingMetric.GC_PRESSURE,
@@ -286,12 +290,21 @@ public class ScalingExecutorTest {
assertTrue(
scalingDecisionExecutor.scaleResource(
context, metrics, new HashMap<>(), new
ScalingTracking(), now));
- assertEquals(
- "6.000gb (6442450944 bytes)",
- stateStore
- .getConfigOverrides(context)
- .get(TaskManagerOptions.TASK_HEAP_MEMORY)
- .toHumanReadableString());
+ assertThat(stateStore.getConfigChanges(context).getOverrides())
+ .containsExactlyInAnyOrderEntriesOf(
+ Map.of(
+
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+ "0.561",
+
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
+ "0.14",
+ TaskManagerOptions.JVM_METASPACE.key(),
+ "360 mb",
+ TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
+ "0.097",
+ TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+ "0 bytes",
+ TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
+ "11114905646 bytes"));
}
@ParameterizedTest
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index c6b594a4..bfba0c0b 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -503,7 +503,11 @@ public class ScalingMetricEvaluatorTest {
EvaluatedScalingMetric.of(Double.NaN),
ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(Double.NaN),
- ScalingMetric.HEAP_USED,
+ ScalingMetric.HEAP_MEMORY_USED,
+ EvaluatedScalingMetric.of(Double.NaN),
+ ScalingMetric.MANAGED_MEMORY_USED,
+ EvaluatedScalingMetric.of(Double.NaN),
+ ScalingMetric.METASPACE_MEMORY_USED,
EvaluatedScalingMetric.of(Double.NaN),
ScalingMetric.NUM_TASK_SLOTS_USED,
EvaluatedScalingMetric.of(Double.NaN)),
@@ -518,16 +522,24 @@ public class ScalingMetricEvaluatorTest {
0.5,
ScalingMetric.GC_PRESSURE,
0.6,
- ScalingMetric.HEAP_USED,
- 512.)));
+ ScalingMetric.HEAP_MEMORY_USED,
+ 512.,
+ ScalingMetric.MANAGED_MEMORY_USED,
+ 420.,
+ ScalingMetric.METASPACE_MEMORY_USED,
+ 110.)));
assertEquals(
Map.of(
ScalingMetric.HEAP_MAX_USAGE_RATIO,
new EvaluatedScalingMetric(0.5, 0.5),
ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(0.6),
- ScalingMetric.HEAP_USED,
+ ScalingMetric.HEAP_MEMORY_USED,
new EvaluatedScalingMetric(512, 512),
+ ScalingMetric.MANAGED_MEMORY_USED,
+ new EvaluatedScalingMetric(420, 420),
+ ScalingMetric.METASPACE_MEMORY_USED,
+ new EvaluatedScalingMetric(110, 110),
ScalingMetric.NUM_TASK_SLOTS_USED,
EvaluatedScalingMetric.of(Double.NaN)),
ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
@@ -541,8 +553,12 @@ public class ScalingMetricEvaluatorTest {
0.7,
ScalingMetric.GC_PRESSURE,
0.8,
- ScalingMetric.HEAP_USED,
+ ScalingMetric.HEAP_MEMORY_USED,
1024.,
+ ScalingMetric.MANAGED_MEMORY_USED,
+ 840.,
+ ScalingMetric.METASPACE_MEMORY_USED,
+ 220.,
ScalingMetric.NUM_TASK_SLOTS_USED,
42.)));
assertEquals(
@@ -551,8 +567,12 @@ public class ScalingMetricEvaluatorTest {
new EvaluatedScalingMetric(0.7, 0.6),
ScalingMetric.GC_PRESSURE,
EvaluatedScalingMetric.of(0.8),
- ScalingMetric.HEAP_USED,
+ ScalingMetric.HEAP_MEMORY_USED,
new EvaluatedScalingMetric(1024., 768.),
+ ScalingMetric.MANAGED_MEMORY_USED,
+ new EvaluatedScalingMetric(840., 630.),
+ ScalingMetric.METASPACE_MEMORY_USED,
+ new EvaluatedScalingMetric(220., 165.),
ScalingMetric.NUM_TASK_SLOTS_USED,
EvaluatedScalingMetric.of(42.)),
ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
index 2ce95f70..c689a0a9 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
@@ -21,7 +21,6 @@ import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.topology.IOMetrics;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
-import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
@@ -247,40 +246,32 @@ public class ScalingMetricsTest {
@Test
public void testGlobalMetrics() {
Configuration conf = new Configuration();
- conf.set(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET,
MemoryTuning.HeapUsageTarget.AVG);
assertEquals(Map.of(), ScalingMetrics.computeGlobalMetrics(Map.of(),
Map.of(), conf));
assertEquals(
Map.of(),
ScalingMetrics.computeGlobalMetrics(
- Map.of(), Map.of(FlinkMetric.HEAP_USED, aggMax(100)),
conf));
+ Map.of(), Map.of(FlinkMetric.HEAP_MEMORY_USED,
aggMax(100)), conf));
+
assertEquals(
Map.of(
ScalingMetric.HEAP_MAX_USAGE_RATIO,
0.5,
- ScalingMetric.GC_PRESSURE,
- 0.25,
- ScalingMetric.HEAP_USED,
- 75.),
- ScalingMetrics.computeGlobalMetrics(
- Map.of(),
- Map.of(
- FlinkMetric.HEAP_USED,
- aggAvgMax(75, 100),
- FlinkMetric.HEAP_MAX,
- aggMax(200.),
- FlinkMetric.TOTAL_GC_TIME_PER_SEC,
- aggMax(250.)),
- conf));
-
- conf.set(AutoScalerOptions.MEMORY_TUNING_HEAP_TARGET,
MemoryTuning.HeapUsageTarget.MAX);
- assertEquals(
- Map.of(ScalingMetric.HEAP_MAX_USAGE_RATIO, 0.5,
ScalingMetric.HEAP_USED, 100.),
+ ScalingMetric.HEAP_MEMORY_USED,
+ 100.,
+ ScalingMetric.MANAGED_MEMORY_USED,
+ 133.,
+ ScalingMetric.METASPACE_MEMORY_USED,
+ 22.),
ScalingMetrics.computeGlobalMetrics(
Map.of(),
Map.of(
- FlinkMetric.HEAP_USED,
+ FlinkMetric.HEAP_MEMORY_USED,
aggAvgMax(75, 100),
- FlinkMetric.HEAP_MAX,
+ FlinkMetric.MANAGED_MEMORY_USED,
+ aggAvgMax(128, 133),
+ FlinkMetric.METASPACE_MEMORY_USED,
+ aggAvgMax(11, 22),
+ FlinkMetric.HEAP_MEMORY_MAX,
aggMax(200.)),
conf));
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
index d46c5bba..df0d1d55 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/TestingScalingRealizer.java
@@ -18,7 +18,7 @@
package org.apache.flink.autoscaler.realizer;
import org.apache.flink.autoscaler.JobAutoScalerContext;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import lombok.Getter;
@@ -38,8 +38,8 @@ public class TestingScalingRealizer<KEY, Context extends
JobAutoScalerContext<KE
}
@Override
- public void realizeConfigOverrides(Context context, Configuration
configOverrides) {
- events.add(new Event<>(context, configOverrides));
+ public void realizeConfigOverrides(Context context, ConfigChanges
configChanges) {
+ events.add(new Event<>(context, configChanges));
}
/** The collected event. */
@@ -49,16 +49,16 @@ public class TestingScalingRealizer<KEY, Context extends
JobAutoScalerContext<KE
@Getter private Map<String, String> parallelismOverrides;
- @Getter private Configuration configOverrides;
+ @Getter private ConfigChanges configChanges;
public Event(Context context, Map<String, String>
parallelismOverrides) {
this.context = context;
this.parallelismOverrides = parallelismOverrides;
}
- public Event(Context context, Configuration configOverrides) {
+ public Event(Context context, ConfigChanges configChanges) {
this.context = context;
- this.configOverrides = configOverrides;
+ this.configChanges = configChanges;
}
@Override
@@ -69,7 +69,7 @@ public class TestingScalingRealizer<KEY, Context extends
JobAutoScalerContext<KE
+ ", parallelismOverrides="
+ parallelismOverrides
+ ", configOverrides="
- + configOverrides
+ + configChanges
+ '}';
}
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
index 11db27c0..da0f0c2c 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
@@ -21,7 +21,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.Assertions;
@@ -184,26 +184,26 @@ public abstract class AbstractAutoScalerStateStoreTest<
new JobVertexID(),
new TreeMap<>(Map.of(Instant.now(), new
ScalingSummary()))));
stateStore.storeParallelismOverrides(ctx, Map.of(new
JobVertexID().toHexString(), "23"));
- stateStore.storeConfigOverrides(
- ctx, Configuration.fromMap(Map.of("config.value", "value")));
+ stateStore.storeConfigChanges(
+ ctx, new ConfigChanges().addOverride("config.value", "value"));
assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
- assertThat(stateStore.getConfigOverrides(ctx).toMap()).isNotEmpty();
+
assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isNotEmpty();
stateStore.flush(ctx);
assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
- assertThat(stateStore.getConfigOverrides(ctx).toMap()).isNotEmpty();
+
assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isNotEmpty();
stateStore.clearAll(ctx);
assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
assertThat(stateStore.getScalingHistory(ctx)).isEmpty();
assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
- assertThat(stateStore.getConfigOverrides(ctx).toMap()).isEmpty();
+ assertThat(stateStore.getConfigChanges(ctx).getOverrides()).isEmpty();
}
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
index 1bbf518f..3e2583a1 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
@@ -25,15 +25,16 @@ import
org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.Test;
import java.time.Duration;
+import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@@ -71,61 +72,93 @@ public class MemoryTuningTest {
var globalMetrics =
Map.of(
- ScalingMetric.HEAP_USED,
-
EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(5096).getBytes()));
+ ScalingMetric.HEAP_MEMORY_USED,
+
EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(5096).getBytes()),
+ ScalingMetric.MANAGED_MEMORY_USED,
+
EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(10000).getBytes()),
+ ScalingMetric.METASPACE_MEMORY_USED,
+
EvaluatedScalingMetric.avg(MemorySize.ofMebiBytes(100).getBytes()));
var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
- Map<String, String> overrides =
- MemoryTuning.tuneTaskManagerHeapMemory(context, metrics,
eventHandler).toMap();
+ ConfigChanges configChanges =
+ MemoryTuning.tuneTaskManagerHeapMemory(context, metrics,
eventHandler);
// Test reducing overall memory
- assertThat(overrides)
+ assertThat(configChanges.getOverrides())
.containsExactlyInAnyOrderEntriesOf(
Map.of(
- TaskManagerOptions.TASK_HEAP_MEMORY.key(),
- "6412251955 bytes",
- TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
- "0 bytes",
- TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
- "12348031160 bytes",
- TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
- "0.046",
+
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+ "0.562",
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
"0.14",
+ TaskManagerOptions.JVM_METASPACE.key(),
+ "120 mb",
+ TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
+ "0.099",
+ TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+ "0 bytes",
TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
- "23323685913 bytes"));
+ "10833048417 bytes"));
+
+ assertThat(configChanges.getRemovals())
+ .containsExactlyInAnyOrder(
+ TaskManagerOptions.TASK_HEAP_MEMORY.key(),
+ TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
+ TaskManagerOptions.MANAGED_MEMORY_SIZE
+ .fallbackKeys()
+ .iterator()
+ .next()
+ .getKey());
assertThat(eventHandler.events.poll().getMessage())
.startsWith(
"Memory tuning recommends the following configuration
(automatic tuning is enabled):");
- // Test giving back memory to RocksDB
- config.set(AutoScalerOptions.MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED,
true);
- config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
- overrides = MemoryTuning.tuneTaskManagerHeapMemory(context, metrics,
eventHandler).toMap();
- assertThat(overrides)
+ // Test maximize managed memory
+ config.set(AutoScalerOptions.MEMORY_TUNING_MAXIMIZE_MANAGED_MEMORY,
true);
+ configChanges = MemoryTuning.tuneTaskManagerHeapMemory(context,
metrics, eventHandler);
+ assertThat(configChanges.getOverrides())
.containsExactlyInAnyOrderEntriesOf(
Map.of(
- TaskManagerOptions.TASK_HEAP_MEMORY.key(),
- "6412251955 bytes",
- TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
- "0 bytes",
- TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
- "21236599967 bytes",
+
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+ "0.689",
+
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
+ "0.1",
+ TaskManagerOptions.JVM_METASPACE.key(),
+ "120 mb",
TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
"0.033",
-
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
- "0.14",
+ TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+ "0 bytes",
TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
totalMemory.toString()));
- assertThat(eventHandler.events.poll().getMessage())
- .startsWith(
- "Memory tuning recommends the following configuration
(automatic tuning is enabled):");
+ // Test managed memory is zero
+ metrics = new EvaluatedMetrics(vertexMetrics, new
HashMap<>(globalMetrics));
+ metrics.getGlobalMetrics()
+ .put(ScalingMetric.MANAGED_MEMORY_USED,
EvaluatedScalingMetric.avg(0));
+ configChanges = MemoryTuning.tuneTaskManagerHeapMemory(context,
metrics, eventHandler);
+ assertThat(configChanges.getOverrides())
+ .containsExactlyInAnyOrderEntriesOf(
+ Map.of(
+
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+ "0.0",
+
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
+ "0.32",
+ TaskManagerOptions.JVM_METASPACE.key(),
+ "120 mb",
+ TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
+ "0.099",
+ TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
+ "0 bytes",
+ TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
+ "10833048417 bytes"));
// Test tuning disabled
config.set(AutoScalerOptions.MEMORY_TUNING_ENABLED, false);
- assertThat(MemoryTuning.tuneTaskManagerHeapMemory(context, metrics,
eventHandler).toMap())
+ assertThat(
+ MemoryTuning.tuneTaskManagerHeapMemory(context,
metrics, eventHandler)
+ .getOverrides())
.isEmpty();
assertThat(eventHandler.events.poll().getMessage())
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
index 8eb6ebdc..16455bf1 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
@@ -55,17 +56,22 @@ public class KubernetesScalingRealizer
@Override
public void realizeConfigOverrides(
- KubernetesJobAutoScalerContext context, Configuration
configOverrides) {
+ KubernetesJobAutoScalerContext context, ConfigChanges
configChanges) {
if (!(context.getResource() instanceof FlinkDeployment)) {
// We can't adjust the configuration of non-job deployments.
return;
}
FlinkDeployment flinkDeployment = ((FlinkDeployment)
context.getResource());
// Apply config overrides
-
flinkDeployment.getSpec().getFlinkConfiguration().putAll(configOverrides.toMap());
+ Map<String, String> flinkConf =
flinkDeployment.getSpec().getFlinkConfiguration();
+ for (String keyToRemove : configChanges.getRemovals()) {
+ flinkConf.remove(keyToRemove);
+ }
+ flinkConf.putAll(configChanges.getOverrides());
// Update total memory in spec
- var totalMemoryOverride = MemoryTuning.getTotalMemory(configOverrides,
context);
+ var totalMemoryOverride =
+ MemoryTuning.getTotalMemory(Configuration.fromMap(flinkConf),
context);
if (totalMemoryOverride.compareTo(MemorySize.ZERO) <= 0) {
LOG.warn("Total memory override {} is not valid",
totalMemoryOverride);
return;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index 53ba7eb4..fe812bef 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -22,8 +22,8 @@ import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -43,6 +43,8 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -195,22 +197,22 @@ public class KubernetesAutoScalerStateStore
@NotNull
@Override
- public Configuration getConfigOverrides(KubernetesJobAutoScalerContext
jobContext) {
+ public ConfigChanges getConfigChanges(KubernetesJobAutoScalerContext
jobContext) {
return configMapStore
.getSerializedState(jobContext, CONFIG_OVERRIDES_KEY)
.map(KubernetesAutoScalerStateStore::deserializeConfigOverrides)
- .orElse(new Configuration());
+ .orElse(new ConfigChanges());
}
@Override
- public void storeConfigOverrides(
- KubernetesJobAutoScalerContext jobContext, Configuration
overrides) {
+ public void storeConfigChanges(
+ KubernetesJobAutoScalerContext jobContext, ConfigChanges
overrides) {
configMapStore.putSerializedState(
jobContext, CONFIG_OVERRIDES_KEY,
serializeConfigOverrides(overrides));
}
@Override
- public void removeConfigOverrides(KubernetesJobAutoScalerContext
jobContext) {
+ public void removeConfigChanges(KubernetesJobAutoScalerContext jobContext)
{
configMapStore.removeSerializedState(jobContext, CONFIG_OVERRIDES_KEY);
}
@@ -276,12 +278,24 @@ public class KubernetesAutoScalerStateStore
return ConfigurationUtils.convertValue(overrides, Map.class);
}
- private static String serializeConfigOverrides(Configuration overrides) {
- return ConfigurationUtils.convertValue(overrides.toMap(),
String.class);
+ @Nullable
+ private static String serializeConfigOverrides(ConfigChanges
configChanges) {
+ try {
+ return YAML_MAPPER.writeValueAsString(configChanges);
+ } catch (Exception e) {
+ LOG.error("Failed to serialize ConfigOverrides", e);
+ return null;
+ }
}
- private static Configuration deserializeConfigOverrides(String overrides) {
- return
Configuration.fromMap(ConfigurationUtils.convertValue(overrides, Map.class));
+ @Nullable
+ private static ConfigChanges deserializeConfigOverrides(String
configOverrides) {
+ try {
+ return YAML_MAPPER.readValue(configOverrides, new
TypeReference<>() {});
+ } catch (Exception e) {
+ LOG.error("Failed to deserialize ConfigOverrides", e);
+ return null;
+ }
}
@VisibleForTesting
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
index 9a448304..994732ab 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
@@ -17,7 +17,7 @@
package org.apache.flink.kubernetes.operator.autoscaler;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -68,9 +68,9 @@ public class KubernetesScalingRealizerTest {
KubernetesJobAutoScalerContext ctx =
TestingKubernetesAutoscalerUtils.createContext("test", null);
- var overrides = new Configuration();
+ ConfigChanges overrides = new ConfigChanges();
MemorySize memoryOverride = MemorySize.ofMebiBytes(4096);
- overrides.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, memoryOverride);
+ overrides.addOverride(TaskManagerOptions.TOTAL_PROCESS_MEMORY,
memoryOverride);
new KubernetesScalingRealizer().realizeConfigOverrides(ctx, overrides);
assertThat(ctx.getResource()).isInstanceOf(FlinkDeployment.class);