This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a860651 [FLINK-34471] Tune network memory with memory tuning (#781)
6a860651 is described below

commit 6a8606517fb9f38f3a486c022f925368c9b4d9ae
Author: Maximilian Michels <[email protected]>
AuthorDate: Fri Feb 23 11:35:34 2024 +0100

    [FLINK-34471] Tune network memory with memory tuning (#781)
---
 .../apache/flink/autoscaler/JobAutoScalerImpl.java |   2 +-
 .../apache/flink/autoscaler/ScalingExecutor.java   |  10 +-
 .../flink/autoscaler/tuning/MemoryTuning.java      |  79 +++++++++++---
 .../flink/autoscaler/utils/ResourceCheckUtils.java |   2 +-
 .../MetricsCollectionAndEvaluationTest.java        |  21 +++-
 .../flink/autoscaler/ScalingExecutorTest.java      | 114 +++++++++++++++++----
 .../flink/autoscaler/utils/MemoryTuningTest.java   |  64 +++++++++---
 7 files changed, 236 insertions(+), 56 deletions(-)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 5bb7b791..cc56eb3e 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -216,7 +216,7 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
 
         var parallelismChanged =
                 scalingExecutor.scaleResource(
-                        ctx, evaluatedMetrics, scalingHistory, 
scalingTracking, now);
+                        ctx, evaluatedMetrics, scalingHistory, 
scalingTracking, now, jobTopology);
 
         if (parallelismChanged) {
             autoscalerMetrics.incrementScaling();
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 4ff9ae36..ef401ffe 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.autoscaler.resources.NoopResourceCheck;
 import org.apache.flink.autoscaler.resources.ResourceCheck;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.tuning.MemoryTuning;
 import org.apache.flink.autoscaler.utils.CalendarUtils;
 import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
@@ -94,7 +95,8 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
             EvaluatedMetrics evaluatedMetrics,
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory,
             ScalingTracking scalingTracking,
-            Instant now)
+            Instant now,
+            JobTopology jobTopology)
             throws Exception {
         var conf = context.getConfiguration();
         var restartTime = scalingTracking.getMaxRestartTimeOrDefault(conf);
@@ -120,7 +122,11 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
 
         var configOverrides =
                 MemoryTuning.tuneTaskManagerHeapMemory(
-                        context, evaluatedMetrics, autoScalerEventHandler);
+                        context,
+                        evaluatedMetrics,
+                        jobTopology,
+                        scalingSummaries,
+                        autoScalerEventHandler);
 
         if (scalingWouldExceedClusterResources(
                 configOverrides.applyOverrides(conf),
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
index 8ca0dcc3..af5fd8a1 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
@@ -18,16 +18,22 @@
 package org.apache.flink.autoscaler.tuning;
 
 import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.util.config.memory.CommonProcessMemorySpec;
 import 
org.apache.flink.runtime.util.config.memory.JvmMetaspaceAndOverheadOptions;
 import org.apache.flink.runtime.util.config.memory.ProcessMemoryOptions;
@@ -38,6 +44,8 @@ import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlink
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.util.Arrays;
 import java.util.Map;
 
@@ -63,6 +71,8 @@ public class MemoryTuning {
     public static ConfigChanges tuneTaskManagerHeapMemory(
             JobAutoScalerContext<?> context,
             EvaluatedMetrics evaluatedMetrics,
+            JobTopology jobTopology,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
             AutoScalerEventHandler eventHandler) {
 
         // Please note that this config is the original configuration created 
from the user spec.
@@ -103,7 +113,13 @@ public class MemoryTuning {
         var globalMetrics = evaluatedMetrics.getGlobalMetrics();
         // The order matters in case the memory usage is higher than the 
maximum available memory.
         // Managed memory comes last because it can grow arbitrary for RocksDB 
jobs.
-        MemorySize newNetworkSize = adjustNetworkMemory(specNetworkSize, 
memBudget);
+        MemorySize newNetworkSize =
+                adjustNetworkMemory(
+                        jobTopology,
+                        ResourceCheckUtils.computeNewParallelisms(
+                                scalingSummaries, 
evaluatedMetrics.getVertexMetrics()),
+                        config,
+                        memBudget);
         MemorySize newHeapSize =
                 determineNewSize(getUsage(HEAP_MEMORY_USED, globalMetrics), 
config, memBudget);
         MemorySize newMetaspaceSize =
@@ -159,9 +175,8 @@ public class MemoryTuning {
                 getFraction(newManagedSize, flinkMemorySize));
         tuningConfig.addRemoval(TaskManagerOptions.MANAGED_MEMORY_SIZE);
 
-        tuningConfig.addOverride(
-                TaskManagerOptions.NETWORK_MEMORY_FRACTION,
-                getFraction(newNetworkSize, flinkMemorySize));
+        tuningConfig.addOverride(TaskManagerOptions.NETWORK_MEMORY_MIN, 
newNetworkSize);
+        tuningConfig.addOverride(TaskManagerOptions.NETWORK_MEMORY_MAX, 
newNetworkSize);
 
         tuningConfig.addOverride(
                 TaskManagerOptions.JVM_OVERHEAD_FRACTION,
@@ -218,11 +233,48 @@ public class MemoryTuning {
         }
     }
 
-    private static MemorySize adjustNetworkMemory(MemorySize usage, 
MemoryBudget memBudget) {
-        // TODO mxm: Follow-up to tune network memory via
-        // https://issues.apache.org/jira/browse/FLINK-34471
-        long networkBytes = memBudget.budget(usage.getBytes());
-        return new MemorySize(networkBytes);
+    /* Calculate the maximum amount of memory for a TaskManager required by 
all its subtask buffer pools.
+     *
+     * See 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/#network-buffer-lifecycle
+     */
+    private static MemorySize adjustNetworkMemory(
+            JobTopology jobTopology,
+            Map<JobVertexID, Integer> updatedParallelisms,
+            Configuration config,
+            MemoryBudget memBudget) {
+
+        final long buffersPerChannel =
+                
config.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
+        final long floatingBuffers =
+                
config.get(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+        final long memorySegmentBytes =
+                config.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE).getBytes();
+
+        long maxNetworkMemory = 0;
+        for (VertexInfo vertexInfo : jobTopology.getVertexInfos().values()) {
+            // Add max amount of memory for each input gate
+            for (JobVertexID input : vertexInfo.getInputs()) {
+                int inputParallelism = updatedParallelisms.get(input);
+                maxNetworkMemory +=
+                        (inputParallelism * buffersPerChannel + 
floatingBuffers)
+                                * memorySegmentBytes;
+            }
+            // Add max amount of memory for each output gate
+            // Usually, there is just one output per task
+            for (JobVertexID output : vertexInfo.getOutputs()) {
+                int downstreamParallelism = updatedParallelisms.get(output);
+                maxNetworkMemory +=
+                        (downstreamParallelism * buffersPerChannel + 
floatingBuffers)
+                                * memorySegmentBytes;
+            }
+        }
+
+        // Each task slot will potentially host all runtime subtasks if slot 
sharing enabled.
+        // If slot sharing is disabled, this will use more memory than 
necessary, we better
+        // overprovision slightly than failing with "Insufficient Network 
buffers".
+        maxNetworkMemory *= config.get(TaskManagerOptions.NUM_TASK_SLOTS);
+
+        return new MemorySize(memBudget.budget(maxNetworkMemory));
     }
 
     private static MemorySize getUsage(
@@ -255,10 +307,11 @@ public class MemoryTuning {
     }
 
     private static float getFraction(MemorySize enumerator, MemorySize 
denominator) {
-        // Round to three decimal places
-        return (float)
-                (Math.round(enumerator.getBytes() / (double) 
denominator.getBytes() * 1000)
-                        / 1000.);
+        // Round to three decimal places but make sure to round up values
+        // like 0.0002 to 0.001 instead of 0.0
+        return BigDecimal.valueOf(enumerator.getBytes() / (double) 
denominator.getBytes())
+                .setScale(3, RoundingMode.CEILING)
+                .floatValue();
     }
 
     /** Format config such that it can be directly used as a Flink 
configuration. */
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java
index 18bf58c0..f8d3bade 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java
@@ -46,7 +46,7 @@ public class ResourceCheckUtils {
         }
     }
 
-    private static Map<JobVertexID, Integer> computeNewParallelisms(
+    public static Map<JobVertexID, Integer> computeNewParallelisms(
             Map<JobVertexID, ScalingSummary> scalingSummaries,
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
vertexMetrics) {
 
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index d5708e79..a44b0c17 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -174,7 +174,12 @@ public class MetricsCollectionAndEvaluationTest {
 
         var evaluation = evaluator.evaluate(conf, collectedMetrics, 
restartTime);
         scalingExecutor.scaleResource(
-                context, evaluation, new HashMap<>(), new ScalingTracking(), 
clock.instant());
+                context,
+                evaluation,
+                new HashMap<>(),
+                new ScalingTracking(),
+                clock.instant(),
+                new JobTopology());
 
         var scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(4, scaledParallelism.size());
@@ -387,7 +392,12 @@ public class MetricsCollectionAndEvaluationTest {
                         .getCurrent());
 
         scalingExecutor.scaleResource(
-                context, evaluation, new HashMap<>(), new ScalingTracking(), 
clock.instant());
+                context,
+                evaluation,
+                new HashMap<>(),
+                new ScalingTracking(),
+                clock.instant(),
+                new JobTopology());
         var scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(1, scaledParallelism.get(source1));
     }
@@ -633,7 +643,12 @@ public class MetricsCollectionAndEvaluationTest {
                         .getCurrent());
 
         scalingExecutor.scaleResource(
-                context, evaluation, new HashMap<>(), new ScalingTracking(), 
clock.instant());
+                context,
+                evaluation,
+                new HashMap<>(),
+                new ScalingTracking(),
+                clock.instant(),
+                new JobTopology());
         var scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(1, scaledParallelism.get(source1));
 
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index ef5d99f2..afb6773e 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.autoscaler.resources.ResourceCheck;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -43,6 +45,7 @@ import java.time.ZonedDateTime;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
@@ -166,12 +169,22 @@ public class ScalingExecutorTest {
         var now = Instant.now();
         assertFalse(
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        now,
+                        new JobTopology()));
         // filter operator should scale
         conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of());
         assertTrue(
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        now,
+                        new JobTopology()));
     }
 
     @Test
@@ -196,7 +209,12 @@ public class ScalingExecutorTest {
                         dummyGlobalMetrics);
         assertFalse(
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        now,
+                        new JobTopology()));
         // scaling execution outside excluded periods
         excludedPeriod =
                 new 
StringBuilder(localTime.plusSeconds(100).toString().split("\\.")[0])
@@ -206,7 +224,12 @@ public class ScalingExecutorTest {
         conf.set(AutoScalerOptions.EXCLUDED_PERIODS, List.of(excludedPeriod));
         assertTrue(
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        now,
+                        new JobTopology()));
     }
 
     @Test
@@ -230,7 +253,12 @@ public class ScalingExecutorTest {
         // Would normally scale without resource usage check
         assertTrue(
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        now,
+                        new JobTopology()));
 
         scalingDecisionExecutor =
                 new ScalingExecutor<>(
@@ -254,7 +282,8 @@ public class ScalingExecutorTest {
                         metrics,
                         new HashMap<>(),
                         new ScalingTracking(),
-                        now));
+                        now,
+                        new JobTopology()));
     }
 
     @Test
@@ -287,24 +316,36 @@ public class ScalingExecutorTest {
                 Map.of(source, evaluated(10, 100, 50, 0), sink, evaluated(10, 
100, 50, 0));
         var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
 
+        JobTopology jobTopology =
+                new JobTopology(
+                        new VertexInfo(source, Set.of(), 10, 1000, false, 
null),
+                        new VertexInfo(sink, Set.of(source), 10, 1000, false, 
null));
+
         assertTrue(
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        now,
+                        jobTopology));
         assertThat(stateStore.getConfigChanges(context).getOverrides())
                 .containsExactlyInAnyOrderEntriesOf(
                         Map.of(
                                 
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
-                                "0.561",
-                                
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
-                                "0.14",
+                                "0.652",
+                                TaskManagerOptions.NETWORK_MEMORY_MIN.key(),
+                                "25 mb",
+                                TaskManagerOptions.NETWORK_MEMORY_MAX.key(),
+                                "25 mb",
                                 TaskManagerOptions.JVM_METASPACE.key(),
                                 "360 mb",
                                 TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
-                                "0.097",
+                                "0.134",
                                 TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
                                 "0 bytes",
                                 TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
-                                "11114905646 bytes"));
+                                "7681 mb"));
     }
 
     @ParameterizedTest
@@ -341,11 +382,21 @@ public class ScalingExecutorTest {
         assertEquals(
                 scalingEnabled,
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        now,
+                        new JobTopology()));
         assertEquals(
                 scalingEnabled,
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        now,
+                        new JobTopology()));
         int expectedSize = (interval == null || interval.toMillis() > 0) && 
!scalingEnabled ? 1 : 2;
         assertEquals(expectedSize, eventCollector.events.size());
 
@@ -379,7 +430,12 @@ public class ScalingExecutorTest {
         assertEquals(
                 scalingEnabled,
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        now,
+                        new JobTopology()));
         var event2 = eventCollector.events.poll();
         assertThat(event2).isNotNull();
         assertThat(event2.getContext()).isSameAs(event.getContext());
@@ -407,7 +463,12 @@ public class ScalingExecutorTest {
         // Baseline, no GC/Heap metrics
         assertTrue(
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), Instant.now()));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        Instant.now(),
+                        new JobTopology()));
 
         // Just below the thresholds
         metrics =
@@ -420,7 +481,12 @@ public class ScalingExecutorTest {
                                 new EvaluatedScalingMetric(0.9, 0.79)));
         assertTrue(
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), Instant.now()));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        Instant.now(),
+                        new JobTopology()));
 
         eventCollector.events.clear();
 
@@ -435,7 +501,12 @@ public class ScalingExecutorTest {
                                 new EvaluatedScalingMetric(0.9, 0.79)));
         assertFalse(
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), Instant.now()));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        Instant.now(),
+                        new JobTopology()));
         assertEquals("MemoryPressure", 
eventCollector.events.poll().getReason());
         assertTrue(eventCollector.events.isEmpty());
 
@@ -450,7 +521,12 @@ public class ScalingExecutorTest {
                                 new EvaluatedScalingMetric(0.6, 0.81)));
         assertFalse(
                 scalingDecisionExecutor.scaleResource(
-                        context, metrics, new HashMap<>(), new 
ScalingTracking(), Instant.now()));
+                        context,
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        Instant.now(),
+                        new JobTopology()));
         assertEquals("MemoryPressure", 
eventCollector.events.poll().getReason());
         assertTrue(eventCollector.events.isEmpty());
     }
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
index 3e2583a1..2838e228 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/MemoryTuningTest.java
@@ -19,12 +19,15 @@ package org.apache.flink.autoscaler.utils;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.TestingAutoscalerUtils;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.autoscaler.tuning.ConfigChanges;
 import org.apache.flink.autoscaler.tuning.MemoryTuning;
 import org.apache.flink.configuration.MemorySize;
@@ -36,6 +39,7 @@ import org.junit.jupiter.api.Test;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 
@@ -81,24 +85,37 @@ public class MemoryTuningTest {
 
         var metrics = new EvaluatedMetrics(vertexMetrics, globalMetrics);
 
+        JobTopology jobTopology =
+                new JobTopology(
+                        new VertexInfo(jobVertex1, Set.of(), 50, 1000, false, 
null),
+                        new VertexInfo(jobVertex2, Set.of(jobVertex1), 50, 
1000, false, null));
+
+        Map<JobVertexID, ScalingSummary> scalingSummaries =
+                Map.of(
+                        jobVertex1, new ScalingSummary(50, 25, Map.of()),
+                        jobVertex2, new ScalingSummary(50, 10, Map.of()));
+
         ConfigChanges configChanges =
-                MemoryTuning.tuneTaskManagerHeapMemory(context, metrics, 
eventHandler);
+                MemoryTuning.tuneTaskManagerHeapMemory(
+                        context, metrics, jobTopology, scalingSummaries, 
eventHandler);
         // Test reducing overall memory
         assertThat(configChanges.getOverrides())
                 .containsExactlyInAnyOrderEntriesOf(
                         Map.of(
                                 
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
-                                "0.562",
-                                
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
-                                "0.14",
+                                "0.654",
+                                TaskManagerOptions.NETWORK_MEMORY_MIN.key(),
+                                "13760 kb",
+                                TaskManagerOptions.NETWORK_MEMORY_MAX.key(),
+                                "13760 kb",
                                 TaskManagerOptions.JVM_METASPACE.key(),
                                 "120 mb",
                                 TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
-                                "0.099",
+                                "0.139",
                                 TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
                                 "0 bytes",
                                 TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
-                                "10833048417 bytes"));
+                                "7760130867 bytes"));
 
         assertThat(configChanges.getRemovals())
                 .containsExactlyInAnyOrder(
@@ -116,18 +133,22 @@ public class MemoryTuningTest {
 
         // Test maximize managed memory
         config.set(AutoScalerOptions.MEMORY_TUNING_MAXIMIZE_MANAGED_MEMORY, 
true);
-        configChanges = MemoryTuning.tuneTaskManagerHeapMemory(context, 
metrics, eventHandler);
+        configChanges =
+                MemoryTuning.tuneTaskManagerHeapMemory(
+                        context, metrics, jobTopology, scalingSummaries, 
eventHandler);
         assertThat(configChanges.getOverrides())
                 .containsExactlyInAnyOrderEntriesOf(
                         Map.of(
                                 
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
-                                "0.689",
-                                
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
-                                "0.1",
+                                "0.789",
+                                TaskManagerOptions.NETWORK_MEMORY_MIN.key(),
+                                "13760 kb",
+                                TaskManagerOptions.NETWORK_MEMORY_MAX.key(),
+                                "13760 kb",
                                 TaskManagerOptions.JVM_METASPACE.key(),
                                 "120 mb",
                                 TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
-                                "0.033",
+                                "0.034",
                                 TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
                                 "0 bytes",
                                 TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
@@ -137,27 +158,36 @@ public class MemoryTuningTest {
         metrics = new EvaluatedMetrics(vertexMetrics, new 
HashMap<>(globalMetrics));
         metrics.getGlobalMetrics()
                 .put(ScalingMetric.MANAGED_MEMORY_USED, 
EvaluatedScalingMetric.avg(0));
-        configChanges = MemoryTuning.tuneTaskManagerHeapMemory(context, 
metrics, eventHandler);
+        configChanges =
+                MemoryTuning.tuneTaskManagerHeapMemory(
+                        context, metrics, jobTopology, scalingSummaries, 
eventHandler);
         assertThat(configChanges.getOverrides())
                 .containsExactlyInAnyOrderEntriesOf(
                         Map.of(
                                 
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
                                 "0.0",
-                                
TaskManagerOptions.NETWORK_MEMORY_FRACTION.key(),
-                                "0.32",
+                                TaskManagerOptions.NETWORK_MEMORY_MIN.key(),
+                                "13760 kb",
+                                TaskManagerOptions.NETWORK_MEMORY_MAX.key(),
+                                "13760 kb",
                                 TaskManagerOptions.JVM_METASPACE.key(),
                                 "120 mb",
                                 TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
-                                "0.099",
+                                "0.139",
                                 TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
                                 "0 bytes",
                                 TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
-                                "10833048417 bytes"));
+                                "7760130867 bytes"));
 
         // Test tuning disabled
         config.set(AutoScalerOptions.MEMORY_TUNING_ENABLED, false);
         assertThat(
-                        MemoryTuning.tuneTaskManagerHeapMemory(context, 
metrics, eventHandler)
+                        MemoryTuning.tuneTaskManagerHeapMemory(
+                                        context,
+                                        metrics,
+                                        jobTopology,
+                                        scalingSummaries,
+                                        eventHandler)
                                 .getOverrides())
                 .isEmpty();
 

Reply via email to