This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 048c492d09c2208904908d3f15a3a235969a7c19
Author: Marton Balassi <[email protected]>
AuthorDate: Tue Mar 14 16:25:40 2023 +0100

    [FLINK-31303] Expose Flink application resource usage via metrics and status
---
 .../operator/metrics/FlinkDeploymentMetrics.java   | 93 +++++++++++++++++++---
 .../operator/service/AbstractFlinkService.java     | 72 +++++++++++++++--
 2 files changed, 147 insertions(+), 18 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
index d41c6dde..3f1cf394 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
@@ -20,6 +20,9 @@ package org.apache.flink.kubernetes.operator.metrics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
+
+import org.apache.commons.lang3.math.NumberUtils;
 
 import java.util.Map;
 import java.util.Set;
@@ -30,10 +33,19 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
 
     private final KubernetesOperatorMetricGroup parentMetricGroup;
     private final Configuration configuration;
-    private final Map<String, Map<JobManagerDeploymentStatus, Set<String>>> 
deployments =
+
+    // map(namespace, map(status, set(deployment))
+    private final Map<String, Map<JobManagerDeploymentStatus, Set<String>>> 
deploymentStatuses =
             new ConcurrentHashMap<>();
+    // map(namespace, map(deployment, cpu))
+    private final Map<String, Map<String, Double>> deploymentCpuUsage = new 
ConcurrentHashMap<>();
+    // map(namespace, map(deployment, memory))
+    private final Map<String, Map<String, Long>> deploymentMemoryUsage = new 
ConcurrentHashMap<>();
     public static final String STATUS_GROUP_NAME = "JmDeploymentStatus";
+    public static final String RESOURCE_USAGE_GROUP_NAME = "ResourceUsage";
     public static final String COUNTER_NAME = "Count";
+    public static final String CPU_NAME = "Cpu";
+    public static final String MEMORY_NAME = "Memory";
 
     public FlinkDeploymentMetrics(
             KubernetesOperatorMetricGroup parentMetricGroup, Configuration 
configuration) {
@@ -43,26 +55,60 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
 
     public void onUpdate(FlinkDeployment flinkApp) {
         onRemove(flinkApp);
-        deployments
+
+        var namespace = flinkApp.getMetadata().getNamespace();
+        var clusterInfo = flinkApp.getStatus().getClusterInfo();
+        var deploymentName = flinkApp.getMetadata().getName();
+
+        deploymentStatuses
                 .computeIfAbsent(
-                        flinkApp.getMetadata().getNamespace(),
+                        namespace,
                         ns -> {
                             initNamespaceDeploymentCounts(ns);
                             initNamespaceStatusCounts(ns);
                             return createDeploymentStatusMap();
                         })
                 .get(flinkApp.getStatus().getJobManagerDeploymentStatus())
-                .add(flinkApp.getMetadata().getName());
+                .add(deploymentName);
+
+        deploymentCpuUsage
+                .computeIfAbsent(
+                        namespace,
+                        ns -> {
+                            initNamespaceCpuUsage(ns);
+                            return new ConcurrentHashMap<>();
+                        })
+                .put(
+                        deploymentName,
+                        NumberUtils.toDouble(
+                                clusterInfo.getOrDefault(
+                                        
AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "0")));
+
+        deploymentMemoryUsage
+                .computeIfAbsent(
+                        namespace,
+                        ns -> {
+                            initNamespaceMemoryUsage(ns);
+                            return new ConcurrentHashMap<>();
+                        })
+                .put(
+                        deploymentName,
+                        NumberUtils.toLong(
+                                clusterInfo.getOrDefault(
+                                        
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "0")));
     }
 
     public void onRemove(FlinkDeployment flinkApp) {
-        if (!deployments.containsKey(flinkApp.getMetadata().getNamespace())) {
+        var namespace = flinkApp.getMetadata().getNamespace();
+        var name = flinkApp.getMetadata().getName();
+
+        if (!deploymentStatuses.containsKey(namespace)) {
             return;
         }
-        deployments
-                .get(flinkApp.getMetadata().getNamespace())
-                .values()
-                .forEach(names -> 
names.remove(flinkApp.getMetadata().getName()));
+        deploymentStatuses.get(namespace).values().forEach(names -> 
names.remove(name));
+
+        deploymentCpuUsage.get(namespace).remove(name);
+        deploymentMemoryUsage.get(namespace).remove(name);
     }
 
     private void initNamespaceDeploymentCounts(String ns) {
@@ -70,7 +116,10 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
                 .createResourceNamespaceGroup(configuration, 
FlinkDeployment.class, ns)
                 .gauge(
                         COUNTER_NAME,
-                        () -> 
deployments.get(ns).values().stream().mapToInt(Set::size).sum());
+                        () ->
+                                deploymentStatuses.get(ns).values().stream()
+                                        .mapToInt(Set::size)
+                                        .sum());
     }
 
     private void initNamespaceStatusCounts(String ns) {
@@ -79,10 +128,32 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
                     .createResourceNamespaceGroup(configuration, 
FlinkDeployment.class, ns)
                     .addGroup(STATUS_GROUP_NAME)
                     .addGroup(status.toString())
-                    .gauge(COUNTER_NAME, () -> 
deployments.get(ns).get(status).size());
+                    .gauge(COUNTER_NAME, () -> 
deploymentStatuses.get(ns).get(status).size());
         }
     }
 
+    private void initNamespaceCpuUsage(String ns) {
+        parentMetricGroup
+                .createResourceNamespaceGroup(configuration, 
FlinkDeployment.class, ns)
+                .addGroup(RESOURCE_USAGE_GROUP_NAME)
+                .gauge(
+                        CPU_NAME,
+                        () ->
+                                deploymentCpuUsage.get(ns).values().stream()
+                                        .reduce(0.0, Double::sum));
+    }
+
+    private void initNamespaceMemoryUsage(String ns) {
+        parentMetricGroup
+                .createResourceNamespaceGroup(configuration, 
FlinkDeployment.class, ns)
+                .addGroup(RESOURCE_USAGE_GROUP_NAME)
+                .gauge(
+                        MEMORY_NAME,
+                        () ->
+                                deploymentMemoryUsage.get(ns).values().stream()
+                                        .reduce(0L, Long::sum));
+    }
+
     private Map<JobManagerDeploymentStatus, Set<String>> 
createDeploymentStatusMap() {
         Map<JobManagerDeploymentStatus, Set<String>> statuses = new 
ConcurrentHashMap<>();
         for (JobManagerDeploymentStatus status : 
JobManagerDeploymentStatus.values()) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index d0f2b16d..e59c085d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -26,8 +26,10 @@ import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
@@ -50,6 +52,7 @@ import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -70,6 +73,9 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHead
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import 
org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
@@ -118,9 +124,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX;
@@ -133,6 +141,8 @@ public abstract class AbstractFlinkService implements FlinkService {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractFlinkService.class);
     private static final String EMPTY_JAR_FILENAME = "empty.jar";
+    public static final String FIELD_NAME_TOTAL_CPU = "total-cpu";
+    public static final String FIELD_NAME_TOTAL_MEMORY = "total-memory";
 
     protected final KubernetesClient kubernetesClient;
     protected final FlinkConfigManager configManager;
@@ -623,7 +633,7 @@ public abstract class AbstractFlinkService implements FlinkService {
 
     @Override
     public Map<String, String> getClusterInfo(Configuration conf) throws 
Exception {
-        Map<String, String> runtimeVersion = new HashMap<>();
+        Map<String, String> clusterInfo = new HashMap<>();
 
         try (RestClusterClient<String> clusterClient =
                 (RestClusterClient<String>) getClusterClient(conf)) {
@@ -641,14 +651,42 @@ public abstract class AbstractFlinkService implements FlinkService {
                                             .toSeconds(),
                                     TimeUnit.SECONDS);
 
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
                     dashboardConfiguration.getFlinkVersion());
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
                     dashboardConfiguration.getFlinkRevision());
         }
-        return runtimeVersion;
+
+        // JobManager resource usage can be deduced from the CR
+        var jmParameters =
+                new KubernetesJobManagerParameters(
+                        conf, new 
KubernetesClusterClientFactory().getClusterSpecification(conf));
+        var jmTotalCpu =
+                jmParameters.getJobManagerCPU()
+                        * jmParameters.getJobManagerCPULimitFactor()
+                        * jmParameters.getReplicas();
+        var jmTotalMemory =
+                Math.round(
+                        jmParameters.getJobManagerMemoryMB()
+                                * Math.pow(1024, 2)
+                                * jmParameters.getJobManagerMemoryLimitFactor()
+                                * jmParameters.getReplicas());
+
+        // TaskManager resource usage is best gathered from the REST API to 
get current replicas
+        var taskManagerInfos = getTaskManagersInfo(conf).getTaskManagerInfos();
+        Supplier<Stream<HardwareDescription>> tmHardwareDesc =
+                () -> 
taskManagerInfos.stream().map(TaskManagerInfo::getHardwareDescription);
+        var tmTotalCpu =
+                
tmHardwareDesc.get().mapToInt(HardwareDescription::getNumberOfCPUCores).sum();
+        var tmTotalMemory =
+                
tmHardwareDesc.get().mapToLong(HardwareDescription::getSizeOfPhysicalMemory).sum();
+
+        clusterInfo.put(FIELD_NAME_TOTAL_CPU, String.valueOf(tmTotalCpu + 
jmTotalCpu));
+        clusterInfo.put(FIELD_NAME_TOTAL_MEMORY, String.valueOf(tmTotalMemory 
+ jmTotalMemory));
+
+        return clusterInfo;
     }
 
     @Override
@@ -909,8 +947,7 @@ public abstract class AbstractFlinkService implements FlinkService {
 
     public Map<String, String> getMetrics(
             Configuration conf, String jobId, List<String> metricNames) throws 
Exception {
-        try (RestClusterClient<String> clusterClient =
-                (RestClusterClient<String>) getClusterClient(conf)) {
+        try (var clusterClient = (RestClusterClient<String>) 
getClusterClient(conf)) {
             var jobMetricsMessageParameters =
                     
JobMetricsHeaders.getInstance().getUnresolvedMessageParameters();
             
jobMetricsMessageParameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
@@ -921,13 +958,34 @@ public abstract class AbstractFlinkService implements FlinkService {
                                     JobMetricsHeaders.getInstance(),
                                     jobMetricsMessageParameters,
                                     EmptyRequestBody.getInstance())
-                            .get();
+                            .get(
+                                    configManager
+                                            .getOperatorConfiguration()
+                                            .getFlinkClientTimeout()
+                                            .toSeconds(),
+                                    TimeUnit.SECONDS);
             return responseBody.getMetrics().stream()
                     .map(metric -> Tuple2.of(metric.getId(), 
metric.getValue()))
                     .collect(Collectors.toMap((t) -> t.f0, (t) -> t.f1));
         }
     }
 
+    public TaskManagersInfo getTaskManagersInfo(Configuration conf) throws 
Exception {
+        try (var clusterClient = (RestClusterClient<String>) 
getClusterClient(conf)) {
+            return clusterClient
+                    .sendRequest(
+                            TaskManagersHeaders.getInstance(),
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance())
+                    .get(
+                            configManager
+                                    .getOperatorConfiguration()
+                                    .getFlinkClientTimeout()
+                                    .toSeconds(),
+                            TimeUnit.SECONDS);
+        }
+    }
+
     @Override
     public final void deleteClusterDeployment(
             ObjectMeta meta,

Reply via email to