This is an automated email from the ASF dual-hosted git repository. mbalassi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 048c492d09c2208904908d3f15a3a235969a7c19 Author: Marton Balassi <[email protected]> AuthorDate: Tue Mar 14 16:25:40 2023 +0100 [FLINK-31303] Expose Flink application resource usage via metrics and status --- .../operator/metrics/FlinkDeploymentMetrics.java | 93 +++++++++++++++++++--- .../operator/service/AbstractFlinkService.java | 72 +++++++++++++++-- 2 files changed, 147 insertions(+), 18 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java index d41c6dde..3f1cf394 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java @@ -20,6 +20,9 @@ package org.apache.flink.kubernetes.operator.metrics; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.service.AbstractFlinkService; + +import org.apache.commons.lang3.math.NumberUtils; import java.util.Map; import java.util.Set; @@ -30,10 +33,19 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy private final KubernetesOperatorMetricGroup parentMetricGroup; private final Configuration configuration; - private final Map<String, Map<JobManagerDeploymentStatus, Set<String>>> deployments = + + // map(namespace, map(status, set(deployment)) + private final Map<String, Map<JobManagerDeploymentStatus, Set<String>>> deploymentStatuses = new ConcurrentHashMap<>(); + // map(namespace, map(deployment, cpu)) + private final Map<String, Map<String, Double>> deploymentCpuUsage = new ConcurrentHashMap<>(); + // map(namespace, map(deployment, memory)) + private final Map<String, Map<String, Long>> deploymentMemoryUsage = new ConcurrentHashMap<>(); public static final String STATUS_GROUP_NAME = "JmDeploymentStatus"; + public static final String RESOURCE_USAGE_GROUP_NAME = "ResourceUsage"; public static final String COUNTER_NAME = "Count"; + public static final String CPU_NAME = "Cpu"; + public static final String MEMORY_NAME = "Memory"; public FlinkDeploymentMetrics( KubernetesOperatorMetricGroup parentMetricGroup, Configuration configuration) { @@ -43,26 +55,60 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy public void onUpdate(FlinkDeployment flinkApp) { onRemove(flinkApp); - deployments + + var namespace = flinkApp.getMetadata().getNamespace(); + var clusterInfo = flinkApp.getStatus().getClusterInfo(); + var deploymentName = flinkApp.getMetadata().getName(); + + deploymentStatuses .computeIfAbsent( - flinkApp.getMetadata().getNamespace(), + namespace, ns -> { initNamespaceDeploymentCounts(ns); initNamespaceStatusCounts(ns); return createDeploymentStatusMap(); }) .get(flinkApp.getStatus().getJobManagerDeploymentStatus()) - .add(flinkApp.getMetadata().getName()); + .add(deploymentName); + + deploymentCpuUsage + .computeIfAbsent( + namespace, + ns -> { + initNamespaceCpuUsage(ns); + return new ConcurrentHashMap<>(); + }) + .put( + deploymentName, + NumberUtils.toDouble( + clusterInfo.getOrDefault( + AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "0"))); + + deploymentMemoryUsage + .computeIfAbsent( + namespace, + ns -> { + initNamespaceMemoryUsage(ns); + return new ConcurrentHashMap<>(); + }) + .put( + deploymentName, + NumberUtils.toLong( + clusterInfo.getOrDefault( + AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "0"))); } public void onRemove(FlinkDeployment flinkApp) { - if (!deployments.containsKey(flinkApp.getMetadata().getNamespace())) { + var namespace = flinkApp.getMetadata().getNamespace(); + var name = flinkApp.getMetadata().getName(); + + if (!deploymentStatuses.containsKey(namespace)) { return; } - deployments - .get(flinkApp.getMetadata().getNamespace()) - .values() - .forEach(names -> names.remove(flinkApp.getMetadata().getName())); + deploymentStatuses.get(namespace).values().forEach(names -> names.remove(name)); + + deploymentCpuUsage.get(namespace).remove(name); + deploymentMemoryUsage.get(namespace).remove(name); } private void initNamespaceDeploymentCounts(String ns) { @@ -70,7 +116,10 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns) .gauge( COUNTER_NAME, - () -> deployments.get(ns).values().stream().mapToInt(Set::size).sum()); + () -> + deploymentStatuses.get(ns).values().stream() + .mapToInt(Set::size) + .sum()); } private void initNamespaceStatusCounts(String ns) { @@ -79,10 +128,32 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns) .addGroup(STATUS_GROUP_NAME) .addGroup(status.toString()) - .gauge(COUNTER_NAME, () -> deployments.get(ns).get(status).size()); + .gauge(COUNTER_NAME, () -> deploymentStatuses.get(ns).get(status).size()); } } + private void initNamespaceCpuUsage(String ns) { + parentMetricGroup + .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns) + .addGroup(RESOURCE_USAGE_GROUP_NAME) + .gauge( + CPU_NAME, + () -> + deploymentCpuUsage.get(ns).values().stream() + .reduce(0.0, Double::sum)); + } + + private void initNamespaceMemoryUsage(String ns) { + parentMetricGroup + .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns) + .addGroup(RESOURCE_USAGE_GROUP_NAME) + .gauge( + MEMORY_NAME, + () -> + deploymentMemoryUsage.get(ns).values().stream() + .reduce(0L, Long::sum)); + } + private Map<JobManagerDeploymentStatus, Set<String>> createDeploymentStatusMap() { Map<JobManagerDeploymentStatus, Set<String>> statuses = new ConcurrentHashMap<>(); for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index d0f2b16d..e59c085d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -26,8 +26,10 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.kubernetes.KubernetesClusterClientFactory; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec; @@ -50,6 +52,7 @@ import org.apache.flink.kubernetes.operator.utils.SavepointUtils; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.webmonitor.JobDetails; @@ -70,6 +73,9 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHead import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; import org.apache.flink.runtime.rest.util.RestConstants; import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation; import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders; @@ -118,9 +124,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import java.util.jar.JarOutputStream; import java.util.jar.Manifest; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX; @@ -133,6 +141,8 @@ public abstract class AbstractFlinkService implements FlinkService { private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkService.class); private static final String EMPTY_JAR_FILENAME = "empty.jar"; + public static final String FIELD_NAME_TOTAL_CPU = "total-cpu"; + public static final String FIELD_NAME_TOTAL_MEMORY = "total-memory"; protected final KubernetesClient kubernetesClient; protected final FlinkConfigManager configManager; @@ -623,7 +633,7 @@ public abstract class AbstractFlinkService implements FlinkService { @Override public Map<String, String> getClusterInfo(Configuration conf) throws Exception { - Map<String, String> runtimeVersion = new HashMap<>(); + Map<String, String> clusterInfo = new HashMap<>(); try (RestClusterClient<String> clusterClient = (RestClusterClient<String>) getClusterClient(conf)) { @@ -641,14 +651,42 @@ public abstract class AbstractFlinkService implements FlinkService { .toSeconds(), TimeUnit.SECONDS); - runtimeVersion.put( + clusterInfo.put( DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion()); - runtimeVersion.put( + clusterInfo.put( DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision()); } - return runtimeVersion; + + // JobManager resource usage can be deduced from the CR + var jmParameters = + new KubernetesJobManagerParameters( + conf, new KubernetesClusterClientFactory().getClusterSpecification(conf)); + var jmTotalCpu = + jmParameters.getJobManagerCPU() + * jmParameters.getJobManagerCPULimitFactor() + * jmParameters.getReplicas(); + var jmTotalMemory = + Math.round( + jmParameters.getJobManagerMemoryMB() + * Math.pow(1024, 2) + * jmParameters.getJobManagerMemoryLimitFactor() + * jmParameters.getReplicas()); + + // TaskManager resource usage is best gathered from the REST API to get current replicas + var taskManagerInfos = getTaskManagersInfo(conf).getTaskManagerInfos(); + Supplier<Stream<HardwareDescription>> tmHardwareDesc = + () -> taskManagerInfos.stream().map(TaskManagerInfo::getHardwareDescription); + var tmTotalCpu = + tmHardwareDesc.get().mapToInt(HardwareDescription::getNumberOfCPUCores).sum(); + var tmTotalMemory = + tmHardwareDesc.get().mapToLong(HardwareDescription::getSizeOfPhysicalMemory).sum(); + + clusterInfo.put(FIELD_NAME_TOTAL_CPU, String.valueOf(tmTotalCpu + jmTotalCpu)); + clusterInfo.put(FIELD_NAME_TOTAL_MEMORY, String.valueOf(tmTotalMemory + jmTotalMemory)); + + return clusterInfo; } @Override @@ -909,8 +947,7 @@ public abstract class AbstractFlinkService implements FlinkService { public Map<String, String> getMetrics( Configuration conf, String jobId, List<String> metricNames) throws Exception { - try (RestClusterClient<String> clusterClient = - (RestClusterClient<String>) getClusterClient(conf)) { + try (var clusterClient = (RestClusterClient<String>) getClusterClient(conf)) { var jobMetricsMessageParameters = JobMetricsHeaders.getInstance().getUnresolvedMessageParameters(); jobMetricsMessageParameters.jobPathParameter.resolve(JobID.fromHexString(jobId)); @@ -921,13 +958,34 @@ public abstract class AbstractFlinkService implements FlinkService { JobMetricsHeaders.getInstance(), jobMetricsMessageParameters, EmptyRequestBody.getInstance()) - .get(); + .get( + configManager + .getOperatorConfiguration() + .getFlinkClientTimeout() + .toSeconds(), + TimeUnit.SECONDS); return responseBody.getMetrics().stream() .map(metric -> Tuple2.of(metric.getId(), metric.getValue())) .collect(Collectors.toMap((t) -> t.f0, (t) -> t.f1)); } } + public TaskManagersInfo getTaskManagersInfo(Configuration conf) throws Exception { + try (var clusterClient = (RestClusterClient<String>) getClusterClient(conf)) { + return clusterClient + .sendRequest( + TaskManagersHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()) + .get( + configManager + .getOperatorConfiguration() + .getFlinkClientTimeout() + .toSeconds(), + TimeUnit.SECONDS); + } + } + @Override public final void deleteClusterDeployment( ObjectMeta meta,
