This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 48b18cdd [FLINK-33771] Add cluster capacity awareness to autoscaler 
(#751)
48b18cdd is described below

commit 48b18cddd56d4b5d4ffbc93d9e352aa83099f26b
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Thu Jan 18 13:19:33 2024 +0100

    [FLINK-33771] Add cluster capacity awareness to autoscaler (#751)
    
    To avoid starvation of pipelines when the Kubernetes cluster runs out of
    resources, new scaling attempts should be stopped when no more additional
    pods can be scheduled for rescaling.
    
    While Flink's ResourceRequirement API can prevent some of these cases, it
    requires using Flink 1.18 and an entirely different Flink scheduler. 
Extensive
    testing still has to be done with the new scheduler and the rescaling
    behavior. We would hand off control over the rescale time to Flink which uses
    various parameters to control the exact scaling behavior.
    
    For the config-based parallelism overrides, we have pretty good heuristics 
in
    the operator to check in Kubernetes for the approximate number of free 
cluster
    resources, the max cluster scaleup for the Cluster Autoscaler, and the 
required
    scaling costs. Having cluster resource information will also allow to 
implement
    fairness between all the autoscaled pipelines.
    
    This PR adds ClusterResourceManager which provides a view over the
    allocatable resources within a Kubernetes cluster and allows to simulate
    scheduling pods with a defined number of required resources.
    
    The goal is to provide a good indicator for whether resources needed for
    autoscaling are going to be available. This is achieved by pulling the node
    resource usage from the Kubernetes cluster at a regular configurable 
interval,
    after which we use this data to simulate adding / removing
    resources (pods). Note that this is merely a (pretty good) heuristic because the
    Kubernetes scheduler has the final say. However, we prevent 99% of the
    scenarios after pipeline outages which can lead to massive scale up where all
    pipelines may be scaled up at the same time and exhaust the number of available
    resources.
    
    The simulation can run on a fixed set of Kubernetes nodes. Additionally, if 
we
    detect that the cluster is using the Kubernetes Cluster Autoscaler, we will 
use
    this data to extrapolate the number of nodes to the maximum defined nodes in the
    autoscaler configuration. We currently track CPU and memory. Ephemeral storage
    is missing because there is no easy way to get node statistics on free storage.
---
 .../kubernetes_operator_config_configuration.html  |   6 +
 .../generated/system_advanced_section.html         |   6 +
 .../flinkcluster/FlinkClusterJobListFetcher.java   |   3 +
 .../StandaloneAutoscalerExecutorTest.java          |  10 +-
 .../realizer/RescaleApiScalingRealizerTest.java    |  12 +-
 .../flink/autoscaler/JobAutoScalerContext.java     |  11 +-
 .../flink/autoscaler/RestApiMetricsCollector.java  |  38 +++
 .../apache/flink/autoscaler/ScalingExecutor.java   |  86 ++++++-
 .../flink/autoscaler/ScalingMetricCollector.java   |  23 +-
 .../flink/autoscaler/ScalingMetricEvaluator.java   |   6 +
 .../flink/autoscaler/metrics/FlinkMetric.java      |   4 +-
 .../flink/autoscaler/metrics/ScalingMetric.java    |   4 +-
 .../flink/autoscaler/metrics/ScalingMetrics.java   |  16 ++
 .../autoscaler/resources/NoopResourceCheck.java    |  30 +++
 .../flink/autoscaler/resources/ResourceCheck.java  |  39 +++
 .../flink/autoscaler/JobVertexScalerTest.java      |   3 +
 .../autoscaler/RecommendedParallelismTest.java     |   5 +
 .../autoscaler/RestApiMetricsCollectorTest.java    |  54 ++++
 .../flink/autoscaler/ScalingExecutorTest.java      |  50 ++++
 .../autoscaler/ScalingMetricEvaluatorTest.java     |  18 +-
 .../flink/autoscaler/TestingAutoscalerUtils.java   |  29 ++-
 .../flink/autoscaler/TestingMetricsCollector.java  |   6 +
 .../autoscaler/metrics/ScalingMetricsTest.java     |   6 +-
 .../flink/kubernetes/operator/FlinkOperator.java   |   7 +-
 .../operator/autoscaler/AutoscalerFactory.java     |   7 +-
 .../autoscaler/KubernetesJobAutoScalerContext.java |   9 +
 .../config/KubernetesOperatorConfigOptions.java    |   8 +
 .../operator/resources/ClusterResourceManager.java | 285 +++++++++++++++++++++
 .../operator/resources/ClusterResourceView.java    |  70 +++++
 .../resources/KubernetesNodeResourceInfo.java      |  74 ++++++
 .../operator/resources/KubernetesResource.java     |  71 +++++
 .../operator/autoscaler/AutoscalerFactoryTest.java |   8 +-
 .../KubernetesJobAutoScalerContextTest.java        |  69 +++++
 .../TestingFlinkDeploymentController.java          |   7 +-
 .../resources/ClusterResourceManagerTest.java      | 242 +++++++++++++++++
 .../resources/ClusterResourceViewTest.java         |  67 +++++
 .../operator/resources/KubernetesResourceTest.java |  64 +++++
 37 files changed, 1413 insertions(+), 40 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 893d5eab..e4778ed9 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -50,6 +50,12 @@
             <td>Duration</td>
             <td>The duration of the time window where job restart count 
measured.</td>
         </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.cluster.resource-view.refresh-interval</h5></td>
+            <td style="word-wrap: break-word;">-1 min</td>
+            <td>Duration</td>
+            <td>How often to retrieve Kubernetes cluster resource usage 
information. This information is used to avoid running out of cluster resources 
when scaling up resources. Negative values disable the feature.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.config.cache.size</h5></td>
             <td style="word-wrap: break-word;">1000</td>
diff --git a/docs/layouts/shortcodes/generated/system_advanced_section.html 
b/docs/layouts/shortcodes/generated/system_advanced_section.html
index 487f6fc7..f24d9d9e 100644
--- a/docs/layouts/shortcodes/generated/system_advanced_section.html
+++ b/docs/layouts/shortcodes/generated/system_advanced_section.html
@@ -8,6 +8,12 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            
<td><h5>kubernetes.operator.cluster.resource-view.refresh-interval</h5></td>
+            <td style="word-wrap: break-word;">-1 min</td>
+            <td>Duration</td>
+            <td>How often to retrieve Kubernetes cluster resource usage 
information. This information is used to avoid running out of cluster resources 
when scaling up resources. Negative values disable the feature.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.config.cache.size</h5></td>
             <td style="word-wrap: break-word;">1000</td>
diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
index e99424c5..63d79cd4 100644
--- 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
@@ -22,6 +22,7 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.standalone.JobListFetcher;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -82,6 +83,8 @@ public class FlinkClusterJobListFetcher
                 jobStatusMessage.getJobState(),
                 conf,
                 new UnregisteredMetricsGroup(),
+                0,
+                MemorySize.ZERO,
                 () -> restClientGetter.apply(conf));
     }
 
diff --git 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
index ca905071..d2e50092 100644
--- 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
+++ 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -230,6 +231,13 @@ class StandaloneAutoscalerExecutorTest {
     private static JobAutoScalerContext<JobID> createJobAutoScalerContext() {
         var jobID = new JobID();
         return new JobAutoScalerContext<>(
-                jobID, jobID, JobStatus.RUNNING, new Configuration(), null, 
null);
+                jobID,
+                jobID,
+                JobStatus.RUNNING,
+                new Configuration(),
+                null,
+                0,
+                MemorySize.ZERO,
+                null);
     }
 }
diff --git 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
index b5f769e0..6b209eb0 100644
--- 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
+++ 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
 import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -148,6 +149,8 @@ class RescaleApiScalingRealizerTest {
                         JobStatus.CANCELLING,
                         conf,
                         null,
+                        0,
+                        MemorySize.ZERO,
                         () ->
                                 fail(
                                         "The rest client shouldn't be created 
if the job isn't running."));
@@ -164,7 +167,14 @@ class RescaleApiScalingRealizerTest {
             JobID jobID,
             SupplierWithException<RestClusterClient<String>, Exception> 
restClientSupplier) {
         return new JobAutoScalerContext<>(
-                jobID, jobID, JobStatus.RUNNING, new Configuration(), null, 
restClientSupplier);
+                jobID,
+                jobID,
+                JobStatus.RUNNING,
+                new Configuration(),
+                null,
+                0,
+                MemorySize.ZERO,
+                restClientSupplier);
     }
 
     private JobResourceRequirements createResourceRequirements(
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
index 121e5359..026e0fb3 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
@@ -22,12 +22,13 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
-import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 
 import javax.annotation.Nullable;
@@ -38,7 +39,7 @@ import javax.annotation.Nullable;
  * @param <KEY> The job key.
  */
 @Experimental
-@AllArgsConstructor
+@RequiredArgsConstructor
 @ToString
 @Builder(toBuilder = true)
 public class JobAutoScalerContext<KEY> {
@@ -55,6 +56,12 @@ public class JobAutoScalerContext<KEY> {
 
     @Getter private final MetricGroup metricGroup;
 
+    /** Task manager CPU as a fraction (if available). */
+    @Getter private final double taskManagerCpu;
+
+    /** Task manager memory size (if available). */
+    @Getter @Nullable private final MemorySize taskManagerMemory;
+
     @ToString.Exclude
     private final SupplierWithException<RestClusterClient<String>, Exception> 
restClientSupplier;
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
index e3fa58e5..572bec9b 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java
@@ -29,6 +29,9 @@ import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsRespo
 import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedTaskManagerMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
 
@@ -103,6 +106,41 @@ public class RestApiMetricsCollector<KEY, Context extends 
JobAutoScalerContext<K
         }
     }
 
+    @Override
+    @SneakyThrows
+    protected Map<FlinkMetric, Metric> queryJmMetrics(Context ctx) {
+        Map<String, FlinkMetric> metrics =
+                Map.of(
+                        "taskSlotsTotal", FlinkMetric.NUM_TASK_SLOTS_TOTAL,
+                        "taskSlotsAvailable", 
FlinkMetric.NUM_TASK_SLOTS_AVAILABLE);
+        try (var restClient = ctx.getRestClusterClient()) {
+            return queryJmMetrics(restClient, metrics);
+        }
+    }
+
+    @SneakyThrows
+    protected Map<FlinkMetric, Metric> queryJmMetrics(
+            RestClusterClient<?> restClient, Map<String, FlinkMetric> metrics) 
{
+
+        var parameters = new JobManagerMetricsMessageParameters();
+        var queryParamIt = parameters.getQueryParameters().iterator();
+
+        MetricsFilterParameter filterParameter = (MetricsFilterParameter) 
queryParamIt.next();
+        filterParameter.resolve(List.copyOf(metrics.keySet()));
+
+        var responseBody =
+                restClient
+                        .sendRequest(
+                                JobManagerMetricsHeaders.getInstance(),
+                                parameters,
+                                EmptyRequestBody.getInstance())
+                        .get();
+
+        return responseBody.getMetrics().stream()
+                .collect(Collectors.toMap(m -> metrics.get(m.getId()), m -> 
m));
+    }
+
+    @Override
     protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) 
throws Exception {
         try (var restClient = ctx.getRestClusterClient()) {
             // Unfortunately we cannot simply query for all metric names as 
Flink doesn't return
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 680a4135..d85b311e 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -23,14 +23,20 @@ import 
org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.resources.NoopResourceCheck;
+import org.apache.flink.autoscaler.resources.ResourceCheck;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.utils.CalendarUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collections;
@@ -63,23 +69,22 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
     private final JobVertexScaler<KEY, Context> jobVertexScaler;
     private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;
     private final AutoScalerStateStore<KEY, Context> autoScalerStateStore;
+    private final ResourceCheck resourceCheck;
 
     public ScalingExecutor(
             AutoScalerEventHandler<KEY, Context> autoScalerEventHandler,
             AutoScalerStateStore<KEY, Context> autoScalerStateStore) {
-        this(
-                new JobVertexScaler<>(autoScalerEventHandler),
-                autoScalerEventHandler,
-                autoScalerStateStore);
+        this(autoScalerEventHandler, autoScalerStateStore, null);
     }
 
     public ScalingExecutor(
-            JobVertexScaler<KEY, Context> jobVertexScaler,
             AutoScalerEventHandler<KEY, Context> autoScalerEventHandler,
-            AutoScalerStateStore<KEY, Context> autoScalerStateStore) {
-        this.jobVertexScaler = jobVertexScaler;
+            AutoScalerStateStore<KEY, Context> autoScalerStateStore,
+            @Nullable ResourceCheck resourceCheck) {
+        this.jobVertexScaler = new JobVertexScaler<>(autoScalerEventHandler);
         this.autoScalerEventHandler = autoScalerEventHandler;
         this.autoScalerStateStore = autoScalerStateStore;
+        this.resourceCheck = resourceCheck != null ? resourceCheck : new 
NoopResourceCheck();
     }
 
     public boolean scaleResource(
@@ -111,6 +116,10 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             return false;
         }
 
+        if (scalingWouldExceedClusterResources(evaluatedMetrics, 
scalingSummaries, context)) {
+            return false;
+        }
+
         addToScalingHistoryAndStore(
                 autoScalerStateStore, context, scalingHistory, now, 
scalingSummaries);
 
@@ -248,6 +257,69 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
         return false;
     }
 
+    private boolean scalingWouldExceedClusterResources(
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            JobAutoScalerContext<?> ctx) {
+
+        final double taskManagerCpu = ctx.getTaskManagerCpu();
+        final MemorySize taskManagerMemory = ctx.getTaskManagerMemory();
+
+        if (taskManagerCpu <= 0
+                || taskManagerMemory == null
+                || taskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
+            // We can't extract the requirements, we can't make any assumptions
+            return false;
+        }
+
+        var globalMetrics = evaluatedMetrics.getGlobalMetrics();
+        if (!globalMetrics.containsKey(ScalingMetric.NUM_TASK_SLOTS_USED)) {
+            LOG.info("JM metrics not ready yet");
+            return true;
+        }
+
+        var vertexMetrics = evaluatedMetrics.getVertexMetrics();
+
+        int oldParallelismSum =
+                vertexMetrics.values().stream()
+                        .map(map -> (int) 
map.get(ScalingMetric.PARALLELISM).getCurrent())
+                        .reduce(0, Integer::sum);
+
+        Map<JobVertexID, Integer> newParallelisms = new HashMap<>();
+        for (Map.Entry<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>> entry :
+                vertexMetrics.entrySet()) {
+            JobVertexID jobVertexID = entry.getKey();
+            ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID);
+            if (scalingSummary != null) {
+                newParallelisms.put(jobVertexID, 
scalingSummary.getNewParallelism());
+            } else {
+                newParallelisms.put(
+                        jobVertexID,
+                        (int) 
entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent());
+            }
+        }
+
+        double numTaskSlotsUsed = 
globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
+
+        final int numTaskSlotsAfterRescale;
+        if (oldParallelismSum >= numTaskSlotsUsed) {
+            // Slot sharing is (partially) deactivated,
+            // assuming no slot sharing in absence of additional data.
+            numTaskSlotsAfterRescale = 
newParallelisms.values().stream().reduce(0, Integer::sum);
+        } else {
+            // Slot sharing is activated
+            numTaskSlotsAfterRescale = 
newParallelisms.values().stream().reduce(0, Integer::max);
+        }
+
+        int taskSlotsPerTm = 
ctx.getConfiguration().get(TaskManagerOptions.NUM_TASK_SLOTS);
+
+        int currentNumTms = (int) Math.ceil(numTaskSlotsUsed / (double) 
taskSlotsPerTm);
+        int newNumTms = (int) Math.ceil(numTaskSlotsAfterRescale / (double) 
taskSlotsPerTm);
+
+        return !resourceCheck.trySchedule(
+                currentNumTms, newNumTms, taskManagerCpu, taskManagerMemory);
+    }
+
     private static Map<String, String> getVertexParallelismOverrides(
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics,
             Map<JobVertexID, ScalingSummary> summaries) {
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index f67fa4ea..8f31ecf0 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 import org.apache.flink.util.Preconditions;
 
 import lombok.SneakyThrows;
@@ -127,12 +128,18 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
         // Aggregated job vertex metrics collected from Flink based on the 
filtered metric names
         var collectedVertexMetrics = queryAllAggregatedMetrics(ctx, 
filteredVertexMetricNames);
 
+        var collectedJmMetrics = queryJmMetrics(ctx);
         var collectedTmMetrics = queryTmMetrics(ctx);
 
         // The computed scaling metrics based on the collected aggregated 
vertex metrics
         var scalingMetrics =
                 convertToScalingMetrics(
-                        jobKey, collectedVertexMetrics, collectedTmMetrics, 
topology, conf);
+                        jobKey,
+                        collectedVertexMetrics,
+                        collectedJmMetrics,
+                        collectedTmMetrics,
+                        topology,
+                        conf);
 
         // Add scaling metrics to history if they were computed successfully
         metricHistory.put(now, scalingMetrics);
@@ -155,6 +162,8 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
         return collectedMetrics;
     }
 
+    protected abstract Map<FlinkMetric, Metric> queryJmMetrics(Context ctx) 
throws Exception;
+
     protected abstract Map<FlinkMetric, AggregatedMetric> 
queryTmMetrics(Context ctx)
             throws Exception;
 
@@ -240,18 +249,13 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
     }
 
     /**
-     * Given a map of collected Flink vertex metrics we compute the scaling 
metrics for each job
+     * Given a series of collected Flink vertex metrics we compute the scaling 
metrics for each job
      * vertex.
-     *
-     * @param jobKey KEY
-     * @param collectedMetrics Collected Flink metrics
-     * @param jobTopology Topology
-     * @param conf Configuration
-     * @return Collected metrics.
      */
     private CollectedMetrics convertToScalingMetrics(
             KEY jobKey,
             Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> 
collectedMetrics,
+            Map<FlinkMetric, Metric> collectedJmMetrics,
             Map<FlinkMetric, AggregatedMetric> collectedTmMetrics,
             JobTopology jobTopology,
             Configuration conf) {
@@ -314,7 +318,8 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
         var outputRatios = 
ScalingMetrics.computeOutputRatios(collectedMetrics, jobTopology);
         LOG.debug("Output ratios: {}", outputRatios);
 
-        var globalMetrics = 
ScalingMetrics.computeGlobalMetrics(collectedTmMetrics);
+        var globalMetrics =
+                ScalingMetrics.computeGlobalMetrics(collectedJmMetrics, 
collectedTmMetrics);
         LOG.debug("Global metrics: {}", globalMetrics);
 
         return new CollectedMetrics(out, outputRatios, globalMetrics);
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index e4c5f0d4..08700235 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -54,6 +54,7 @@ import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_USAGE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_TASK_SLOTS_USED;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.OBSERVED_TPR;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
@@ -320,6 +321,11 @@ public class ScalingMetricEvaluator {
                 HEAP_USAGE,
                 new EvaluatedScalingMetric(
                         lastHeapUsage, getAverageGlobalMetric(HEAP_USAGE, 
metricHistory)));
+
+        out.put(
+                NUM_TASK_SLOTS_USED,
+                
EvaluatedScalingMetric.of(latest.getOrDefault(NUM_TASK_SLOTS_USED, 
Double.NaN)));
+
         return out;
     }
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
index f007c364..24ccdc98 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
@@ -43,7 +43,9 @@ public enum FlinkMetric {
 
     HEAP_MAX(s -> s.equals("Status.JVM.Memory.Heap.Max")),
     HEAP_USED(s -> s.equals("Status.JVM.Memory.Heap.Used")),
-    TOTAL_GC_TIME_PER_SEC(s -> 
s.equals("Status.JVM.GarbageCollector.All.TimeMsPerSecond"));
+    TOTAL_GC_TIME_PER_SEC(s -> 
s.equals("Status.JVM.GarbageCollector.All.TimeMsPerSecond")),
+    NUM_TASK_SLOTS_TOTAL(s -> s.equals("taskSlotsTotal")),
+    NUM_TASK_SLOTS_AVAILABLE(s -> s.equals("taskSlotsAvailable"));
 
     public static final Map<FlinkMetric, AggregatedMetric> FINISHED_METRICS =
             Map.of(
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index 02250e75..1ab1bd3d 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -78,7 +78,9 @@ public enum ScalingMetric {
     GC_PRESSURE(false),
 
     /** Percentage of max heap used (between 0 and 1). */
-    HEAP_USAGE(true);
+    HEAP_USAGE(true),
+
+    NUM_TASK_SLOTS_USED(false);
 
     @Getter private final boolean calculateAverage;
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index c1548f87..516e0046 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -23,6 +23,7 @@ import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 
 import org.apache.commons.math3.util.Precision;
 import org.slf4j.Logger;
@@ -161,6 +162,7 @@ public class ScalingMetrics {
     }
 
     public static Map<ScalingMetric, Double> computeGlobalMetrics(
+            Map<FlinkMetric, Metric> collectedJmMetrics,
             Map<FlinkMetric, AggregatedMetric> collectedTmMetrics) {
         if (collectedTmMetrics == null) {
             return null;
@@ -168,6 +170,20 @@ public class ScalingMetrics {
 
         var out = new HashMap<ScalingMetric, Double>();
 
+        try {
+            var numTotalTaskSlots =
+                    Double.valueOf(
+                            
collectedJmMetrics.get(FlinkMetric.NUM_TASK_SLOTS_TOTAL).getValue());
+            var numTaskSlotsAvailable =
+                    Double.valueOf(
+                            collectedJmMetrics
+                                    .get(FlinkMetric.NUM_TASK_SLOTS_AVAILABLE)
+                                    .getValue());
+            out.put(ScalingMetric.NUM_TASK_SLOTS_USED, numTotalTaskSlots - 
numTaskSlotsAvailable);
+        } catch (Exception e) {
+            LOG.debug("Slot metrics and registered task managers not 
available");
+        }
+
         var gcTime = collectedTmMetrics.get(FlinkMetric.TOTAL_GC_TIME_PER_SEC);
         if (gcTime != null) {
             out.put(ScalingMetric.GC_PRESSURE, gcTime.getMax() / 1000);
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/resources/NoopResourceCheck.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/resources/NoopResourceCheck.java
new file mode 100644
index 00000000..ed2fc356
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/resources/NoopResourceCheck.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.resources;
+
+import org.apache.flink.configuration.MemorySize;
+
+/** No-op implementation for checking available resources; always reports capacity. */
+public class NoopResourceCheck implements ResourceCheck {
+
+    @Override
+    public boolean trySchedule(
+            int currentInstances, int newInstances, double numCpu, MemorySize 
numMemory) {
+        return true;
+    }
+}
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/resources/ResourceCheck.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/resources/ResourceCheck.java
new file mode 100644
index 00000000..ec096036
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/resources/ResourceCheck.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.resources;
+
+import org.apache.flink.configuration.MemorySize;
+
+/** An interface for checking the available capacity of the underlying 
resources. */
+public interface ResourceCheck {
+
+    /**
+     * Simulates scheduling the provided number of TaskManager instances.
+     *
+     * @param currentInstances The current number of instances.
+     * @param newInstances The new number of instances.
+     * @param cpuPerInstance The number of CPUs per instance.
+     * @param memoryPerInstance The total memory size per instance.
+     * @return true if a scheduling configuration was found, false otherwise.
+     */
+    boolean trySchedule(
+            int currentInstances,
+            int newInstances,
+            double cpuPerInstance,
+            MemorySize memoryPerInstance);
+}
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index f87fbebe..46f85489 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -71,6 +72,8 @@ public class JobVertexScalerTest {
                         JobStatus.RUNNING,
                         conf,
                         new UnregisteredMetricsGroup(),
+                        0,
+                        MemorySize.ZERO,
                         null);
         restartTime = conf.get(AutoScalerOptions.RESTART_TIME);
     }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index 9f10e19f..cf866c01 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
@@ -251,6 +252,8 @@ public class RecommendedParallelismTest {
                         JobStatus.CREATED,
                         context.getConfiguration(),
                         context.getMetricGroup(),
+                        0,
+                        MemorySize.ZERO,
                         getRestClusterClientSupplier());
     }
 
@@ -263,6 +266,8 @@ public class RecommendedParallelismTest {
                         JobStatus.RUNNING,
                         context.getConfiguration(),
                         context.getMetricGroup(),
+                        0,
+                        MemorySize.ZERO,
                         getRestClusterClientSupplier());
     }
 
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
index c16e1106..462fd21c 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
@@ -22,9 +22,12 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -35,10 +38,12 @@ import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedTaskManagerMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.util.HashMap;
 import java.util.List;
@@ -110,6 +115,8 @@ public class RestApiMetricsCollectorTest {
                         JobStatus.RUNNING,
                         conf,
                         new UnregisteredMetricsGroup(),
+                        0,
+                        MemorySize.ZERO,
                         () -> restClusterClient);
 
         var jobVertexIDMapMap = collector.queryAllAggregatedMetrics(context, 
metrics);
@@ -122,6 +129,51 @@ public class RestApiMetricsCollectorTest {
         assertEquals(pendingRecordsMetric.getSum(), 200);
     }
 
+    @Test
+    @Timeout(60)
+    public void testJmMetricCollection() throws Exception {
+        try (MiniCluster miniCluster =
+                new MiniCluster(
+                        new MiniClusterConfiguration.Builder()
+                                .setNumTaskManagers(1)
+                                .setNumSlotsPerTaskManager(3)
+                                .build())) {
+            miniCluster.start();
+            var client =
+                    new RestClusterClient<>(
+                            new Configuration(),
+                            "cluster",
+                            (c, e) ->
+                                    new StandaloneClientHAServices(
+                                            
miniCluster.getRestAddress().get().toString()));
+            do {
+                var collector = new RestApiMetricsCollector<>();
+                Map<FlinkMetric, Metric> flinkMetricMetricMap =
+                        collector.queryJmMetrics(
+                                client,
+                                Map.of(
+                                        "taskSlotsTotal", 
FlinkMetric.NUM_TASK_SLOTS_TOTAL,
+                                        "taskSlotsAvailable",
+                                                
FlinkMetric.NUM_TASK_SLOTS_AVAILABLE));
+                try {
+                    assertEquals(
+                            "3",
+                            
flinkMetricMetricMap.get(FlinkMetric.NUM_TASK_SLOTS_TOTAL).getValue());
+                    assertEquals(
+                            "3",
+                            flinkMetricMetricMap
+                                    .get(FlinkMetric.NUM_TASK_SLOTS_AVAILABLE)
+                                    .getValue());
+                    break;
+                } catch (NullPointerException e) {
+                    // Metrics might not be available yet (timeout above will 
eventually kill this
+                    // test)
+                    Thread.sleep(100);
+                }
+            } while (true);
+        }
+    }
+
     @Test
     public void testTmMetricCollection() throws Exception {
 
@@ -176,6 +228,8 @@ public class RestApiMetricsCollectorTest {
                         JobStatus.RUNNING,
                         conf,
                         new UnregisteredMetricsGroup(),
+                        0,
+                        MemorySize.ZERO,
                         () -> client);
         var collector = new RestApiMetricsCollector<JobID, 
JobAutoScalerContext<JobID>>();
 
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index a2f5c8fb..c3ef9296 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -23,9 +23,11 @@ import 
org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.resources.ResourceCheck;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -206,6 +208,54 @@ public class ScalingExecutorTest {
                         context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
     }
 
+    @Test
+    public void testBlockScalingOnFailedResourceCheck() throws Exception {
+        var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+        var source = JobVertexID.fromHexString(sourceHexString);
+        var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+        var sink = JobVertexID.fromHexString(sinkHexString);
+        var now = Instant.now();
+        var metrics =
+                new EvaluatedMetrics(
+                        Map.of(source, evaluated(10, 100, 50), sink, 
evaluated(10, 100, 50)),
+                        Map.of(
+                                ScalingMetric.NUM_TASK_SLOTS_USED,
+                                EvaluatedScalingMetric.of(9),
+                                ScalingMetric.GC_PRESSURE,
+                                EvaluatedScalingMetric.of(Double.NaN),
+                                ScalingMetric.HEAP_USAGE,
+                                EvaluatedScalingMetric.of(Double.NaN)));
+
+        // Would normally scale without resource usage check
+        assertTrue(
+                scalingDecisionExecutor.scaleResource(
+                        context, metrics, new HashMap<>(), new 
ScalingTracking(), now));
+
+        scalingDecisionExecutor =
+                new ScalingExecutor<>(
+                        eventCollector,
+                        stateStore,
+                        new ResourceCheck() {
+                            @Override
+                            public boolean trySchedule(
+                                    int currentInstances,
+                                    int newInstances,
+                                    double cpuPerInstance,
+                                    MemorySize memoryPerInstance) {
+                                return false;
+                            }
+                        });
+
+        // Scaling blocked due to unavailable resources
+        assertFalse(
+                scalingDecisionExecutor.scaleResource(
+                        TestingAutoscalerUtils.createResourceAwareContext(),
+                        metrics,
+                        new HashMap<>(),
+                        new ScalingTracking(),
+                        now));
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testScalingEventsWith0IntervalConfig(boolean scalingEnabled) 
throws Exception {
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 8202cf37..260687e0 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -579,6 +579,8 @@ public class ScalingMetricEvaluatorTest {
                         ScalingMetric.HEAP_USAGE,
                         EvaluatedScalingMetric.of(Double.NaN),
                         ScalingMetric.GC_PRESSURE,
+                        EvaluatedScalingMetric.of(Double.NaN),
+                        ScalingMetric.NUM_TASK_SLOTS_USED,
                         EvaluatedScalingMetric.of(Double.NaN)),
                 ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
 
@@ -593,7 +595,9 @@ public class ScalingMetricEvaluatorTest {
                         ScalingMetric.HEAP_USAGE,
                         new EvaluatedScalingMetric(0.5, 0.5),
                         ScalingMetric.GC_PRESSURE,
-                        EvaluatedScalingMetric.of(0.6)),
+                        EvaluatedScalingMetric.of(0.6),
+                        ScalingMetric.NUM_TASK_SLOTS_USED,
+                        EvaluatedScalingMetric.of(Double.NaN)),
                 ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
 
         globalMetrics.put(
@@ -601,13 +605,21 @@ public class ScalingMetricEvaluatorTest {
                 new CollectedMetrics(
                         Map.of(),
                         Map.of(),
-                        Map.of(ScalingMetric.HEAP_USAGE, 0.7, 
ScalingMetric.GC_PRESSURE, 0.8)));
+                        Map.of(
+                                ScalingMetric.HEAP_USAGE,
+                                0.7,
+                                ScalingMetric.GC_PRESSURE,
+                                0.8,
+                                ScalingMetric.NUM_TASK_SLOTS_USED,
+                                42.)));
         assertEquals(
                 Map.of(
                         ScalingMetric.HEAP_USAGE,
                         new EvaluatedScalingMetric(0.7, 0.6),
                         ScalingMetric.GC_PRESSURE,
-                        EvaluatedScalingMetric.of(0.8)),
+                        EvaluatedScalingMetric.of(0.8),
+                        ScalingMetric.NUM_TASK_SLOTS_USED,
+                        EvaluatedScalingMetric.of(42.)),
                 ScalingMetricEvaluator.evaluateGlobalMetrics(globalMetrics));
     }
 
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java
index 4a47e9ae..163134d9 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
@@ -46,20 +47,34 @@ public class TestingAutoscalerUtils {
                 JobStatus.RUNNING,
                 new Configuration(),
                 metricGroup,
+                0,
+                MemorySize.ZERO,
                 getRestClusterClientSupplier());
     }
 
-    public static JobAutoScalerContext<JobID> 
createJobAutoScalerContext(JobStatus jobStatus) {
+    public static JobAutoScalerContext<JobID> createResourceAwareContext() {
+        JobID jobId = JobID.generate();
         MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
         GenericMetricGroup metricGroup = new GenericMetricGroup(registry, 
null, "test");
-        final JobID jobID = new JobID();
-        return new JobAutoScalerContext<JobID>(
-                jobID,
-                jobID,
-                jobStatus,
+        return new JobAutoScalerContext<>(
+                jobId,
+                jobId,
+                JobStatus.RUNNING,
                 new Configuration(),
                 metricGroup,
-                getRestClusterClientSupplier());
+                0,
+                MemorySize.ZERO,
+                TestingAutoscalerUtils.getRestClusterClientSupplier()) {
+            @Override
+            public double getTaskManagerCpu() {
+                return 100;
+            }
+
+            @Override
+            public MemorySize getTaskManagerMemory() {
+                return MemorySize.parse("65536 bytes");
+            }
+        };
     }
 
     public static SupplierWithException<RestClusterClient<String>, Exception>
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingMetricsCollector.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingMetricsCollector.java
index 98b36119..1731bc64 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingMetricsCollector.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingMetricsCollector.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 
 import lombok.Setter;
 
@@ -77,6 +78,11 @@ public class TestingMetricsCollector<KEY, Context extends 
JobAutoScalerContext<K
         return metricNames.getOrDefault(jobVertexID, Collections.emptyList());
     }
 
+    @Override
+    protected Map<FlinkMetric, Metric> queryJmMetrics(Context ctx) throws 
Exception {
+        return Map.of();
+    }
+
     @Override
     protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) {
         return Map.of();
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
index 19107ea7..9fb8b8b9 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
@@ -482,13 +482,15 @@ public class ScalingMetricsTest {
 
     @Test
     public void testGlobalMetrics() {
-        assertEquals(Map.of(), ScalingMetrics.computeGlobalMetrics(Map.of()));
+        assertEquals(Map.of(), ScalingMetrics.computeGlobalMetrics(Map.of(), 
Map.of()));
         assertEquals(
                 Map.of(),
-                
ScalingMetrics.computeGlobalMetrics(Map.of(FlinkMetric.HEAP_USED, 
aggMax(100))));
+                ScalingMetrics.computeGlobalMetrics(
+                        Map.of(), Map.of(FlinkMetric.HEAP_USED, aggMax(100))));
         assertEquals(
                 Map.of(ScalingMetric.HEAP_USAGE, 0.5, 
ScalingMetric.GC_PRESSURE, 0.25),
                 ScalingMetrics.computeGlobalMetrics(
+                        Map.of(),
                         Map.of(
                                 FlinkMetric.HEAP_USED,
                                 aggMax(100),
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 842eb978..0ecd7c83 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.kubernetes.operator.observer.deployment.FlinkDeploymentO
 import 
org.apache.flink.kubernetes.operator.observer.sessionjob.FlinkSessionJobObserver;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
 import 
org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
+import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager;
 import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -165,7 +166,9 @@ public class FlinkOperator {
         var metricManager =
                 MetricManager.createFlinkDeploymentMetricManager(baseConfig, 
metricGroup);
         var statusRecorder = StatusRecorder.create(client, metricManager, 
listeners);
-        var autoscaler = AutoscalerFactory.create(client, eventRecorder);
+        var clusterResourceManager =
+                ClusterResourceManager.of(configManager.getDefaultConfig(), 
client);
+        var autoscaler = AutoscalerFactory.create(client, eventRecorder, 
clusterResourceManager);
         var reconcilerFactory = new ReconcilerFactory(eventRecorder, 
statusRecorder, autoscaler);
         var observerFactory = new 
FlinkDeploymentObserverFactory(eventRecorder);
         var canaryResourceManager = new 
CanaryResourceManager<FlinkDeployment>(configManager);
@@ -189,7 +192,7 @@ public class FlinkOperator {
         var metricManager =
                 MetricManager.createFlinkSessionJobMetricManager(baseConfig, 
metricGroup);
         var statusRecorder = StatusRecorder.create(client, metricManager, 
listeners);
-        var autoscaler = AutoscalerFactory.create(client, eventRecorder);
+        var autoscaler = AutoscalerFactory.create(client, eventRecorder, null);
         var reconciler = new SessionJobReconciler(eventRecorder, 
statusRecorder, autoscaler);
         var observer = new FlinkSessionJobObserver(eventRecorder);
         var canaryResourceManager = new 
CanaryResourceManager<FlinkSessionJob>(configManager);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
index dea6df0b..7bbb5492 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.autoscaler.ScalingExecutor;
 import org.apache.flink.autoscaler.ScalingMetricEvaluator;
 import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore;
 import 
org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore;
+import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -33,7 +34,9 @@ import 
io.javaoperatorsdk.operator.processing.event.ResourceID;
 public class AutoscalerFactory {
 
     public static JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> 
create(
-            KubernetesClient client, EventRecorder eventRecorder) {
+            KubernetesClient client,
+            EventRecorder eventRecorder,
+            ClusterResourceManager clusterResourceManager) {
 
         var stateStore = new KubernetesAutoScalerStateStore(new 
ConfigMapStore(client));
         var eventHandler = new KubernetesAutoScalerEventHandler(eventRecorder);
@@ -41,7 +44,7 @@ public class AutoscalerFactory {
         return new JobAutoScalerImpl<>(
                 new RestApiMetricsCollector<>(),
                 new ScalingMetricEvaluator(),
-                new ScalingExecutor<>(eventHandler, stateStore),
+                new ScalingExecutor<>(eventHandler, stateStore, 
clusterResourceManager),
                 eventHandler,
                 new KubernetesScalingRealizer(),
                 stateStore);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
index 187a7118..45a11c3e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.metrics.MetricGroup;
@@ -33,6 +36,8 @@ import lombok.Getter;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
+
 /** An implementation of JobAutoscalerContext for Kubernetes. */
 public class KubernetesJobAutoScalerContext extends 
JobAutoScalerContext<ResourceID> {
 
@@ -51,6 +56,10 @@ public class KubernetesJobAutoScalerContext extends 
JobAutoScalerContext<Resourc
                 jobStatus,
                 configuration,
                 metricGroup,
+                
Optional.ofNullable(configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU))
+                        .orElse(0.),
+                
Optional.ofNullable(configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY))
+                        .orElse(MemorySize.ZERO),
                 restClientSupplier);
         this.resourceContext = resourceContext;
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 629df78b..62e3ba3e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -604,4 +604,12 @@ public class KubernetesOperatorConfigOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Indicate whether the job should be drained when 
stopping with savepoint.");
+
+    @Documentation.Section(SECTION_ADVANCED)
+    public static final ConfigOption<Duration> REFRESH_CLUSTER_RESOURCE_VIEW =
+            operatorConfig("cluster.resource-view.refresh-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(-1))
+                    .withDescription(
+                            "How often to retrieve Kubernetes cluster resource 
usage information. This information is used to avoid running out of cluster 
resources when scaling up resources. Negative values disable the feature.");
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceManager.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceManager.java
new file mode 100644
index 00000000..605397d8
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceManager.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.resources;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.resources.ResourceCheck;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A cluster resource manager which provides a view over the allocatable 
resources within a
+ * Kubernetes cluster and allows to simulate scheduling pods with a defined 
number of required
+ * resources.
+ *
+ * <p>The goal is to provide a good indicator for whether resources needed for 
autoscaling are going
+ * to be available. This is achieved by pulling the node resource usage from 
the Kubernetes cluster
+ * at a regular configurable interval, after which we use this data to 
simulate adding / removing
+ * resources (pods). Note that this is merely a (pretty good) heuristic 
because the Kubernetes
+ * scheduler has the final saying. However, we prevent 99% of the scenarios 
after pipeline outages
+ * which can lead to massive scale up where all pipelines may be scaled up at 
the same time and
+ * exhaust the number of available resources.
+ *
+ * <p>The simulation can run on a fixed set of Kubernetes nodes. Additionally, 
if we detect that the
+ * cluster is using the Kubernetes Cluster Autoscaler, we will use this data 
to extrapolate the
+ * number of nodes to the maximum defined nodes in the autoscaler 
configuration.
+ *
+ * <p>We currently track CPU and memory. Ephemeral storage is missing because 
there is no easy way
+ * to get node statics on free storage.
+ */
+public class ClusterResourceManager implements ResourceCheck {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClusterResourceManager.class);
+
+    /** ConfigMap name of the Kubernetes Cluster Autoscaler. */
+    static final String CLUSTER_AUTOSCALER_CONFIG_MAP = 
"cluster-autoscaler-status";
+
+    /** EKS specific node group information. Code still works without this 
label. */
+    static final String LABEL_NODE_GROUP = "eks.amazonaws.com/nodegroup";
+
+    private final Duration refreshInterval;
+    private final KubernetesClient kubernetesClient;
+
+    @VisibleForTesting ClusterResourceView clusterResourceView;
+
+    public static ClusterResourceManager of(Configuration config, 
KubernetesClient client) {
+        return new ClusterResourceManager(
+                
config.get(KubernetesOperatorConfigOptions.REFRESH_CLUSTER_RESOURCE_VIEW), 
client);
+    }
+
+    public ClusterResourceManager(Duration refreshInterval, KubernetesClient 
kubernetesClient) {
+        this.refreshInterval = refreshInterval;
+        this.kubernetesClient = kubernetesClient;
+    }
+
+    @Override
+    public synchronized boolean trySchedule(
+            int currentInstances,
+            int newInstances,
+            double cpuPerInstance,
+            MemorySize memoryPerInstance) {
+
+        if (refreshInterval.isNegative()) {
+            // Feature disabled
+            return true;
+        }
+
+        if (shouldRefreshView(clusterResourceView, refreshInterval)) {
+            try {
+                clusterResourceView = createResourceView(kubernetesClient);
+            } catch (KubernetesClientException e) {
+                if (e.getCode() == 403) {
+                    LOG.warn(
+                            "No permission to retrieve node resource usage. 
Resource check disabled.");
+                    return true;
+                }
+                throw e;
+            }
+        }
+
+        return trySchedule(
+                clusterResourceView,
+                currentInstances,
+                newInstances,
+                cpuPerInstance,
+                memoryPerInstance);
+    }
+
+    /**
+     * Simple check whether the new resource requirements can be scheduled. 
Note: This is not a
+     * full-blown scheduler. We may return false-negatives, i.e. we may 
indicate scheduling is not
+     * possible when it actually is. This is still better than false positives 
where we suffer from
+     * downtime due to non-schedulable pods.
+     */
+    private static boolean trySchedule(
+            ClusterResourceView resourceView,
+            int currentInstances,
+            int newInstances,
+            double cpuPerInstance,
+            MemorySize memoryPerInstance) {
+
+        resourceView.cancelPending();
+
+        for (int i = 0; i < currentInstances; i++) {
+            resourceView.release(cpuPerInstance, memoryPerInstance);
+        }
+
+        for (int i = 0; i < newInstances; i++) {
+            if (!resourceView.tryReserve(cpuPerInstance, memoryPerInstance)) {
+                return false;
+            }
+        }
+
+        resourceView.commit();
+
+        return true;
+    }
+
+    private boolean shouldRefreshView(
+            ClusterResourceView clusterResourceView, Duration refreshInterval) 
{
+        return clusterResourceView == null
+                || Instant.now()
+                        
.isAfter(clusterResourceView.getCreationTime().plus(refreshInterval));
+    }
+
+    private static ClusterResourceView createResourceView(KubernetesClient 
kubernetesClient) {
+        List<KubernetesNodeResourceInfo> nodes = new ArrayList<>();
+
+        for (Node item : kubernetesClient.nodes().list().getItems()) {
+
+            String nodeName = item.getMetadata().getName();
+            String nodeGroup = 
item.getMetadata().getLabels().get(LABEL_NODE_GROUP);
+
+            Map<String, Quantity> usage =
+                    
kubernetesClient.top().nodes().metrics(nodeName).getUsage();
+            Map<String, Quantity> allocatable = 
item.getStatus().getAllocatable();
+
+            KubernetesResource cpuInfo = getResourceInfo("cpu", allocatable, 
usage);
+            KubernetesResource memInfo = getResourceInfo("memory", 
allocatable, usage);
+            // Ephemeral storage is currently missing because there is no easy 
way to get node
+            // statics on free storage.
+
+            nodes.add(new KubernetesNodeResourceInfo(nodeName, nodeGroup, 
cpuInfo, memInfo));
+        }
+
+        try {
+            addClusterAutoscalableNodes(kubernetesClient, nodes);
+        } catch (ClusterAutoscalerUnavailableException e) {
+            LOG.info("No cluster autoscaler information available: {}", 
e.getMessage());
+        }
+
+        return new ClusterResourceView(nodes);
+    }
+
+    private static void addClusterAutoscalableNodes(
+            KubernetesClient kubernetesClient, 
List<KubernetesNodeResourceInfo> nodes)
+            throws ClusterAutoscalerUnavailableException {
+        Map<String, Integer> nodeGroupsWithMaxSize = 
getMaxClusterSizeByNodeGroup(kubernetesClient);
+
+        Map<String, List<KubernetesNodeResourceInfo>> 
nodeGroupsWithCurrentSize = new HashMap<>();
+        for (KubernetesNodeResourceInfo node : nodes) {
+            List<KubernetesNodeResourceInfo> nodeGroupInfos =
+                    nodeGroupsWithCurrentSize.computeIfAbsent(
+                            node.getNodeGroup(), key -> new ArrayList<>());
+            nodeGroupInfos.add(node);
+        }
+
+        // Add the extra number of nodes which can be added via cluster 
autoscaling
+        for (Map.Entry<String, Integer> entry : 
nodeGroupsWithMaxSize.entrySet()) {
+            String nodeGroup = entry.getKey();
+            int nodeGroupMaxSize = entry.getValue();
+            List<KubernetesNodeResourceInfo> nodeGroupNodes =
+                    nodeGroupsWithCurrentSize.get(nodeGroup);
+            if (nodeGroupNodes == null) {
+                // Node group not autoscaled
+                continue;
+            }
+            int nodeGroupCurrentSize = nodeGroupNodes.size();
+
+            for (int i = nodeGroupCurrentSize; i < nodeGroupMaxSize; i++) {
+                KubernetesNodeResourceInfo exmplaryNode = 
nodeGroupNodes.get(0);
+                nodes.add(
+                        new KubernetesNodeResourceInfo(
+                                "future-node-" + i,
+                                exmplaryNode.getNodeGroup(),
+                                new 
KubernetesResource(exmplaryNode.getCpu().getAllocatable(), 0),
+                                new KubernetesResource(
+                                        
exmplaryNode.getMemory().getAllocatable(), 0)));
+            }
+        }
+    }
+
+    private static Map<String, Integer> getMaxClusterSizeByNodeGroup(
+            KubernetesClient kubernetesClient) throws 
ClusterAutoscalerUnavailableException {
+
+        ConfigMap configMap = new ConfigMap();
+        ObjectMeta metadata = new ObjectMeta();
+        metadata.setName(CLUSTER_AUTOSCALER_CONFIG_MAP);
+        metadata.setNamespace("kube-system");
+        configMap.setMetadata(metadata);
+        configMap = kubernetesClient.configMaps().resource(configMap).get();
+        if (configMap == null) {
+            LOG.info("ConfigMap {} not found", CLUSTER_AUTOSCALER_CONFIG_MAP);
+            throw new ClusterAutoscalerUnavailableException(
+                    "ConfigMap not found " + CLUSTER_AUTOSCALER_CONFIG_MAP);
+        }
+        String status = configMap.getData().get("status");
+        if (status == null) {
+            throw new RuntimeException(
+                    "status field not found in " + 
CLUSTER_AUTOSCALER_CONFIG_MAP);
+        }
+
+        Matcher matcher = 
Pattern.compile("Name:\\s*(\\S+)\n.*\n.*maxSize=(\\d+)").matcher(status);
+        Map<String, Integer> nodeGroupsBySize = new HashMap<>();
+        while (matcher.find()) {
+            String nodeGroupName = matcher.group(1);
+            int numNodes = Integer.parseInt(matcher.group(2));
+            Integer existingValue = nodeGroupsBySize.put(nodeGroupName, 
numNodes);
+            LOG.debug("Extracted nodeGroup {} maxSize: {}", nodeGroupName, 
nodeGroupsBySize);
+            Preconditions.checkState(
+                    existingValue == null, "NodeGroup %s found twice", 
nodeGroupName);
+        }
+
+        if (nodeGroupsBySize.isEmpty()) {
+            throw new RuntimeException("Cluster size could not be determined");
+        }
+
+        return nodeGroupsBySize;
+    }
+
+    @VisibleForTesting
+    void refresh() {
+        clusterResourceView = null;
+    }
+
+    private static KubernetesResource getResourceInfo(
+            String type, Map<String, Quantity> allocatableMap, Map<String, 
Quantity> usageMap) {
+        Quantity allocatableQuantity = 
Preconditions.checkNotNull(allocatableMap.get(type));
+        Quantity usageQuantity = 
Preconditions.checkNotNull(usageMap.get(type));
+
+        double allocatable = 
allocatableQuantity.getNumericalAmount().doubleValue();
+        double usage = usageQuantity.getNumericalAmount().doubleValue();
+        return new KubernetesResource(allocatable, usage);
+    }
+
+    private static class ClusterAutoscalerUnavailableException extends 
Exception {
+        public ClusterAutoscalerUnavailableException(String message) {
+            super(message);
+        }
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceView.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceView.java
new file mode 100644
index 00000000..9b623ae4
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceView.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.resources;
+
+import org.apache.flink.configuration.MemorySize;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.time.Instant;
+import java.util.List;
+
+/** A view on Kubernetes cluster resources by nodes and their cpu/memory. */
+@RequiredArgsConstructor
+@Getter
+public class ClusterResourceView {
+
+    private final Instant creationTime = Instant.now();
+
+    private final List<KubernetesNodeResourceInfo> nodes;
+
+    public boolean tryReserve(double cpu, MemorySize memory) {
+        for (KubernetesNodeResourceInfo node : nodes) {
+            if (node.tryReserve(cpu, memory)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public void release(double cpu, MemorySize memory) {
+        for (KubernetesNodeResourceInfo node : nodes) {
+            if (node.tryRelease(cpu, memory.getBytes())) {
+                return;
+            }
+        }
+    }
+
+    public void commit() {
+        for (KubernetesNodeResourceInfo node : nodes) {
+            node.commitPending();
+        }
+    }
+
+    public void cancelPending() {
+        for (KubernetesNodeResourceInfo node : nodes) {
+            node.resetPending();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterResourceView{" + "creationTime=" + creationTime + ", 
nodes=" + nodes + '}';
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/KubernetesNodeResourceInfo.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/KubernetesNodeResourceInfo.java
new file mode 100644
index 00000000..86ede0b8
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/KubernetesNodeResourceInfo.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.resources;
+
+import org.apache.flink.configuration.MemorySize;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/** A single Kubernetes node and its resources (cpu / memory). */
+@AllArgsConstructor
+@Getter
+public class KubernetesNodeResourceInfo {
+    private String name;
+    private String nodeGroup;
+    private KubernetesResource cpu;
+    private KubernetesResource memory;
+
+    public boolean tryReserve(double cpuAmount, MemorySize memoryAmount) {
+        if (cpu.getFree() >= cpuAmount && memory.getFree() >= 
memoryAmount.getBytes()) {
+            cpu.setPending(cpu.getPending() + cpuAmount);
+            memory.setPending(memory.getPending() + memoryAmount.getBytes());
+            return true;
+        }
+        return false;
+    }
+
+    public void commitPending() {
+        cpu.commitPending();
+        memory.commitPending();
+    }
+
+    public void resetPending() {
+        cpu.setPending(0);
+        memory.setPending(0);
+    }
+
+    public boolean tryRelease(double cpuAmount, double memoryAmount) {
+        if (cpu.getUsed() >= cpuAmount && memory.getUsed() >= memoryAmount) {
+            cpu.release(cpuAmount);
+            memory.release(memoryAmount);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "KubernetesNodeResourceInfo{"
+                + "name='"
+                + name
+                + '\''
+                + ", cpu="
+                + cpu
+                + ", memory="
+                + memory
+                + '}';
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/KubernetesResource.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/KubernetesResource.java
new file mode 100644
index 00000000..82f1db94
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/KubernetesResource.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.resources;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+
+/** A Kubernetes resource and its current allocation. */
+@RequiredArgsConstructor
+@Getter
+public class KubernetesResource {
+
+    /** Allocatable as per Kubernetes cluster API. */
+    private final double allocatable;
+
+    /** Used as per Kubernetes cluster API. */
+    private final double used;
+
+    /** Reserved via the corresponding Setter. */
+    @Getter private double reserved;
+
+    /** Pending reservation which is not yet committed. */
+    @Getter @Setter private double pending;
+
+    public void commitPending() {
+        this.reserved += pending;
+        this.pending = 0;
+    }
+
+    public void release(double amount) {
+        this.pending -= Math.min(amount, getUsed());
+    }
+
+    public double getFree() {
+        return allocatable - getUsed();
+    }
+
+    public double getUsed() {
+        return used + reserved + pending;
+    }
+
+    @Override
+    public String toString() {
+        return "KubernetesResource{"
+                + "allocatable="
+                + allocatable
+                + ", used="
+                + used
+                + ", reserved="
+                + reserved
+                + ", pending="
+                + pending
+                + '}';
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java
index 55110bce..8eccca5a 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.autoscaler.JobAutoScalerImpl;
+import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
@@ -29,6 +30,8 @@ import lombok.Getter;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
+
 /** Test loading the default autoscaling implementation from the classpath. */
 @EnableKubernetesMockClient(crud = true)
 public class AutoscalerFactoryTest {
@@ -38,7 +41,10 @@ public class AutoscalerFactoryTest {
     @Test
     void testLoadDefaultImplementation() {
         JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoScaler =
-                AutoscalerFactory.create(kubernetesClient, new 
EventRecorder(new EventCollector()));
+                AutoscalerFactory.create(
+                        kubernetesClient,
+                        new EventRecorder(new EventCollector()),
+                        new ClusterResourceManager(Duration.ZERO, 
kubernetesClient));
         Assertions.assertTrue(autoScaler instanceof JobAutoScalerImpl);
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
new file mode 100644
index 00000000..09d19299
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for {@link KubernetesJobAutoScalerContext}. */
+public class KubernetesJobAutoScalerContextTest {
+
+    @Test
+    void test() {
+        Configuration configuration = new Configuration();
+        configuration.set(KubernetesConfigOptions.TASK_MANAGER_CPU, 23.);
+        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("1024mb"));
+
+        var context =
+                new KubernetesJobAutoScalerContext(
+                        JobID.generate(),
+                        JobStatus.RUNNING,
+                        configuration,
+                        new UnregisteredMetricsGroup(),
+                        () -> new RestClusterClient<>(new Configuration(), 
"test-cluster"),
+                        new FlinkDeploymentContext(
+                                new FlinkDeployment(),
+                                new TestUtils.TestingContext<>() {
+                                    @Override
+                                    public KubernetesClient getClient() {
+                                        return null;
+                                    }
+                                },
+                                null,
+                                new FlinkConfigManager(new Configuration()),
+                                null));
+
+        assertThat(context.getTaskManagerCpu()).isEqualTo(23.);
+        
assertThat(context.getTaskManagerMemory()).isEqualTo(MemorySize.parse("1024mb"));
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 3d241b6c..8056a0ee 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import 
org.apache.flink.kubernetes.operator.observer.deployment.FlinkDeploymentObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -52,6 +53,7 @@ import 
io.javaoperatorsdk.operator.processing.event.source.EventSource;
 import lombok.Getter;
 import org.junit.jupiter.api.Assertions;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
@@ -95,7 +97,10 @@ public class TestingFlinkDeploymentController
                         eventRecorder,
                         statusRecorder,
                         AutoscalerFactory.create(
-                                flinkService.getKubernetesClient(), 
eventRecorder));
+                                flinkService.getKubernetesClient(),
+                                eventRecorder,
+                                new ClusterResourceManager(
+                                        Duration.ZERO, 
flinkService.getKubernetesClient())));
         canaryResourceManager = new CanaryResourceManager<>(configManager);
         flinkDeploymentController =
                 new FlinkDeploymentController(
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceManagerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceManagerTest.java
new file mode 100644
index 00000000..9639105c
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceManagerTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.resources;
+
+import org.apache.flink.configuration.MemorySize;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.NodeStatus;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.metrics.v1beta1.NodeMetrics;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.util.Map;
+
+import static 
org.apache.flink.kubernetes.operator.resources.ClusterResourceManager.CLUSTER_AUTOSCALER_CONFIG_MAP;
+import static 
org.apache.flink.kubernetes.operator.resources.ClusterResourceManager.LABEL_NODE_GROUP;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
/**
 * Tests for {@link ClusterResourceManager}.
 *
 * <p>Runs against the Fabric8 Kubernetes mock server in CRUD mode. Each test creates the
 * Node / NodeMetrics / ConfigMap fixtures it needs and then exercises the manager's
 * scheduling simulation via {@code trySchedule}.
 */
@EnableKubernetesMockClient(crud = true)
public class ClusterResourceManagerTest {

    // Injected by the @EnableKubernetesMockClient extension before each test.
    KubernetesClient kubernetesClient;

    ClusterResourceManager manager;

    @BeforeEach
    void beforeEach() {
        // One-hour refresh interval: the cluster view is fetched once per test and then
        // served from cache (testRefresh below covers the always-refresh configuration).
        manager = new ClusterResourceManager(Duration.ofHours(1), kubernetesClient);
    }

    @Test
    void testNoResources() {
        var currentInstances = 0;
        var newInstances = 1;
        var cpu = 0.5;
        var memory = MemorySize.parse("128 mb");

        // Cluster is at size zero and no autoscaling information is available
        assertThat(manager.trySchedule(currentInstances, newInstances, cpu, memory)).isFalse();
    }

    @Test
    void testFixedAmountOfResources() {
        var currentInstances = 0;
        var newInstances = 1;
        var cpu = 0.5;
        var memory = MemorySize.parse("128 mb");

        // One node whose allocatable capacity equals exactly one instance's request.
        createNodes(newInstances, cpu, memory);

        // Free capacity just fits the requested resources
        assertThat(manager.trySchedule(currentInstances, newInstances, cpu, memory)).isTrue();
        // No further capacity
        assertThat(manager.trySchedule(currentInstances, newInstances, cpu, memory)).isFalse();
        // But we can schedule if we free resources first
        assertThat(manager.trySchedule(newInstances, newInstances, cpu, memory)).isTrue();
    }

    @Test
    void testMoreResourcesAvailable() {
        var currentInstances = 0;
        var newInstances = 10;
        var cpu = 1;
        var memory = MemorySize.parse("1024 mb");

        // Only one node exists, so ten new instances do not fit...
        createNodes(1, cpu, memory);
        assertThat(manager.trySchedule(currentInstances, newInstances, cpu, memory)).isFalse();

        // ...but once the Cluster Autoscaler status ConfigMap advertises node-group
        // headroom (ready < maxSize in the fixture below), scheduling succeeds.
        createClusterAutoscalerConfigMap();
        manager.refresh();
        assertThat(manager.trySchedule(currentInstances, newInstances, cpu, memory)).isTrue();
    }

    @Test
    void testAllOrNothing() {
        var currentInstances = 0;
        var newInstances = 1;
        var cpu = 1;
        var memory = MemorySize.parse("1024 mb");

        createNodes(newInstances, cpu, memory);

        // A partially satisfiable request must not reserve anything: asking for twice
        // the capacity fails, after which the full capacity is still available once.
        assertThat(manager.trySchedule(currentInstances, newInstances * 2, cpu, memory)).isFalse();
        assertThat(manager.trySchedule(currentInstances, newInstances, cpu, memory)).isTrue();
        assertThat(manager.trySchedule(currentInstances, newInstances, cpu, memory)).isFalse();
    }

    @Test
    void testCaching() {
        var currentInstances = 0;
        var newInstances = 1;
        var cpu = 1;
        var memory = MemorySize.parse("1024 mb");

        createNodes(newInstances, cpu, memory);

        // The resource view is created lazily on the first scheduling attempt...
        assertThat(manager.clusterResourceView).isNull();
        assertThat(manager.trySchedule(currentInstances, newInstances, cpu, memory)).isTrue();
        assertThat(manager.clusterResourceView).isNotNull();

        // ...and the same instance is reused while the 1h refresh interval has not elapsed.
        var resourceViewBackup = manager.clusterResourceView;
        assertThat(manager.trySchedule(currentInstances, newInstances, cpu, memory)).isFalse();
        assertThat(manager.clusterResourceView).isSameAs(resourceViewBackup);
    }

    @Test
    void testRefresh() {
        // Refresh every time
        manager = new ClusterResourceManager(Duration.ZERO, kubernetesClient);

        assertThat(manager.clusterResourceView).isNull();
        assertThat(manager.trySchedule(0, 1, 1, MemorySize.parse("128 mb"))).isFalse();
        assertThat(manager.clusterResourceView).isNotNull();

        // A zero interval forces a fresh view object on every scheduling attempt.
        var resourceViewBackup = manager.clusterResourceView;
        assertThat(manager.trySchedule(0, 1, 1, MemorySize.parse("128 mb"))).isFalse();
        assertThat(manager.clusterResourceView).isNotSameAs(resourceViewBackup);
    }

    @Test
    void testDisabled() {
        // A negative refresh interval disables the capacity check entirely: even an
        // absurdly large request is accepted without consulting the cluster.
        manager = new ClusterResourceManager(Duration.ofSeconds(-1), kubernetesClient);
        assertThat(
                        manager.trySchedule(
                                0, Integer.MAX_VALUE, Double.MAX_VALUE, MemorySize.MAX_VALUE))
                .isTrue();
    }

    /** Creates {@code numNodes} identical nodes (plus matching metrics) named node1..nodeN. */
    private void createNodes(int numNodes, double cpuPerNode, MemorySize memPerNode) {
        for (int i = 1; i <= numNodes; i++) {
            createNode("node" + i, cpuPerNode, memPerNode);
        }
    }

    /**
     * Creates a Node with the given allocatable resources plus a NodeMetrics object
     * reporting zero usage, i.e. the node's full allocatable capacity is free.
     */
    private void createNode(String name, double cpuPerNode, MemorySize memPerNode) {
        Node node = new Node();
        node.setMetadata(new ObjectMeta());
        node.getMetadata().setName(name);
        // Label ties the node to node-group-2 of the autoscaler status fixture below.
        node.getMetadata().setLabels(Map.of(LABEL_NODE_GROUP, "node-group-2"));

        node.setStatus(new NodeStatus());
        node.getStatus().setAllocatable(createResourceMap(cpuPerNode, memPerNode));
        kubernetesClient.resource(node).create();

        NodeMetrics nodeMetrics = new NodeMetrics();
        nodeMetrics.setMetadata(new ObjectMeta());
        nodeMetrics.getMetadata().setName(name);
        nodeMetrics.setUsage(createResourceMap(0, MemorySize.ZERO));
        kubernetesClient.resource(nodeMetrics).create();
    }

    /** Publishes the Cluster Autoscaler status ConfigMap into the kube-system namespace. */
    private void createClusterAutoscalerConfigMap() {
        ConfigMap configMap = new ConfigMap();
        configMap.setMetadata(new ObjectMeta());
        configMap.getMetadata().setName(CLUSTER_AUTOSCALER_CONFIG_MAP);
        configMap.getMetadata().setNamespace("kube-system");
        configMap.setData(Map.of("status", CLUSTER_AUTOSCALING_STATUS));

        kubernetesClient.resource(configMap).create();
    }

    /** Builds a cpu/memory Quantity map; memory is expressed in plain bytes (no unit format). */
    private static Map<String, Quantity> createResourceMap(double cpu, MemorySize memory) {
        return Map.of(
                "cpu",
                Quantity.fromNumericalAmount(BigDecimal.valueOf(cpu), null),
                "memory",
                Quantity.fromNumericalAmount(BigDecimal.valueOf(memory.getBytes()), null));
    }

    // Real-world Cluster Autoscaler status dump with three node groups. The test nodes are
    // labeled node-group-2 (ready=30, maxSize=100 below), which is what gives the manager
    // simulated scale-up headroom in testMoreResourcesAvailable.
    static final String CLUSTER_AUTOSCALING_STATUS =
            "    Cluster-autoscaler status at 2024-01-05 14:42:56.660050258 +0000 UTC:\n"
                    + "    Cluster-wide:\n"
                    + "      Health:      Healthy (ready=82 unready=0 notStarted=0 longNotStarted=0 registered=82 longUnregistered=0)\n"
                    + "                   LastProbeTime:      2024-01-05 14:42:56.011239739 +0000 UTC m=+1081500.500509416\n"
                    + "                   LastTransitionTime: 2023-12-24 02:19:08.392928745 +0000 UTC m=+72.882198411\n"
                    + "      ScaleUp:     NoActivity (ready=82 registered=82)\n"
                    + "                   LastProbeTime:      2024-01-05 14:42:56.011239739 +0000 UTC m=+1081500.500509416\n"
                    + "                   LastTransitionTime: 2024-01-05 04:54:28.692544653 +0000 UTC m=+1046193.181814402\n"
                    + "      ScaleDown:   NoCandidates (candidates=0)\n"
                    + "                   LastProbeTime:      2024-01-05 14:42:56.011239739 +0000 UTC m=+1081500.500509416\n"
                    + "                   LastTransitionTime: 2024-01-05 07:49:23.659004365 +0000 UTC m=+1056688.148274040\n"
                    + "\n"
                    + "    NodeGroups:\n"
                    + "      Name:        node-group-1\n"
                    + "      Health:      Healthy (ready=27 unready=0 notStarted=0 longNotStarted=0 registered=27 longUnregistered=0 cloudProviderTarget=27\n"
                    + "(minSize=10, maxSize=100))\n"
                    + "                   LastProbeTime:      2024-01-05 14:42:56.011239739 +0000 UTC m=+1081500.500509416\n"
                    + "                   LastTransitionTime: 2023-12-24 02:19:08.392928745 +0000 UTC m=+72.882198411\n"
                    + "      ScaleUp:     NoActivity (ready=27 cloudProviderTarget=27)\n"
                    + "                   LastProbeTime:      2024-01-05 14:42:56.011239739 +0000 UTC m=+1081500.500509416\n"
                    + "                   LastTransitionTime: 2023-12-24 02:19:08.392928745 +0000 UTC m=+72.882198411\n"
                    + "      ScaleDown:   NoCandidates (candidates=0)\n"
                    + "                   LastProbeTime:      2024-01-05 14:42:56.011239739 +0000 UTC m=+1081500.500509416\n"
                    + "                   LastTransitionTime: 2024-01-03 19:34:56.195698291 +0000 UTC m=+926220.684967979\n"
                    + "\n"
                    + "      Name:        node-group-2\n"
                    + "      Health:      Healthy (ready=30 unready=0 notStarted=0 longNotStarted=0 registered=30 longUnregistered=0 cloudProviderTarget=30\n"
                    + "(minSize=10, maxSize=100))\n"
                    + "                   LastProbeTime:      2024-01-05 14:42:56.011239739 +0000 UTC m=+1081500.500509416\n"
                    + "                   LastTransitionTime: 2023-12-24 02:19:08.392928745 +0000 UTC m=+72.882198411\n"
                    + "      ScaleUp:     NoActivity (ready=30 cloudProviderTarget=30)\n"
                    + "                   LastProbeTime:      2024-01-05 14:42:56.011239739 +0000 UTC m=+1081500.500509416\n"
                    + "                   LastTransitionTime: 2023-12-24 02:19:08.392928745 +0000 UTC m=+72.882198411\n"
                    + "      ScaleDown:   NoCandidates (candidates=0)\n"
                    + "                   LastProbeTime:      2024-01-05 14:42:56.011239739 +0000 UTC m=+1081500.500509416\n"
                    + "                   LastTransitionTime: 2024-01-03 19:39:59.613210811 +0000 UTC m=+926524.102480488\n"
                    + "\n"
                    + "      Name:        node-group-3\n"
                    + "      Health:      Healthy (ready=25 unready=0 notStarted=0 longNotStarted=0 registered=25 longUnregistered=0 cloudProviderTarget=25\n"
                    + "(minSize=1, maxSize=100))\n"
                    + "                   LastProbeTime:      2024-01-05 14:42:56.011239739 +0000 UTC m=+1081500.500509416\n"
                    + "                   LastTransitionTime: 2023-12-24 02:19:08.392928745 +0000 UTC m=+72.882198411\n"
                    + "      ScaleUp:     NoActivity (ready=25 cloudProviderTarget=25)\n"
                    + "                   LastProbeTime:      2024-01-05 14:42:56.011239739 +0000 UTC m=+1081500.500509416\n"
                    + "                   LastTransitionTime: 2024-01-05 04:54:28.692544653 +0000 UTC m=+1046193.181814402\n"
                    + "      ScaleDown:   NoCandidates (candidates=0)\n"
                    + "                   LastProbeTime:      2024-01-05 14:42:56.011239739 +0000 UTC m=+1081500.500509416\n"
                    + "                   LastTransitionTime: 2024-01-05 07:49:23.659004365 +0000 UTC m=+1056688.148274040";
}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceViewTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceViewTest.java
new file mode 100644
index 00000000..c2d98203
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceViewTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.resources;
+
+import org.apache.flink.configuration.MemorySize;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for {@link ClusterResourceView}. */
+public class ClusterResourceViewTest {
+
+    @Test
+    void testStagingAndCommitting() {
+        var nodes =
+                List.of(
+                        new KubernetesNodeResourceInfo(
+                                "node1",
+                                "node-group-1",
+                                new KubernetesResource(11, 1),
+                                new KubernetesResource(256, 128)));
+
+        var clusterResourceView = new ClusterResourceView(nodes);
+
+        var cpu = 10;
+        var mem = MemorySize.parse("128 bytes");
+
+        assertThat(clusterResourceView.tryReserve(cpu, mem)).isTrue();
+        assertThat(clusterResourceView.tryReserve(cpu, mem)).isFalse();
+
+        clusterResourceView.cancelPending();
+
+        assertThat(clusterResourceView.tryReserve(cpu, mem)).isTrue();
+
+        clusterResourceView.commit();
+
+        assertThat(clusterResourceView.tryReserve(cpu, mem)).isFalse();
+
+        clusterResourceView.release(cpu, mem);
+        assertThat(clusterResourceView.tryReserve(cpu, mem)).isTrue();
+        clusterResourceView.cancelPending();
+        clusterResourceView.release(cpu, mem);
+        assertThat(clusterResourceView.tryReserve(cpu, mem)).isTrue();
+
+        clusterResourceView.commit();
+
+        assertThat(clusterResourceView.tryReserve(cpu, mem)).isFalse();
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/resources/KubernetesResourceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/resources/KubernetesResourceTest.java
new file mode 100644
index 00000000..ceee2c41
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/resources/KubernetesResourceTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.resources;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for {@link KubernetesResource}. */
+public class KubernetesResourceTest {
+
+    @Test
+    void testFreeCapacity() {
+        assertThat(new KubernetesResource(100, 0).getFree()).isEqualTo(100);
+        assertThat(new KubernetesResource(100, 0).getUsed()).isEqualTo(0);
+        assertThat(new KubernetesResource(100, 1).getFree()).isEqualTo(99);
+        assertThat(new KubernetesResource(100, 1).getUsed()).isEqualTo(1);
+
+        KubernetesResource kubernetesResource = new KubernetesResource(100, 1);
+        kubernetesResource.setPending(1);
+        assertThat(kubernetesResource.getFree()).isEqualTo(98);
+        assertThat(kubernetesResource.getUsed()).isEqualTo(2);
+    }
+
+    @Test
+    void testCommit() {
+        KubernetesResource kubernetesResource = new KubernetesResource(100, 1);
+        kubernetesResource.setPending(1);
+        assertThat(kubernetesResource.getFree()).isEqualTo(98);
+
+        kubernetesResource.commitPending();
+        assertThat(kubernetesResource.getFree()).isEqualTo(98);
+
+        kubernetesResource.setPending(1);
+        assertThat(kubernetesResource.getFree()).isEqualTo(97);
+    }
+
+    @Test
+    void testRelease() {
+        KubernetesResource kubernetesResource = new KubernetesResource(100, 1);
+        assertThat(kubernetesResource.getUsed()).isEqualTo(1);
+
+        kubernetesResource.release(1);
+        assertThat(kubernetesResource.getUsed()).isEqualTo(0);
+
+        kubernetesResource.release(1);
+        assertThat(kubernetesResource.getUsed()).isEqualTo(0);
+    }
+}

Reply via email to