This is an automated email from the ASF dual-hosted git repository. mbalassi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit d9ec4b034078a64d08ed01705f5712a45c9a2ca7 Author: Mate Czagany <[email protected]> AuthorDate: Sun Apr 2 19:27:44 2023 +0200 [FLINK-31303] Fix fractional CPU calculation and added test --- .../operator/metrics/FlinkDeploymentMetrics.java | 12 +- .../operator/service/AbstractFlinkService.java | 39 ++---- .../kubernetes/operator/utils/FlinkUtils.java | 38 ++++++ .../metrics/FlinkDeploymentMetricsTest.java | 137 +++++++++++++++++++++ .../kubernetes/operator/utils/FlinkUtilsTest.java | 40 ++++++ 5 files changed, 229 insertions(+), 37 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java index 3f1cf394..36c09d95 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java @@ -71,6 +71,12 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy .get(flinkApp.getStatus().getJobManagerDeploymentStatus()) .add(deploymentName); + var totalCpu = + NumberUtils.toDouble( + clusterInfo.getOrDefault(AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "0")); + if (!Double.isFinite(totalCpu)) { + totalCpu = 0; + } deploymentCpuUsage .computeIfAbsent( namespace, @@ -78,11 +84,7 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy initNamespaceCpuUsage(ns); return new ConcurrentHashMap<>(); }) - .put( - deploymentName, - NumberUtils.toDouble( - clusterInfo.getOrDefault( - AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "0"))); + .put(deploymentName, totalCpu); deploymentMemoryUsage .computeIfAbsent( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index e59c085d..aef44c6f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -26,10 +26,8 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; -import org.apache.flink.kubernetes.KubernetesClusterClientFactory; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; -import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec; @@ -52,7 +50,6 @@ import org.apache.flink.kubernetes.operator.utils.SavepointUtils; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; -import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.webmonitor.JobDetails; @@ -73,7 +70,6 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHead import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; -import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; import org.apache.flink.runtime.rest.util.RestConstants; @@ -124,11 +120,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Supplier; import java.util.jar.JarOutputStream; import java.util.jar.Manifest; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX; @@ -659,32 +653,13 @@ public abstract class AbstractFlinkService implements FlinkService { dashboardConfiguration.getFlinkRevision()); } - // JobManager resource usage can be deduced from the CR - var jmParameters = - new KubernetesJobManagerParameters( - conf, new KubernetesClusterClientFactory().getClusterSpecification(conf)); - var jmTotalCpu = - jmParameters.getJobManagerCPU() - * jmParameters.getJobManagerCPULimitFactor() - * jmParameters.getReplicas(); - var jmTotalMemory = - Math.round( - jmParameters.getJobManagerMemoryMB() - * Math.pow(1024, 2) - * jmParameters.getJobManagerMemoryLimitFactor() - * jmParameters.getReplicas()); - - // TaskManager resource usage is best gathered from the REST API to get current replicas - var taskManagerInfos = getTaskManagersInfo(conf).getTaskManagerInfos(); - Supplier<Stream<HardwareDescription>> tmHardwareDesc = - () -> taskManagerInfos.stream().map(TaskManagerInfo::getHardwareDescription); - var tmTotalCpu = - tmHardwareDesc.get().mapToInt(HardwareDescription::getNumberOfCPUCores).sum(); - var tmTotalMemory = - tmHardwareDesc.get().mapToLong(HardwareDescription::getSizeOfPhysicalMemory).sum(); - - clusterInfo.put(FIELD_NAME_TOTAL_CPU, String.valueOf(tmTotalCpu + jmTotalCpu)); - clusterInfo.put(FIELD_NAME_TOTAL_MEMORY, String.valueOf(tmTotalMemory + jmTotalMemory)); + var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size(); + clusterInfo.put( + FIELD_NAME_TOTAL_CPU, + String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas))); + clusterInfo.put( + FIELD_NAME_TOTAL_MEMORY, + String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas))); return clusterInfo; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java index af6fc8f4..68b23e34 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java @@ -22,8 +22,10 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.KubernetesClusterClientFactory; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.utils.Constants; @@ -294,6 +296,42 @@ public class FlinkUtils { return (parallelism + taskSlots - 1) / taskSlots; } + public static Double calculateClusterCpuUsage(Configuration conf, int taskManagerReplicas) { + var jmTotalCpu = + conf.getDouble(KubernetesConfigOptions.JOB_MANAGER_CPU) + * conf.getDouble(KubernetesConfigOptions.JOB_MANAGER_CPU_LIMIT_FACTOR) + * conf.get(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS); + + var tmTotalCpu = + conf.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 1) + * conf.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR) + * taskManagerReplicas; + + return tmTotalCpu + jmTotalCpu; + } + + public static Long calculateClusterMemoryUsage(Configuration conf, int taskManagerReplicas) { + var clusterSpec = new KubernetesClusterClientFactory().getClusterSpecification(conf); + + var jmParameters = new KubernetesJobManagerParameters(conf, clusterSpec); + var jmTotalMemory = + Math.round( + jmParameters.getJobManagerMemoryMB() + * Math.pow(1024, 2) + * jmParameters.getJobManagerMemoryLimitFactor() + * jmParameters.getReplicas()); + + var tmTotalMemory = + Math.round( + clusterSpec.getTaskManagerMemoryMB() + * Math.pow(1024, 2) + * conf.getDouble( + KubernetesConfigOptions.TASK_MANAGER_MEMORY_LIMIT_FACTOR) + * taskManagerReplicas); + + return tmTotalMemory + jmTotalMemory; + } + public static void setGenerationAnnotation(Configuration conf, Long generation) { if (generation == null) { return; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java index d1a7e3a0..d7c27287 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java @@ -22,11 +22,17 @@ import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.service.AbstractFlinkService; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Map; + import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.COUNTER_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.CPU_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.MEMORY_NAME; +import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.RESOURCE_USAGE_GROUP_NAME; import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATUS_GROUP_NAME; import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -187,6 +193,137 @@ public class FlinkDeploymentMetricsTest { } } + @Test + public void testResourceMetrics() { + var namespace1 = "ns1"; + var namespace2 = "ns2"; + var deployment1 = TestUtils.buildApplicationCluster("deployment1", namespace1); + var deployment2 = TestUtils.buildApplicationCluster("deployment2", namespace1); + var deployment3 = TestUtils.buildApplicationCluster("deployment3", namespace2); + + deployment1 + .getStatus() + .getClusterInfo() + .putAll( + Map.of( + AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "5", + AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "1024")); + + deployment2 + .getStatus() + .getClusterInfo() + .putAll( + Map.of( + AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "10", + AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "2048")); + + deployment3 + .getStatus() + .getClusterInfo() + .putAll( + Map.of( + AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "13", + AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "4096")); + + var cpuGroupId1 = + listener.getNamespaceMetricId( + FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, CPU_NAME); + var memoryGroupId1 = + listener.getNamespaceMetricId( + FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME); + var cpuGroupId2 = + listener.getNamespaceMetricId( + FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, CPU_NAME); + var memoryGroupId2 = + listener.getNamespaceMetricId( + FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME); + + assertTrue(listener.getGauge(cpuGroupId1).isEmpty()); + assertTrue(listener.getGauge(memoryGroupId1).isEmpty()); + assertTrue(listener.getGauge(cpuGroupId2).isEmpty()); + assertTrue(listener.getGauge(memoryGroupId2).isEmpty()); + + metricManager.onUpdate(deployment1); + metricManager.onUpdate(deployment2); + metricManager.onUpdate(deployment3); + + assertEquals(15D, listener.getGauge(cpuGroupId1).get().getValue()); + assertEquals(3072L, listener.getGauge(memoryGroupId1).get().getValue()); + assertEquals(13D, listener.getGauge(cpuGroupId2).get().getValue()); + assertEquals(4096L, listener.getGauge(memoryGroupId2).get().getValue()); + } + + @Test + public void testResourceMetricsWithInvalidInput() { + var namespace = "ns"; + var deployment = TestUtils.buildApplicationCluster("deployment", namespace); + + var cpuGroupId = + listener.getNamespaceMetricId( + FlinkDeployment.class, namespace, RESOURCE_USAGE_GROUP_NAME, CPU_NAME); + var memoryGroupId = + listener.getNamespaceMetricId( + FlinkDeployment.class, namespace, RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME); + + metricManager.onUpdate(deployment); + + assertTrue(listener.getGauge(cpuGroupId).isPresent()); + assertTrue(listener.getGauge(memoryGroupId).isPresent()); + assertEquals(0D, listener.getGauge(cpuGroupId).get().getValue()); + assertEquals(0L, listener.getGauge(memoryGroupId).get().getValue()); + + deployment + .getStatus() + .getClusterInfo() + .putAll( + Map.of( + AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "5", + AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, + "9223372036854775808")); + metricManager.onUpdate(deployment); + + assertEquals(5D, listener.getGauge(cpuGroupId).get().getValue()); + assertEquals(0L, listener.getGauge(memoryGroupId).get().getValue()); + + deployment + .getStatus() + .getClusterInfo() + .putAll( + Map.of( + AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "null", + AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, + "9223372036854775808")); + metricManager.onUpdate(deployment); + + assertEquals(0D, listener.getGauge(cpuGroupId).get().getValue()); + assertEquals(0L, listener.getGauge(memoryGroupId).get().getValue()); + + deployment + .getStatus() + .getClusterInfo() + .putAll( + Map.of( + AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "", + AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "invalid")); + metricManager.onUpdate(deployment); + + assertEquals(0D, listener.getGauge(cpuGroupId).get().getValue()); + assertEquals(0L, listener.getGauge(memoryGroupId).get().getValue()); + + deployment + .getStatus() + .getClusterInfo() + .put(AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "Infinity"); + deployment + .getStatus() + .getClusterInfo() + .remove(AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY); + metricManager.onUpdate(deployment); + + assertEquals(0D, listener.getGauge(cpuGroupId).get().getValue()); + assertEquals(0L, listener.getGauge(memoryGroupId).get().getValue()); + } + @Test public void testMetricsDisabled() { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java index 832c1746..770a7042 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java @@ -21,11 +21,16 @@ package org.apache.flink.kubernetes.operator.utils; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMapBuilder; @@ -227,6 +232,41 @@ public class FlinkUtilsTest { assertEquals(4, FlinkUtils.getNumTaskManagers(conf)); } + @Test + public void testCalculateClusterCpuUsage() { + Configuration conf = new Configuration(); + conf.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2); + conf.set(KubernetesConfigOptions.JOB_MANAGER_CPU, 2.5); + conf.set(KubernetesConfigOptions.JOB_MANAGER_CPU_LIMIT_FACTOR, 2.0); + conf.set(KubernetesConfigOptions.TASK_MANAGER_CPU, 3.5); + conf.set(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR, 1.5); + + assertEquals(10, FlinkUtils.calculateClusterCpuUsage(conf, 0)); + assertEquals(15.25, FlinkUtils.calculateClusterCpuUsage(conf, 1)); + assertEquals(20.5, FlinkUtils.calculateClusterCpuUsage(conf, 2)); + } + + @Test + public void testCalculateClusterMemoryUsage() { + Configuration conf = new Configuration(); + conf.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.KUBERNETES.toString()); + + conf.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2); + conf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g")); + conf.set(KubernetesConfigOptions.JOB_MANAGER_MEMORY_LIMIT_FACTOR, 2.0); + + conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("2g")); + conf.set(KubernetesConfigOptions.TASK_MANAGER_MEMORY_LIMIT_FACTOR, 1.5); + + assertEquals( + MemorySize.parse("4g").getBytes(), FlinkUtils.calculateClusterMemoryUsage(conf, 0)); + assertEquals( + MemorySize.parse("7g").getBytes(), FlinkUtils.calculateClusterMemoryUsage(conf, 1)); + assertEquals( + MemorySize.parse("10g").getBytes(), + FlinkUtils.calculateClusterMemoryUsage(conf, 2)); + } + @Test public void testMergePodUsingArrayName() { Container container1 = new Container();
