gyfora commented on code in PR #751:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/751#discussion_r1449992691
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java:
##########
@@ -165,7 +167,12 @@ void registerDeploymentController() {
var metricManager =
MetricManager.createFlinkDeploymentMetricManager(baseConfig,
metricGroup);
var statusRecorder = StatusRecorder.create(client, metricManager,
listeners);
- var autoscaler = AutoscalerFactory.create(client, eventRecorder);
+ Duration refreshClusterViewInterval =
+ configManager
+ .getDefaultConfig()
+
.get(KubernetesOperatorConfigOptions.REFRESH_CLUSTER_RESOURCE_VIEW);
+ var clusterResourceManager = new
ClusterResourceManager(refreshClusterViewInterval, client);
Review Comment:
We should add a method to create the `ClusterResourceManager` and we should
probably have a config to disable it by default. This is only applicable in
certain Kubernetes envs which are actually resource constrained only by node
capacity and not by other quotas
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java:
##########
@@ -165,7 +167,12 @@ void registerDeploymentController() {
var metricManager =
MetricManager.createFlinkDeploymentMetricManager(baseConfig,
metricGroup);
var statusRecorder = StatusRecorder.create(client, metricManager,
listeners);
- var autoscaler = AutoscalerFactory.create(client, eventRecorder);
+ Duration refreshClusterViewInterval =
Review Comment:
Can we use `var` for consistency?
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceManager.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.resources;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.resources.ResourceCheck;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A cluster resource manager which provides a view over the allocatable
resources within a
+ * Kubernetes cluster and allows to simulate scheduling pods with a defined
number of required
+ * resources.
+ *
+ * <p>The goal is to provide a good indicator for whether resources needed for
autoscaling are going
+ * to be available. This is achieved by pulling the node resource usage from
the Kubernetes cluster
+ * at a regular configurable interval, after which we use this data to
simulate adding / removing
+ * resources (pods). Note that this is merely a (pretty good) heuristic
because the Kubernetes
+ * scheduler has the final saying. However, we prevent 99% of the scenarios
after pipeline outages
+ * which can lead to massive scale up where all pipelines may be scaled up at
the same time and
+ * exhaust the number of available resources.
+ *
+ * <p>The simulation can run on a fixed set of Kubernetes nodes. Additionally,
if we detect that the
+ * cluster is using the Kubernetes Cluster Autoscaler, we will use this data
to extrapolate the
+ * number of nodes to the maximum defined nodes in the autoscaler
configuration.
+ *
+ * <p>We currently track CPU and memory. Ephemeral storage is missing because
there is no easy way
+ * to get node statics on free storage.
+ */
+public class ClusterResourceManager implements ResourceCheck {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ClusterResourceManager.class);
+
+ /** ConfigMap name of the Kubernetes Cluster Autoscaler. */
+ static final String CLUSTER_AUTOSCALER_CONFIG_MAP =
"cluster-autoscaler-status";
+
+ /** EKS specific node group information. Code still works without this
label. */
+ static final String LABEL_NODE_GROUP = "eks.amazonaws.com/nodegroup";
+
+ private final Duration refreshInterval;
+ private final KubernetesClient kubernetesClient;
+
+ @VisibleForTesting ClusterResourceView clusterResourceView;
+
+ public ClusterResourceManager(Duration refreshInterval, KubernetesClient
kubernetesClient) {
+ this.refreshInterval = refreshInterval;
+ this.kubernetesClient = kubernetesClient;
+ }
+
+ @Override
+ public synchronized boolean trySchedule(
+ int currentInstances,
+ int newInstances,
+ double cpuPerInstance,
+ double memoryPerInstance) {
+
+ if (refreshInterval.isNegative()) {
+ // Feature disabled
+ return true;
+ }
+
+ if (shouldRefreshView(clusterResourceView, refreshInterval)) {
+ try {
+ clusterResourceView = createResourceView(kubernetesClient);
+ } catch (KubernetesClientException e) {
+ if (e.getCode() == 403) {
+ LOG.warn(
+ "No permission to retrieve node resource usage.
Resource check disabled.");
+ return true;
+ }
+ throw e;
+ }
+ }
+
+ if (newInstances <= currentInstances) {
+ LOG.debug("Fewer or same amount of resources used after scaling.");
+ return true;
+ }
Review Comment:
This should be moved in front of everything
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -247,6 +254,69 @@ private boolean isJobUnderMemoryPressure(
return false;
}
+ private boolean scalingWouldExceedClusterResources(
+ EvaluatedMetrics evaluatedMetrics,
+ Map<JobVertexID, ScalingSummary> scalingSummaries,
+ Context ctx) {
+
+ final double taskManagerCpu = ctx.getTaskManagerCpu();
+ final double taskManagerMemory = ctx.getTaskManagerMemory();
+
+ if (taskManagerCpu <= 0 || taskManagerMemory <= 0) {
+ // We can't extract the requirements, we can't make any assumptions
+ return false;
+ }
+
+ var globalMetrics = evaluatedMetrics.getGlobalMetrics();
+ if (!(globalMetrics.containsKey(ScalingMetric.NUM_TASK_MANAGERS)
+ &&
globalMetrics.containsKey(ScalingMetric.NUM_TOTAL_TASK_SLOTS)
+ &&
globalMetrics.containsKey(ScalingMetric.NUM_TASK_SLOTS_USED))) {
+ LOG.info("JM metrics not ready yet");
+ return true;
+ }
+
+ var vertexMetrics = evaluatedMetrics.getVertexMetrics();
+
+ int oldParallelismSum =
+ vertexMetrics.values().stream()
+ .map(map -> (int)
map.get(ScalingMetric.PARALLELISM).getCurrent())
+ .reduce(0, Integer::sum);
+
+ Map<JobVertexID, Integer> newParallelisms = new HashMap<>();
+ for (Map.Entry<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>> entry :
+ vertexMetrics.entrySet()) {
+ JobVertexID jobVertexID = entry.getKey();
+ ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID);
+ if (scalingSummary != null) {
+ newParallelisms.put(jobVertexID,
scalingSummary.getNewParallelism());
+ } else {
+ newParallelisms.put(
+ jobVertexID,
+ (int)
entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent());
+ }
+ }
+
+ double numTaskSlotsUsed =
globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
+
+ final int numTaskSlotsAfterRescale;
+ if (oldParallelismSum == numTaskSlotsUsed) {
+ // Slot sharing activated
+ numTaskSlotsAfterRescale =
newParallelisms.values().stream().reduce(0, Integer::sum);
Review Comment:
It could be `oldParallelism >= numTaskSlotsUsed`
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -247,6 +254,69 @@ private boolean isJobUnderMemoryPressure(
return false;
}
+ private boolean scalingWouldExceedClusterResources(
+ EvaluatedMetrics evaluatedMetrics,
+ Map<JobVertexID, ScalingSummary> scalingSummaries,
+ Context ctx) {
+
+ final double taskManagerCpu = ctx.getTaskManagerCpu();
+ final double taskManagerMemory = ctx.getTaskManagerMemory();
+
+ if (taskManagerCpu <= 0 || taskManagerMemory <= 0) {
+ // We can't extract the requirements, we can't make any assumptions
+ return false;
+ }
+
+ var globalMetrics = evaluatedMetrics.getGlobalMetrics();
+ if (!(globalMetrics.containsKey(ScalingMetric.NUM_TASK_MANAGERS)
+ &&
globalMetrics.containsKey(ScalingMetric.NUM_TOTAL_TASK_SLOTS)
+ &&
globalMetrics.containsKey(ScalingMetric.NUM_TASK_SLOTS_USED))) {
+ LOG.info("JM metrics not ready yet");
+ return true;
+ }
+
+ var vertexMetrics = evaluatedMetrics.getVertexMetrics();
+
+ int oldParallelismSum =
+ vertexMetrics.values().stream()
+ .map(map -> (int)
map.get(ScalingMetric.PARALLELISM).getCurrent())
+ .reduce(0, Integer::sum);
+
+ Map<JobVertexID, Integer> newParallelisms = new HashMap<>();
+ for (Map.Entry<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>> entry :
+ vertexMetrics.entrySet()) {
+ JobVertexID jobVertexID = entry.getKey();
+ ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID);
+ if (scalingSummary != null) {
+ newParallelisms.put(jobVertexID,
scalingSummary.getNewParallelism());
+ } else {
+ newParallelisms.put(
+ jobVertexID,
+ (int)
entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent());
+ }
+ }
+
+ double numTaskSlotsUsed =
globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
+
+ final int numTaskSlotsAfterRescale;
+ if (oldParallelismSum == numTaskSlotsUsed) {
+ // Slot sharing activated
+ numTaskSlotsAfterRescale =
newParallelisms.values().stream().reduce(0, Integer::sum);
+ } else {
+ // Assuming slot sharing is not activated
+ numTaskSlotsAfterRescale =
newParallelisms.values().stream().reduce(0, Integer::max);
+ }
+
+ var numTotalTaskSlots =
globalMetrics.get(ScalingMetric.NUM_TOTAL_TASK_SLOTS).getCurrent();
+ int currentNumTms = (int)
globalMetrics.get(ScalingMetric.NUM_TASK_MANAGERS).getCurrent();
+
+ var numSlotsPerTm = numTotalTaskSlots / currentNumTms;
Review Comment:
We could also get this directly from the config, then we only need to query
the slots used metrics.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceManager.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.resources;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.resources.ResourceCheck;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A cluster resource manager which provides a view over the allocatable
resources within a
+ * Kubernetes cluster and allows to simulate scheduling pods with a defined
number of required
+ * resources.
+ *
+ * <p>The goal is to provide a good indicator for whether resources needed for
autoscaling are going
+ * to be available. This is achieved by pulling the node resource usage from
the Kubernetes cluster
+ * at a regular configurable interval, after which we use this data to
simulate adding / removing
+ * resources (pods). Note that this is merely a (pretty good) heuristic
because the Kubernetes
+ * scheduler has the final saying. However, we prevent 99% of the scenarios
after pipeline outages
+ * which can lead to massive scale up where all pipelines may be scaled up at
the same time and
+ * exhaust the number of available resources.
+ *
+ * <p>The simulation can run on a fixed set of Kubernetes nodes. Additionally,
if we detect that the
+ * cluster is using the Kubernetes Cluster Autoscaler, we will use this data
to extrapolate the
+ * number of nodes to the maximum defined nodes in the autoscaler
configuration.
+ *
+ * <p>We currently track CPU and memory. Ephemeral storage is missing because
there is no easy way
+ * to get node statics on free storage.
+ */
+public class ClusterResourceManager implements ResourceCheck {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ClusterResourceManager.class);
+
+ /** ConfigMap name of the Kubernetes Cluster Autoscaler. */
+ static final String CLUSTER_AUTOSCALER_CONFIG_MAP =
"cluster-autoscaler-status";
+
+ /** EKS specific node group information. Code still works without this
label. */
+ static final String LABEL_NODE_GROUP = "eks.amazonaws.com/nodegroup";
+
+ private final Duration refreshInterval;
+ private final KubernetesClient kubernetesClient;
+
+ @VisibleForTesting ClusterResourceView clusterResourceView;
+
+ public ClusterResourceManager(Duration refreshInterval, KubernetesClient
kubernetesClient) {
+ this.refreshInterval = refreshInterval;
+ this.kubernetesClient = kubernetesClient;
+ }
+
+ @Override
+ public synchronized boolean trySchedule(
+ int currentInstances,
+ int newInstances,
+ double cpuPerInstance,
+ double memoryPerInstance) {
+
+ if (refreshInterval.isNegative()) {
+ // Feature disabled
+ return true;
+ }
+
+ if (shouldRefreshView(clusterResourceView, refreshInterval)) {
+ try {
+ clusterResourceView = createResourceView(kubernetesClient);
+ } catch (KubernetesClientException e) {
+ if (e.getCode() == 403) {
+ LOG.warn(
+ "No permission to retrieve node resource usage.
Resource check disabled.");
+ return true;
Review Comment:
Should we set a field and not try again and again?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]