This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit d9ec4b034078a64d08ed01705f5712a45c9a2ca7
Author: Mate Czagany <[email protected]>
AuthorDate: Sun Apr 2 19:27:44 2023 +0200

    [FLINK-31303] Fix fractional CPU calculation and added test
---
 .../operator/metrics/FlinkDeploymentMetrics.java   |  12 +-
 .../operator/service/AbstractFlinkService.java     |  39 ++----
 .../kubernetes/operator/utils/FlinkUtils.java      |  38 ++++++
 .../metrics/FlinkDeploymentMetricsTest.java        | 137 +++++++++++++++++++++
 .../kubernetes/operator/utils/FlinkUtilsTest.java  |  40 ++++++
 5 files changed, 229 insertions(+), 37 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
index 3f1cf394..36c09d95 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
@@ -71,6 +71,12 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
                 .get(flinkApp.getStatus().getJobManagerDeploymentStatus())
                 .add(deploymentName);
 
+        var totalCpu =
+                NumberUtils.toDouble(
+                        
clusterInfo.getOrDefault(AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "0"));
+        if (!Double.isFinite(totalCpu)) {
+            totalCpu = 0;
+        }
         deploymentCpuUsage
                 .computeIfAbsent(
                         namespace,
@@ -78,11 +84,7 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
                             initNamespaceCpuUsage(ns);
                             return new ConcurrentHashMap<>();
                         })
-                .put(
-                        deploymentName,
-                        NumberUtils.toDouble(
-                                clusterInfo.getOrDefault(
-                                        
AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "0")));
+                .put(deploymentName, totalCpu);
 
         deploymentMemoryUsage
                 .computeIfAbsent(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index e59c085d..aef44c6f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -26,10 +26,8 @@ import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
-import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
@@ -52,7 +50,6 @@ import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
-import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -73,7 +70,6 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHead
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
-import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
 import org.apache.flink.runtime.rest.util.RestConstants;
@@ -124,11 +120,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static 
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX;
@@ -659,32 +653,13 @@ public abstract class AbstractFlinkService implements FlinkService {
                     dashboardConfiguration.getFlinkRevision());
         }
 
-        // JobManager resource usage can be deduced from the CR
-        var jmParameters =
-                new KubernetesJobManagerParameters(
-                        conf, new 
KubernetesClusterClientFactory().getClusterSpecification(conf));
-        var jmTotalCpu =
-                jmParameters.getJobManagerCPU()
-                        * jmParameters.getJobManagerCPULimitFactor()
-                        * jmParameters.getReplicas();
-        var jmTotalMemory =
-                Math.round(
-                        jmParameters.getJobManagerMemoryMB()
-                                * Math.pow(1024, 2)
-                                * jmParameters.getJobManagerMemoryLimitFactor()
-                                * jmParameters.getReplicas());
-
-        // TaskManager resource usage is best gathered from the REST API to 
get current replicas
-        var taskManagerInfos = getTaskManagersInfo(conf).getTaskManagerInfos();
-        Supplier<Stream<HardwareDescription>> tmHardwareDesc =
-                () -> 
taskManagerInfos.stream().map(TaskManagerInfo::getHardwareDescription);
-        var tmTotalCpu =
-                
tmHardwareDesc.get().mapToInt(HardwareDescription::getNumberOfCPUCores).sum();
-        var tmTotalMemory =
-                
tmHardwareDesc.get().mapToLong(HardwareDescription::getSizeOfPhysicalMemory).sum();
-
-        clusterInfo.put(FIELD_NAME_TOTAL_CPU, String.valueOf(tmTotalCpu + 
jmTotalCpu));
-        clusterInfo.put(FIELD_NAME_TOTAL_MEMORY, String.valueOf(tmTotalMemory 
+ jmTotalMemory));
+        var taskManagerReplicas = 
getTaskManagersInfo(conf).getTaskManagerInfos().size();
+        clusterInfo.put(
+                FIELD_NAME_TOTAL_CPU,
+                String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, 
taskManagerReplicas)));
+        clusterInfo.put(
+                FIELD_NAME_TOTAL_MEMORY,
+                String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, 
taskManagerReplicas)));
 
         return clusterInfo;
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index af6fc8f4..68b23e34 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -22,8 +22,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.utils.Constants;
@@ -294,6 +296,42 @@ public class FlinkUtils {
         return (parallelism + taskSlots - 1) / taskSlots;
     }
 
+    public static Double calculateClusterCpuUsage(Configuration conf, int 
taskManagerReplicas) {
+        var jmTotalCpu =
+                conf.getDouble(KubernetesConfigOptions.JOB_MANAGER_CPU)
+                        * 
conf.getDouble(KubernetesConfigOptions.JOB_MANAGER_CPU_LIMIT_FACTOR)
+                        * 
conf.get(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS);
+
+        var tmTotalCpu =
+                conf.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 1)
+                        * 
conf.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR)
+                        * taskManagerReplicas;
+
+        return tmTotalCpu + jmTotalCpu;
+    }
+
+    public static Long calculateClusterMemoryUsage(Configuration conf, int 
taskManagerReplicas) {
+        var clusterSpec = new 
KubernetesClusterClientFactory().getClusterSpecification(conf);
+
+        var jmParameters = new KubernetesJobManagerParameters(conf, 
clusterSpec);
+        var jmTotalMemory =
+                Math.round(
+                        jmParameters.getJobManagerMemoryMB()
+                                * Math.pow(1024, 2)
+                                * jmParameters.getJobManagerMemoryLimitFactor()
+                                * jmParameters.getReplicas());
+
+        var tmTotalMemory =
+                Math.round(
+                        clusterSpec.getTaskManagerMemoryMB()
+                                * Math.pow(1024, 2)
+                                * conf.getDouble(
+                                        
KubernetesConfigOptions.TASK_MANAGER_MEMORY_LIMIT_FACTOR)
+                                * taskManagerReplicas);
+
+        return tmTotalMemory + jmTotalMemory;
+    }
+
     public static void setGenerationAnnotation(Configuration conf, Long 
generation) {
         if (generation == null) {
             return;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
index d1a7e3a0..d7c27287 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
@@ -22,11 +22,17 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Map;
+
 import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.COUNTER_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.CPU_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.MEMORY_NAME;
+import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.RESOURCE_USAGE_GROUP_NAME;
 import static 
org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATUS_GROUP_NAME;
 import static 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -187,6 +193,137 @@ public class FlinkDeploymentMetricsTest {
         }
     }
 
+    @Test
+    public void testResourceMetrics() {
+        var namespace1 = "ns1";
+        var namespace2 = "ns2";
+        var deployment1 = TestUtils.buildApplicationCluster("deployment1", 
namespace1);
+        var deployment2 = TestUtils.buildApplicationCluster("deployment2", 
namespace1);
+        var deployment3 = TestUtils.buildApplicationCluster("deployment3", 
namespace2);
+
+        deployment1
+                .getStatus()
+                .getClusterInfo()
+                .putAll(
+                        Map.of(
+                                AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "5",
+                                AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, 
"1024"));
+
+        deployment2
+                .getStatus()
+                .getClusterInfo()
+                .putAll(
+                        Map.of(
+                                AbstractFlinkService.FIELD_NAME_TOTAL_CPU, 
"10",
+                                AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, 
"2048"));
+
+        deployment3
+                .getStatus()
+                .getClusterInfo()
+                .putAll(
+                        Map.of(
+                                AbstractFlinkService.FIELD_NAME_TOTAL_CPU, 
"13",
+                                AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, 
"4096"));
+
+        var cpuGroupId1 =
+                listener.getNamespaceMetricId(
+                        FlinkDeployment.class, namespace1, 
RESOURCE_USAGE_GROUP_NAME, CPU_NAME);
+        var memoryGroupId1 =
+                listener.getNamespaceMetricId(
+                        FlinkDeployment.class, namespace1, 
RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME);
+        var cpuGroupId2 =
+                listener.getNamespaceMetricId(
+                        FlinkDeployment.class, namespace2, 
RESOURCE_USAGE_GROUP_NAME, CPU_NAME);
+        var memoryGroupId2 =
+                listener.getNamespaceMetricId(
+                        FlinkDeployment.class, namespace2, 
RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME);
+
+        assertTrue(listener.getGauge(cpuGroupId1).isEmpty());
+        assertTrue(listener.getGauge(memoryGroupId1).isEmpty());
+        assertTrue(listener.getGauge(cpuGroupId2).isEmpty());
+        assertTrue(listener.getGauge(memoryGroupId2).isEmpty());
+
+        metricManager.onUpdate(deployment1);
+        metricManager.onUpdate(deployment2);
+        metricManager.onUpdate(deployment3);
+
+        assertEquals(15D, listener.getGauge(cpuGroupId1).get().getValue());
+        assertEquals(3072L, 
listener.getGauge(memoryGroupId1).get().getValue());
+        assertEquals(13D, listener.getGauge(cpuGroupId2).get().getValue());
+        assertEquals(4096L, 
listener.getGauge(memoryGroupId2).get().getValue());
+    }
+
+    @Test
+    public void testResourceMetricsWithInvalidInput() {
+        var namespace = "ns";
+        var deployment = TestUtils.buildApplicationCluster("deployment", 
namespace);
+
+        var cpuGroupId =
+                listener.getNamespaceMetricId(
+                        FlinkDeployment.class, namespace, 
RESOURCE_USAGE_GROUP_NAME, CPU_NAME);
+        var memoryGroupId =
+                listener.getNamespaceMetricId(
+                        FlinkDeployment.class, namespace, 
RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME);
+
+        metricManager.onUpdate(deployment);
+
+        assertTrue(listener.getGauge(cpuGroupId).isPresent());
+        assertTrue(listener.getGauge(memoryGroupId).isPresent());
+        assertEquals(0D, listener.getGauge(cpuGroupId).get().getValue());
+        assertEquals(0L, listener.getGauge(memoryGroupId).get().getValue());
+
+        deployment
+                .getStatus()
+                .getClusterInfo()
+                .putAll(
+                        Map.of(
+                                AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "5",
+                                AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY,
+                                        "9223372036854775808"));
+        metricManager.onUpdate(deployment);
+
+        assertEquals(5D, listener.getGauge(cpuGroupId).get().getValue());
+        assertEquals(0L, listener.getGauge(memoryGroupId).get().getValue());
+
+        deployment
+                .getStatus()
+                .getClusterInfo()
+                .putAll(
+                        Map.of(
+                                AbstractFlinkService.FIELD_NAME_TOTAL_CPU, 
"null",
+                                AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY,
+                                        "9223372036854775808"));
+        metricManager.onUpdate(deployment);
+
+        assertEquals(0D, listener.getGauge(cpuGroupId).get().getValue());
+        assertEquals(0L, listener.getGauge(memoryGroupId).get().getValue());
+
+        deployment
+                .getStatus()
+                .getClusterInfo()
+                .putAll(
+                        Map.of(
+                                AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "",
+                                AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, 
"invalid"));
+        metricManager.onUpdate(deployment);
+
+        assertEquals(0D, listener.getGauge(cpuGroupId).get().getValue());
+        assertEquals(0L, listener.getGauge(memoryGroupId).get().getValue());
+
+        deployment
+                .getStatus()
+                .getClusterInfo()
+                .put(AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "Infinity");
+        deployment
+                .getStatus()
+                .getClusterInfo()
+                .remove(AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY);
+        metricManager.onUpdate(deployment);
+
+        assertEquals(0D, listener.getGauge(cpuGroupId).get().getValue());
+        assertEquals(0L, listener.getGauge(memoryGroupId).get().getValue());
+    }
+
     @Test
     public void testMetricsDisabled() {
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 832c1746..770a7042 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -21,11 +21,16 @@ package org.apache.flink.kubernetes.operator.utils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
@@ -227,6 +232,41 @@ public class FlinkUtilsTest {
         assertEquals(4, FlinkUtils.getNumTaskManagers(conf));
     }
 
+    @Test
+    public void testCalculateClusterCpuUsage() {
+        Configuration conf = new Configuration();
+        conf.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2);
+        conf.set(KubernetesConfigOptions.JOB_MANAGER_CPU, 2.5);
+        conf.set(KubernetesConfigOptions.JOB_MANAGER_CPU_LIMIT_FACTOR, 2.0);
+        conf.set(KubernetesConfigOptions.TASK_MANAGER_CPU, 3.5);
+        conf.set(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR, 1.5);
+
+        assertEquals(10, FlinkUtils.calculateClusterCpuUsage(conf, 0));
+        assertEquals(15.25, FlinkUtils.calculateClusterCpuUsage(conf, 1));
+        assertEquals(20.5, FlinkUtils.calculateClusterCpuUsage(conf, 2));
+    }
+
+    @Test
+    public void testCalculateClusterMemoryUsage() {
+        Configuration conf = new Configuration();
+        conf.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.KUBERNETES.toString());
+
+        conf.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2);
+        conf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("1g"));
+        conf.set(KubernetesConfigOptions.JOB_MANAGER_MEMORY_LIMIT_FACTOR, 2.0);
+
+        conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("2g"));
+        conf.set(KubernetesConfigOptions.TASK_MANAGER_MEMORY_LIMIT_FACTOR, 
1.5);
+
+        assertEquals(
+                MemorySize.parse("4g").getBytes(), 
FlinkUtils.calculateClusterMemoryUsage(conf, 0));
+        assertEquals(
+                MemorySize.parse("7g").getBytes(), 
FlinkUtils.calculateClusterMemoryUsage(conf, 1));
+        assertEquals(
+                MemorySize.parse("10g").getBytes(),
+                FlinkUtils.calculateClusterMemoryUsage(conf, 2));
+    }
+
     @Test
     public void testMergePodUsingArrayName() {
         Container container1 = new Container();

Reply via email to