This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 6a860651 [FLINK-34471] Tune network memory with memory tuning (#781)
6a860651 is described below
commit 6a8606517fb9f38f3a486c022f925368c9b4d9ae
Author: Maximilian Michels <[email protected]>
AuthorDate: Fri Feb 23 11:35:34 2024 +0100
[FLINK-34471] Tune network memory with memory tuning (#781)
---
.../apache/flink/autoscaler/JobAutoScalerImpl.java | 2 +-
.../apache/flink/autoscaler/ScalingExecutor.java | 10 +-
.../flink/autoscaler/tuning/MemoryTuning.java | 79 +++++++++++---
.../flink/autoscaler/utils/ResourceCheckUtils.java | 2 +-
.../MetricsCollectionAndEvaluationTest.java | 21 +++-
.../flink/autoscaler/ScalingExecutorTest.java | 114 +++++++++++++++++----
.../flink/autoscaler/utils/MemoryTuningTest.java | 64 +++++++++---
7 files changed, 236 insertions(+), 56 deletions(-)
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 5bb7b791..cc56eb3e 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -216,7 +216,7 @@ public class JobAutoScalerImpl<KEY, Context extends
JobAutoScalerContext<KEY>>
var parallelismChanged =
scalingExecutor.scaleResource(
- ctx, evaluatedMetrics, scalingHistory,
scalingTracking, now);
+ ctx, evaluatedMetrics, scalingHistory,
scalingTracking, now, jobTopology);
if (parallelismChanged) {
autoscalerMetrics.incrementScaling();
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 4ff9ae36..ef401ffe 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.resources.NoopResourceCheck;
import org.apache.flink.autoscaler.resources.ResourceCheck;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.autoscaler.utils.CalendarUtils;
import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
@@ -94,7 +95,8 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
EvaluatedMetrics evaluatedMetrics,
Map<JobVertexID, SortedMap<Instant, ScalingSummary>>
scalingHistory,
ScalingTracking scalingTracking,
- Instant now)
+ Instant now,
+ JobTopology jobTopology)
throws Exception {
var conf = context.getConfiguration();
var restartTime = scalingTracking.getMaxRestartTimeOrDefault(conf);
@@ -120,7 +122,11 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
var configOverrides =
MemoryTuning.tuneTaskManagerHeapMemory(
- context, evaluatedMetrics, autoScalerEventHandler);
+ context,
+ evaluatedMetrics,
+ jobTopology,
+ scalingSummaries,
+ autoScalerEventHandler);
if (scalingWouldExceedClusterResources(
configOverrides.applyOverrides(conf),
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
index 8ca0dcc3..af5fd8a1 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
@@ -18,16 +18,22 @@
package org.apache.flink.autoscaler.tuning;
import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.util.config.memory.CommonProcessMemorySpec;
import
org.apache.flink.runtime.util.config.memory.JvmMetaspaceAndOverheadOptions;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryOptions;
@@ -38,6 +44,8 @@ import
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlink
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Map;
@@ -63,6 +71,8 @@ public class MemoryTuning {
public static ConfigChanges tuneTaskManagerHeapMemory(
JobAutoScalerContext<?> context,
EvaluatedMetrics evaluatedMetrics,
+ JobTopology jobTopology,
+ Map<JobVertexID, ScalingSummary> scalingSummaries,
AutoScalerEventHandler eventHandler) {
// Please note that this config is the original configuration created
from the user spec.
@@ -103,7 +113,13 @@ public class MemoryTuning {
var globalMetrics = evaluatedMetrics.getGlobalMetrics();
// The order matters in case the memory usage is higher than the
maximum available memory.
// Managed memory comes last because it can grow arbitrary for RocksDB
jobs.
- MemorySize newNetworkSize = adjustNetworkMemory(specNetworkSize,
memBudget);
+ MemorySize newNetworkSize =
+ adjustNetworkMemory(
+ jobTopology,
+ ResourceCheckUtils.computeNewParallelisms(
+ scalingSummaries,
evaluatedMetrics.getVertexMetrics()),
+ config,
+ memBudget);
MemorySize newHeapSize =
determineNewSize(getUsage(HEAP_MEMORY_USED, globalMetrics),
config, memBudget);
MemorySize newMetaspaceSize =
@@ -159,9 +175,8 @@ public class MemoryTuning {
getFraction(newManagedSize, flinkMemorySize));
tuningConfig.addRemoval(TaskManagerOptions.MANAGED_MEMORY_SIZE);
- tuningConfig.addOverride(
- TaskManagerOptions.NETWORK_MEMORY_FRACTION,
- getFraction(newNetworkSize, flinkMemorySize));
+ tuningConfig.addOverride(TaskManagerOptions.NETWORK_MEMORY_MIN,
newNetworkSize);
+ tuningConfig.addOverride(TaskManagerOptions.NETWORK_MEMORY_MAX,
newNetworkSize);
tuningConfig.addOverride(
TaskManagerOptions.JVM_OVERHEAD_FRACTION,
@@ -218,11 +233,48 @@ public class MemoryTuning {
}
}
- private static MemorySize adjustNetworkMemory(MemorySize usage,
MemoryBudget memBudget) {
- // TODO mxm: Follow-up to tune network memory via
- // https://issues.apache.org/jira/browse/FLINK-34471
- long networkBytes = memBudget.budget(usage.getBytes());
- return new MemorySize(networkBytes);
+ /* Calculate the maximum amount of memory for a TaskManager required by
all its subtask buffer pools.
+ *
+ * See
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/#network-buffer-lifecycle
+ */
+ private static MemorySize adjustNetworkMemory(
+ JobTopology jobTopology,
+ Map<JobVertexID, Integer> updatedParallelisms,
+ Configuration config,
+ MemoryBudget memBudget) {
+
+ final long buffersPerChannel =
+
config.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
+ final long floatingBuffers =
+
config.get(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+ final long memorySegmentBytes =
+ config.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE).getBytes();
+
+ long maxNetworkMemory = 0;
+ for (VertexInfo vertexInfo : jobTopology.getVertexInfos().values()) {
+ // Add max amount of memory for each input gate
+ for (JobVertexID input : vertexInfo.getInputs()) {
+ int inputParallelism = updatedParallelisms.get(input);
+ maxNetworkMemory +=
+ (inputParallelism * buffersPerChannel +
floatingBuffers)
+ * memorySegmentBytes;
+ }
+ // Add max amount of memory for each output gate
+ // Usually, there is just one output per task
+ for (JobVertexID output : vertexInfo.getOutputs()) {
+ int downstreamParallelism = updatedParallelisms.get(output);
+ maxNetworkMemory +=
+ (downstreamParallelism * buffersPerChannel +
floatingBuffers)
+ * memorySegmentBytes;
+ }
+ }
+
+ // Each task slot will potentially host all runtime subtasks if slot
sharing is enabled.
+ // If slot sharing is disabled, this will use more memory than
necessary, but it is better to
+ // overprovision slightly than to fail with "Insufficient Network
buffers".
+ maxNetworkMemory *= config.get(TaskManagerOptions.NUM_TASK_SLOTS);
+
+ return new MemorySize(memBudget.budget(maxNetworkMemory));
}
private static MemorySize getUsage(
@@ -255,10 +307,11 @@ public class MemoryTuning {
}
private static float getFraction(MemorySize enumerator, MemorySize
denominator) {
- // Round to three decimal places
- return (float)
- (Math.round(enumerator.getBytes() / (double)
denominator.getBytes() * 1000)
- / 1000.);
+ // Round to three decimal places but make sure to round up values
+ // like 0.0002 to 0.001 instead of 0.0
+ return BigDecimal.valueOf(enumerator.getBytes() / (double)
denominator.getBytes())
+ .setScale(3, RoundingMode.CEILING)
+ .floatValue();
}
/** Format config such that it can be directly used as a Flink
configuration. */
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java
index 18bf58c0..f8d3bade 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java
@@ -46,7 +46,7 @@ public class ResourceCheckUtils {
}
}
- private static Map<JobVertexID, Integer> computeNewParallelisms(
+ public static Map<JobVertexID, Integer> computeNewParallelisms(
Map<JobVertexID, ScalingSummary> scalingSummaries,
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
vertexMetrics) {
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index d5708e79..a44b0c17 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -174,7 +174,12 @@ public class MetricsCollectionAndEvaluationTest {
var evaluation = evaluator.evaluate(conf, collectedMetrics,
restartTime);
scalingExecutor.scaleResource(
- context, evaluation, new HashMap<>(), new ScalingTracking(),
clock.instant());
+ context,
+ evaluation,
+ new HashMap<>(),
+ new ScalingTracking(),
+ clock.instant(),
+ new JobTopology());
var scaledParallelism =
ScalingExecutorTest.getScaledParallelism(stateStore, context);
assertEquals(4, scaledParallelism.size());
@@ -387,7 +392,12 @@ public class MetricsCollectionAndEvaluationTest {
.getCurrent());
scalingExecutor.scaleResource(
- context, evaluation, new HashMap<>(), new ScalingTracking(),
clock.instant());
+ context,
+ evaluation,
+ new HashMap<>(),
+ new ScalingTracking(),
+ clock.instant(),
+ new JobTopology());
var scaledParallelism =
ScalingExecutorTest.getScaledParallelism(stateStore, context);
assertEquals(1, scaledParallelism.get(source1));
}
@@ -633,7 +643,12 @@ public class MetricsCollectionAndEvaluationTest {
.getCurrent());
scalingExecutor.scaleResource(
- context, evaluation, new HashMap<>(), new ScalingTracking(),
clock.instant());
+ context,
+ evaluation,
+ new HashMap<>(),
+ new ScalingTracking(),
+ clock.instant(),
+ new JobTopology());
var scaledParallelism =
ScalingExecutorTest.getScaledParallelism(stateStore, context);
assertEquals(1, scaledParallelism.get(source1));
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index ef5d99f2..afb6773e 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.resources.ResourceCheck;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -43,6 +45,7 @@ import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
@@ -166,12 +169,22 @@ public class ScalingExecutorTest {
var now = Instant.now();
assertFalse(
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), now));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ new JobTopology()));
// filter operator should scale
conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of());
assertTrue(
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), now));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ new JobTopology()));
}
@Test
@@ -196,7 +209,12 @@ public class ScalingExecutorTest {
dummyGlobalMetrics);
assertFalse(
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), now));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ new JobTopology()));
// scaling execution outside excluded periods
excludedPeriod =
new
StringBuilder(localTime.plusSeconds(100).toString().split("\\.")[0])
@@ -206,7 +224,12 @@ public class ScalingExecutorTest {
conf.set(AutoScalerOptions.EXCLUDED_PERIODS, List.of(excludedPeriod));
assertTrue(
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), now));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ new JobTopology()));
}
@Test
@@ -230,7 +253,12 @@ public class ScalingExecutorTest {
// Would normally scale without resource usage check
assertTrue(
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), now));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ new JobTopology()));
scalingDecisionExecutor =
new ScalingExecutor<>(
@@ -254,7 +282,8 @@ public class ScalingExecutorTest {
metrics,
new HashMap<>(),
new ScalingTracking(),
- now));
+ now,
+ new JobTopology()));
}
@Test
@@ -287,24 +316,36 @@ public class ScalingExecutorTest {
Map.of(source, evaluated(10, 100, 50, 0), sink, evaluated(10,
100, 50, 0));
var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
+ JobTopology jobTopology =
+ new JobTopology(
+ new VertexInfo(source, Set.of(), 10, 1000, false,
null),
+ new VertexInfo(sink, Set.of(source), 10, 1000, false,
null));
+
assertTrue(
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), now));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ jobTopology));
assertThat(stateStore.getConfigChanges(context).getOverrides())
.containsExactlyInAnyOrderEntriesOf(
Map.of(
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
- "0.561",
-
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
- "0.14",
+ "0.652",
+ TaskManagerOptions.NETWORK_MEMORY_MIN.key(),
+ "25 mb",
+ TaskManagerOptions.NETWORK_MEMORY_MAX.key(),
+ "25 mb",
TaskManagerOptions.JVM_METASPACE.key(),
"360 mb",
TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
- "0.097",
+ "0.134",
TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
"0 bytes",
TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
- "11114905646 bytes"));
+ "7681 mb"));
}
@ParameterizedTest
@@ -341,11 +382,21 @@ public class ScalingExecutorTest {
assertEquals(
scalingEnabled,
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), now));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ new JobTopology()));
assertEquals(
scalingEnabled,
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), now));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ new JobTopology()));
int expectedSize = (interval == null || interval.toMillis() > 0) &&
!scalingEnabled ? 1 : 2;
assertEquals(expectedSize, eventCollector.events.size());
@@ -379,7 +430,12 @@ public class ScalingExecutorTest {
assertEquals(
scalingEnabled,
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), now));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ new JobTopology()));
var event2 = eventCollector.events.poll();
assertThat(event2).isNotNull();
assertThat(event2.getContext()).isSameAs(event.getContext());
@@ -407,7 +463,12 @@ public class ScalingExecutorTest {
// Baseline, no GC/Heap metrics
assertTrue(
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), Instant.now()));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ Instant.now(),
+ new JobTopology()));
// Just below the thresholds
metrics =
@@ -420,7 +481,12 @@ public class ScalingExecutorTest {
new EvaluatedScalingMetric(0.9, 0.79)));
assertTrue(
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), Instant.now()));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ Instant.now(),
+ new JobTopology()));
eventCollector.events.clear();
@@ -435,7 +501,12 @@ public class ScalingExecutorTest {
new EvaluatedScalingMetric(0.9, 0.79)));
assertFalse(
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), Instant.now()));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ Instant.now(),
+ new JobTopology()));
assertEquals("MemoryPressure",
eventCollector.events.poll().getReason());
assertTrue(eventCollector.events.isEmpty());
@@ -450,7 +521,12 @@ public class ScalingExecutorTest {
new EvaluatedScalingMetric(0.6, 0.81)));
assertFalse(
scalingDecisionExecutor.scaleResource(
- context, metrics, new HashMap<>(), new
ScalingTracking(), Instant.now()));
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ Instant.now(),
+ new JobTopology()));
assertEquals("MemoryPressure",
eventCollector.events.poll().getReason());
assertTrue(eventCollector.events.isEmpty());
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
index 3e2583a1..2838e228 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
@@ -19,12 +19,15 @@ package org.apache.flink.autoscaler.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.TestingAutoscalerUtils;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.configuration.MemorySize;
@@ -36,6 +39,7 @@ import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@@ -81,24 +85,37 @@ public class MemoryTuningTest {
var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
+ JobTopology jobTopology =
+ new JobTopology(
+ new VertexInfo(jobVertex1, Set.of(), 50, 1000, false,
null),
+ new VertexInfo(jobVertex2, Set.of(jobVertex1), 50,
1000, false, null));
+
+ Map<JobVertexID, ScalingSummary> scalingSummaries =
+ Map.of(
+ jobVertex1, new ScalingSummary(50, 25, Map.of()),
+ jobVertex2, new ScalingSummary(50, 10, Map.of()));
+
ConfigChanges configChanges =
- MemoryTuning.tuneTaskManagerHeapMemory(context, metrics,
eventHandler);
+ MemoryTuning.tuneTaskManagerHeapMemory(
+ context, metrics, jobTopology, scalingSummaries,
eventHandler);
// Test reducing overall memory
assertThat(configChanges.getOverrides())
.containsExactlyInAnyOrderEntriesOf(
Map.of(
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
- "0.562",
-
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
- "0.14",
+ "0.654",
+ TaskManagerOptions.NETWORK_MEMORY_MIN.key(),
+ "13760 kb",
+ TaskManagerOptions.NETWORK_MEMORY_MAX.key(),
+ "13760 kb",
TaskManagerOptions.JVM_METASPACE.key(),
"120 mb",
TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
- "0.099",
+ "0.139",
TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
"0 bytes",
TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
- "10833048417 bytes"));
+ "7760130867 bytes"));
assertThat(configChanges.getRemovals())
.containsExactlyInAnyOrder(
@@ -116,18 +133,22 @@ public class MemoryTuningTest {
// Test maximize managed memory
config.set(AutoScalerOptions.MEMORY_TUNING_MAXIMIZE_MANAGED_MEMORY,
true);
- configChanges = MemoryTuning.tuneTaskManagerHeapMemory(context,
metrics, eventHandler);
+ configChanges =
+ MemoryTuning.tuneTaskManagerHeapMemory(
+ context, metrics, jobTopology, scalingSummaries,
eventHandler);
assertThat(configChanges.getOverrides())
.containsExactlyInAnyOrderEntriesOf(
Map.of(
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
- "0.689",
-
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
- "0.1",
+ "0.789",
+ TaskManagerOptions.NETWORK_MEMORY_MIN.key(),
+ "13760 kb",
+ TaskManagerOptions.NETWORK_MEMORY_MAX.key(),
+ "13760 kb",
TaskManagerOptions.JVM_METASPACE.key(),
"120 mb",
TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
- "0.033",
+ "0.034",
TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
"0 bytes",
TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
@@ -137,27 +158,36 @@ public class MemoryTuningTest {
metrics = new EvaluatedMetrics(vertexMetrics, new
HashMap<>(globalMetrics));
metrics.getGlobalMetrics()
.put(ScalingMetric.MANAGED_MEMORY_USED,
EvaluatedScalingMetric.avg(0));
- configChanges = MemoryTuning.tuneTaskManagerHeapMemory(context,
metrics, eventHandler);
+ configChanges =
+ MemoryTuning.tuneTaskManagerHeapMemory(
+ context, metrics, jobTopology, scalingSummaries,
eventHandler);
assertThat(configChanges.getOverrides())
.containsExactlyInAnyOrderEntriesOf(
Map.of(
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
"0.0",
-
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
- "0.32",
+ TaskManagerOptions.NETWORK_MEMORY_MIN.key(),
+ "13760 kb",
+ TaskManagerOptions.NETWORK_MEMORY_MAX.key(),
+ "13760 kb",
TaskManagerOptions.JVM_METASPACE.key(),
"120 mb",
TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
- "0.099",
+ "0.139",
TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
"0 bytes",
TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
- "10833048417 bytes"));
+ "7760130867 bytes"));
// Test tuning disabled
config.set(AutoScalerOptions.MEMORY_TUNING_ENABLED, false);
assertThat(
- MemoryTuning.tuneTaskManagerHeapMemory(context,
metrics, eventHandler)
+ MemoryTuning.tuneTaskManagerHeapMemory(
+ context,
+ metrics,
+ jobTopology,
+ scalingSummaries,
+ eventHandler)
.getOverrides())
.isEmpty();