1996fanrui commented on code in PR #762:
URL: https://github.com/apache/flink-kubernetes-operator/pull/762#discussion_r1464567468


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/MemoryTuningUtils.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Tunes the TaskManager memory. */
+public class MemoryTuningUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryTuningUtils.class);
+
+    public static Optional<MemorySize> tuneTaskManagerHeapMemory(
+            JobAutoScalerContext<?> context,
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries) {
+
+        var config = context.getConfiguration();
+        if (!config.get(AutoScalerOptions.MEMORY_TUNING_ENABLED)) {
+            return Optional.empty();
+        }
+
+        var globalMetrics = evaluatedMetrics.getGlobalMetrics();
+        double avgHeapSize = globalMetrics.get(ScalingMetric.HEAP_AVERAGE_SIZE).getAverage();
+
+        double numTaskSlotsUsed = globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
+        int taskSlotsPerTm = config.get(TaskManagerOptions.NUM_TASK_SLOTS);
+        int currentNumTMs = (int) Math.ceil(numTaskSlotsUsed / taskSlotsPerTm);
+
+        double usedTotalHeapSize = currentNumTMs * avgHeapSize;
+        LOG.info("Total used heap size: {}", new MemorySize((long) usedTotalHeapSize));
+        usedTotalHeapSize *= computeDataChangeRate(evaluatedMetrics);
+        LOG.info("Resized total heap size: {}", new MemorySize((long) usedTotalHeapSize));
+
+        int numTaskSlotsAfterRescale =
+                ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
+                        evaluatedMetrics, scalingSummaries, numTaskSlotsUsed);
+        int newNumTms = (int) Math.ceil(numTaskSlotsAfterRescale / (double) taskSlotsPerTm);
+        LOG.info(
+                "Estimating {} task slots in use after rescale, spread across {} TaskManagers",
+                numTaskSlotsAfterRescale,
+                newNumTms);
+
+        MemorySize newHeapSize = new MemorySize((long) (usedTotalHeapSize / newNumTms));
+        // TM container memory can never grow beyond the user-specified max
+        Optional<MemorySize> maxMemory = context.getTaskManagerMemoryFromSpec();

Review Comment:
   From the code, I understand that "Adjust memory" only adjusts the memory down, to a value at or below the user-specified memory, and never above it, right?
   
   Could we make the option names as generic as possible? For example: `job.autoscaler.taskmanager-memory.scaling.enabled` and `job.autoscaler.taskmanager-memory.scaling.heap.min`.
   
   In the short term, we only turn the memory down. In the long term, we might also support turning it up. If the option names are more generic, it is easier to stay compatible. WDYT?
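   
   To make sure I read the limits correctly, here is a minimal sketch of the clamping, assuming plain byte counts instead of `MemorySize`. The class name, method name, and the 4096 MiB maximum are hypothetical and not part of the PR:

```java
final class HeapClampSketch {

    /** Clamps a proposed heap size (in bytes) using the same min/max nesting as the diff. */
    static long clampHeap(long proposedBytes, long minHeapBytes, long maxBytes) {
        // The upper bound is applied last, so it wins if the configured
        // minimum happens to be larger than the user-specified maximum.
        return Math.min(maxBytes, Math.max(minHeapBytes, proposedBytes));
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long min = 2048 * mib; // mirrors the 2048 MiB default of memory.tuning.heap.min
        long max = 4096 * mib; // hypothetical user-specified TaskManager memory

        System.out.println(clampHeap(1024 * mib, min, max) / mib); // 2048: raised to the minimum
        System.out.println(clampHeap(3000 * mib, min, max) / mib); // 3000: already within bounds
        System.out.println(clampHeap(8192 * mib, min, max) / mib); // 4096: capped at the maximum
    }
}
```

   If this matches the intent, the result can only move within the range between the minimum heap and the spec value, which is why a generic option name would leave room for scaling up later.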



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/MemoryTuningUtils.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Tunes the TaskManager memory. */
+public class MemoryTuningUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryTuningUtils.class);
+
+    public static Optional<MemorySize> tuneTaskManagerHeapMemory(
+            JobAutoScalerContext<?> context,
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries) {
+
+        var config = context.getConfiguration();
+        if (!config.get(AutoScalerOptions.MEMORY_TUNING_ENABLED)) {
+            return Optional.empty();
+        }
+
+        var globalMetrics = evaluatedMetrics.getGlobalMetrics();
+        double avgHeapSize = globalMetrics.get(ScalingMetric.HEAP_AVERAGE_SIZE).getAverage();
+
+        double numTaskSlotsUsed = globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
+        int taskSlotsPerTm = config.get(TaskManagerOptions.NUM_TASK_SLOTS);
+        int currentNumTMs = (int) Math.ceil(numTaskSlotsUsed / taskSlotsPerTm);
+
+        double usedTotalHeapSize = currentNumTMs * avgHeapSize;
+        LOG.info("Total used heap size: {}", new MemorySize((long) usedTotalHeapSize));
+        usedTotalHeapSize *= computeDataChangeRate(evaluatedMetrics);
+        LOG.info("Resized total heap size: {}", new MemorySize((long) usedTotalHeapSize));
+
+        int numTaskSlotsAfterRescale =
+                ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
+                        evaluatedMetrics, scalingSummaries, numTaskSlotsUsed);
+        int newNumTms = (int) Math.ceil(numTaskSlotsAfterRescale / (double) taskSlotsPerTm);
+        LOG.info(
+                "Estimating {} task slots in use after rescale, spread across {} TaskManagers",
+                numTaskSlotsAfterRescale,
+                newNumTms);
+
+        MemorySize newHeapSize = new MemorySize((long) (usedTotalHeapSize / newNumTms));
+        // TM container memory can never grow beyond the user-specified max
+        Optional<MemorySize> maxMemory = context.getTaskManagerMemoryFromSpec();
+        if (maxMemory.isEmpty()) {
+            return Optional.empty();
+        }
+        // Apply limits
+        newHeapSize =
+                new MemorySize(
+                        Math.min(
+                                maxMemory.get().getBytes(),
+                                Math.max(
+                                        config.get(AutoScalerOptions.MEMORY_TUNING_MIN_HEAP)
+                                                .getBytes(),
+                                        newHeapSize.getBytes())));
+        LOG.info("Calculated new TaskManager heap memory {}", newHeapSize.toHumanReadableString());
+
+        return Optional.of(newHeapSize);
+    }
+
+    /**
+     * Calculate the data change rate across the entire DAG. To add headroom, we use the
+     * EXPECTED_PROCESSING_RATE which is the max processing capacity after scaleup. We use the
+     * current known processing rate as the normalization factor.
+     */
+    private static double computeDataChangeRate(EvaluatedMetrics evaluatedMetrics) {
+        double totalCurrentProcessingRate = 0;
+        double targetedTotalProcessingRate = 0;
+        for (Map<ScalingMetric, EvaluatedScalingMetric> entry :
+                evaluatedMetrics.getVertexMetrics().values()) {
+            totalCurrentProcessingRate +=
+                    entry.get(ScalingMetric.CURRENT_PROCESSING_RATE).getAverage();
+            targetedTotalProcessingRate +=
+                    entry.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent();
+        }
+        return targetedTotalProcessingRate / totalCurrentProcessingRate;
+    }
+
+    public static MemorySize adjustTotalTmMemory(
+            JobAutoScalerContext<?> ctx,
+            MemorySize newHeapSize,
+            EvaluatedMetrics evaluatedMetrics) {
+        var totalTaskManagerMemory = ctx.getTaskManagerMemory().orElse(MemorySize.ZERO);
+        if (totalTaskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
+            return MemorySize.ZERO;
+        }
+        if (newHeapSize.compareTo(MemorySize.ZERO) <= 0) {
+            return totalTaskManagerMemory;
+        }
+
+        var currentMax =
+                evaluatedMetrics.getGlobalMetrics().get(ScalingMetric.HEAP_MAX_SIZE).getCurrent();
+        if (!Double.isFinite(currentMax)) {

Review Comment:
   ```suggestion
           if (Double.isInfinite(currentMax)) {
   ```
   
   nit: this is easier to understand.
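   
   One thing worth checking before applying the suggestion: the two conditions are not equivalent for `NaN`. Whether `HEAP_MAX_SIZE` can actually be `NaN` here is an assumption on my side; the sketch below only shows the difference in plain Java, with hypothetical class and method names:

```java
final class FiniteCheckSketch {

    /** Condition currently used in the diff. */
    static boolean notFinite(double value) {
        return !Double.isFinite(value);
    }

    /** Condition proposed in the suggestion. */
    static boolean infinite(double value) {
        return Double.isInfinite(value);
    }

    public static void main(String[] args) {
        double[] samples = {
            1024.0, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, Double.NaN
        };
        for (double value : samples) {
            // The two checks agree for every sample except NaN.
            System.out.printf(
                    "%s: !isFinite=%b, isInfinite=%b%n",
                    value, notFinite(value), infinite(value));
        }
    }
}
```

   If a missing metric can show up as `NaN`, keeping `!Double.isFinite(...)` would be the safer choice; otherwise the suggestion is purely cosmetic.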



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java:
##########
@@ -56,19 +57,47 @@ public KubernetesJobAutoScalerContext(
                 jobStatus,
                 configuration,
                 metricGroup,
-                Optional.ofNullable(configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU))
-                        .orElse(0.),
-                Optional.ofNullable(configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY))
-                        .orElse(MemorySize.ZERO),
                 restClientSupplier);
         this.resourceContext = resourceContext;
     }
 
+    @Override
+    public Optional<Double> getTaskManagerCpu() {
+        return Optional.ofNullable(
+                getConfiguration().get(KubernetesConfigOptions.TASK_MANAGER_CPU));
+    }
+
+    @Override
+    public Optional<MemorySize> getTaskManagerMemory() {
+        return Optional.ofNullable(getConfiguration().get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));

Review Comment:
   This is a unified option for all Flink clusters (k8s, YARN, etc.). Could we move it into the `JobAutoScalerContext` class?
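   
   Roughly what I have in mind, as a sketch only: the classes below are hypothetical stand-ins that use a plain `Map` instead of Flink's `Configuration`, and string keys instead of the `ConfigOption` constants:

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the generic context. */
class BaseContextSketch {
    private final Map<String, String> configuration;

    BaseContextSketch(Map<String, String> configuration) {
        this.configuration = configuration;
    }

    Map<String, String> getConfiguration() {
        return configuration;
    }

    /** Lives in the base class because the key is not specific to any resource provider. */
    Optional<String> getTaskManagerMemory() {
        return Optional.ofNullable(configuration.get("taskmanager.memory.process.size"));
    }
}

/** Hypothetical stand-in for the Kubernetes context. */
class KubernetesContextSketch extends BaseContextSketch {

    KubernetesContextSketch(Map<String, String> configuration) {
        super(configuration);
    }

    /** Stays in the subclass because the key is Kubernetes-specific. */
    Optional<Double> getTaskManagerCpu() {
        return Optional.ofNullable(getConfiguration().get("kubernetes.taskmanager.cpu"))
                .map(Double::valueOf);
    }

    public static void main(String[] args) {
        var ctx =
                new KubernetesContextSketch(
                        Map.of(
                                "taskmanager.memory.process.size", "4g",
                                "kubernetes.taskmanager.cpu", "2.0"));
        System.out.println(ctx.getTaskManagerMemory().orElse("unset"));
        System.out.println(ctx.getTaskManagerCpu().orElse(0.0));
    }
}
```

   With this split, other implementations of the context would get the memory lookup for free and would only need to provide the provider-specific parts.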



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java:
##########
@@ -69,6 +70,13 @@ void storeParallelismOverrides(Context jobContext, Map<String, String> paralleli
 
     void removeParallelismOverrides(Context jobContext) throws Exception;
 
+    void storeConfigOverrides(Context jobContext, Configuration configOverrides) throws Exception;

Review Comment:
   Could we rename it to `config` instead of `configOverrides`?
   
   I guess we might store other configs in it as well, even ones that are not related to overrides. WDYT?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -151,7 +154,18 @@ protected void applyParallelismOverrides(Context ctx) throws Exception {
                         userOverrides.put(k, v);
                     }
                 });
-        scalingRealizer.realize(ctx, userOverrides);
+        scalingRealizer.realizeParallelismOverrides(ctx, userOverrides);
+    }
+
+    @VisibleForTesting
+    void applyConfigOverrides(Context ctx) throws Exception {
+        Configuration configOverrides = stateStore.getConfigOverrides(ctx);
+        var tmHeapOverride = configOverrides.get(TaskManagerOptions.TASK_HEAP_MEMORY);
+
+        if (tmHeapOverride != null) {
+            LOG.info("Applying heap memory override: {}", tmHeapOverride);
+            scalingRealizer.realizeMemoryOverrides(ctx, tmHeapOverride);

Review Comment:
   The second parameter of `realizeMemoryOverrides` is named `taskManagerMemoryOverride`, which sounds like the TM total memory rather than the heap memory, doesn't it?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java:
##########
@@ -56,19 +57,47 @@ public KubernetesJobAutoScalerContext(
                 jobStatus,
                 configuration,
                 metricGroup,
-                Optional.ofNullable(configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU))
-                        .orElse(0.),
-                Optional.ofNullable(configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY))
-                        .orElse(MemorySize.ZERO),
                 restClientSupplier);
         this.resourceContext = resourceContext;
     }
 
+    @Override
+    public Optional<Double> getTaskManagerCpu() {
+        return Optional.ofNullable(
+                getConfiguration().get(KubernetesConfigOptions.TASK_MANAGER_CPU));
+    }
+
+    @Override
+    public Optional<MemorySize> getTaskManagerMemory() {
+        return Optional.ofNullable(getConfiguration().get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Override
+    public Optional<MemorySize> getTaskManagerMemoryFromSpec() {
+        return getJobDeployment()
+                .map(
+                        flinkDeployment ->
+                                MemorySize.parse(
+                                        flinkDeployment
+                                                .getSpec()
+                                                .getTaskManager()
+                                                .getResource()
+                                                .getMemory()));

Review Comment:
   I'm not familiar with Kubernetes, so I have a question here:
   
   Is this always the original memory size, or will it change once we call `realizeMemoryOverrides`?
   
   If it doesn't change, it's fine.
   
   If it does change, the memory can never be turned up again, even if the processing rate goes up, right?
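   
   To illustrate the concern, here is a small sketch with plain byte counts. Whether the override is written back into the spec is exactly the open question, so `overrideMutatesSpec` and all names below are hypothetical:

```java
final class SpecCapSketch {

    /** Caps the proposed size at whatever the spec currently reports. */
    static long tune(long proposedBytes, long specBytes) {
        return Math.min(specBytes, proposedBytes);
    }

    /** Runs two tuning rounds and returns the result of the second one. */
    static long secondRound(
            long originalSpecBytes,
            long firstProposalBytes,
            long secondProposalBytes,
            boolean overrideMutatesSpec) {
        long spec = originalSpecBytes;
        long first = tune(firstProposalBytes, spec);
        if (overrideMutatesSpec) {
            // Hypothetical behavior: the applied override replaces the spec value.
            spec = first;
        }
        return tune(secondProposalBytes, spec);
    }

    public static void main(String[] args) {
        long gib = 1L << 30;
        // The spec starts at 4 GiB, the first round proposes 2 GiB, and the
        // second round proposes 3 GiB because the processing rate went up.
        System.out.println(secondRound(4 * gib, 2 * gib, 3 * gib, false) / gib); // 3
        System.out.println(secondRound(4 * gib, 2 * gib, 3 * gib, true) / gib); // 2
    }
}
```

   In the second case the memory stays stuck at the first reduced value, which is the situation I would like to rule out.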



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -250,6 +251,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Max allowed percentage of heap usage during scaling operations. Autoscaling will be paused if the heap usage exceeds this threshold.");
 
+    public static final ConfigOption<Boolean> MEMORY_TUNING_ENABLED =
+            autoScalerConfig("memory.tuning.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
+                    .withDescription(
+                            "If enabled, the initial amount of memory specified for TaskManagers will be reduced according to the observed needs.");
+
+    public static final ConfigOption<MemorySize> MEMORY_TUNING_MIN_HEAP =
+            autoScalerConfig("memory.tuning.heap.min")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(2048L))
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.min"))
+                    .withDescription(
+                            "The minimum amount of TaskManager memory, if memory tuning is enabled.");

Review Comment:
   ```suggestion
                               "The minimum amount of TaskManager heap memory, if memory tuning is enabled.");
   ```
   
   heap?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
