This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 1fca621ce157 [SPARK-55432][K8S] Support built-in K8s `ExecutorResizePlugin`
1fca621ce157 is described below
commit 1fca621ce157d8507f38b248cbeac222b231a7a0
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Mon Feb 9 03:04:49 2026 -0800
[SPARK-55432][K8S] Support built-in K8s `ExecutorResizePlugin`
### What changes were proposed in this pull request?
This PR aims to support a built-in K8s executor in-place vertical scaling plugin, `ExecutorResizePlugin`.
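For example, the plugin could be enabled with a configuration sketch like the following (the config names are the ones added by this commit; the values are illustrative, not recommendations):

```scala
import org.apache.spark.SparkConf

// Illustrative sketch: enable the built-in plugin and tune the resize
// settings introduced by this commit.
val conf = new SparkConf()
  .set("spark.plugins", "org.apache.spark.scheduler.cluster.k8s.ExecutorResizePlugin")
  .set("spark.kubernetes.executor.resizeInterval", "60s")  // 0 (default) disables resizing
  .set("spark.kubernetes.executor.resizeThreshold", "0.9") // resize when usage exceeds 90% of the limit
  .set("spark.kubernetes.executor.resizeFactor", "0.1")    // grow the memory limit by 10%
```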
### Why are the changes needed?
Since `In-Place Pod Resize` graduates to `Stable` in K8s v1.35, Apache Spark users on v1.35+ can use the feature with this built-in plugin.
- https://kubernetes.io/blog/2025/12/19/kubernetes-v1-35-in-place-pod-resize-ga/
- https://kubernetes.io/docs/tasks/configure-pod-container/resize-container-resources/
### Does this PR introduce _any_ user-facing change?
No, because this built-in plugin is not enabled by default.
### How was this patch tested?
Pass the CIs with the newly added test suite.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: `Gemini 3 Pro (High)` on `Antigravity`
Closes #54215 from dongjoon-hyun/SPARK-55432.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scala/org/apache/spark/deploy/k8s/Config.scala | 24 ++
.../cluster/k8s/ExecutorResizePlugin.scala | 171 ++++++++++++
.../cluster/k8s/ExecutorResizePluginSuite.scala | 290 +++++++++++++++++++++
3 files changed, 485 insertions(+)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 3f131874fc5e..011b05f6542a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -238,6 +238,30 @@ private[spark] object Config extends Logging {
     .checkValue(_ >= 0, "The minimum number of tasks should be non-negative.")
.createWithDefault(0)
+ val EXECUTOR_RESIZE_INTERVAL =
+ ConfigBuilder("spark.kubernetes.executor.resizeInterval")
+      .doc("Interval between executor resize operations. Set to 0 (default) to disable.")
+ .version("4.2.0")
+ .timeConf(TimeUnit.SECONDS)
+ .checkValue(_ >= 0, "Interval should be non-negative")
+ .createWithDefault(0)
+
+ val EXECUTOR_RESIZE_THRESHOLD =
+ ConfigBuilder("spark.kubernetes.executor.resizeThreshold")
+      .doc("The memory usage ratio of the limit above which the executor pod is resized.")
+ .version("4.2.0")
+ .doubleConf
+ .checkValue(v => 0 < v && v < 1, "The threshold should be in (0, 1)")
+ .createWithDefault(0.9)
+
+ val EXECUTOR_RESIZE_FACTOR =
+ ConfigBuilder("spark.kubernetes.executor.resizeFactor")
+      .doc("The factor by which the executor memory limit is increased when resizing.")
+ .version("4.2.0")
+ .doubleConf
+ .checkValue(v => 0 < v && v <= 1, "The factor should be in (0, 1]")
+ .createWithDefault(0.1)
+
   val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver"
   val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor"
   val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePlugin.scala
new file mode 100644
index 000000000000..77fd37dd538b
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePlugin.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.{Map => JMap}
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.{PodBuilder, Quantity}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{EXECUTOR_ID, MEMORY_SIZE}
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Spark plugin to monitor executor pod memory usage and increase the memory limit
+ * if the usage exceeds a threshold.
+ */
+class ExecutorResizePlugin extends SparkPlugin {
+ override def driverPlugin(): DriverPlugin = new ExecutorResizeDriverPlugin()
+
+ override def executorPlugin(): ExecutorPlugin = null
+}
+
+class ExecutorResizeDriverPlugin extends DriverPlugin with Logging {
+ private var sparkContext: SparkContext = _
+ private var kubernetesClient: KubernetesClient = _
+
+ private val periodicService: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("executor-resize-plugin")
+
+  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+ val interval = Utils.timeStringAsSeconds(
+ sc.conf.get(EXECUTOR_RESIZE_INTERVAL.key, "1m"))
+ val threshold = sc.conf.getDouble(EXECUTOR_RESIZE_THRESHOLD.key, 0.9)
+ val factor = sc.conf.getDouble(EXECUTOR_RESIZE_FACTOR.key, 0.1)
+ val namespace = sc.conf.get(KUBERNETES_NAMESPACE)
+
+ sparkContext = sc
+
+ try {
+ kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
+ sc.conf.get(KUBERNETES_DRIVER_MASTER_URL),
+ Option(namespace),
+ KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+ SparkKubernetesClientFactory.ClientType.Driver,
+ sc.conf,
+ None)
+
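+      // Poll executor pods at the configured interval; 0 (the default) disables resizing.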
+      if (interval > 0) {
+        periodicService.scheduleAtFixedRate(() => {
+          try {
+            checkAndIncreaseMemory(namespace, threshold, factor)
+          } catch {
+            case e: Throwable => logError("Error in memory check thread", e)
+          }
+        }, interval, interval, TimeUnit.SECONDS)
+      }
+ } catch {
+ case e: Exception =>
+ logError("Failed to initialize", e)
+ }
+
+ Map.empty[String, String].asJava
+ }
+
+ override def shutdown(): Unit = {
+ periodicService.shutdown()
+ if (kubernetesClient != null) {
+ kubernetesClient.close()
+ }
+ }
+
+  private def checkAndIncreaseMemory(
+      namespace: String, threshold: Double, factor: Double): Unit = {
+ val appId = sparkContext.applicationId
+
+ // Get all running executor pods for this application
+ val pods = kubernetesClient.pods()
+ .inNamespace(namespace)
+ .withLabel(SPARK_APP_ID_LABEL, appId)
+ .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .list()
+ .getItems.asScala
+
+    pods.filter(_.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) != null).foreach { pod =>
+ val execId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
+ try {
+        val metrics = kubernetesClient.top().pods().metrics(namespace, pod.getMetadata.getName)
+
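+        // Prefer the default executor container's metrics; fall back to the first container reported.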
+ val containerMetrics = metrics.getContainers.asScala
+ .find(_.getName == DEFAULT_EXECUTOR_CONTAINER_NAME)
+ .orElse(metrics.getContainers.asScala.headOption)
+
+        containerMetrics.filter(_.getUsage.get("memory") != null).foreach { cm =>
+ val usageQuantity = cm.getUsage.get("memory")
+ val usage = Quantity.getAmountInBytes(usageQuantity).longValue()
+
+ // Identify the Spark container in the Pod spec to get the limit
+ val container = pod.getSpec.getContainers.asScala
+ .find(_.getName == DEFAULT_EXECUTOR_CONTAINER_NAME)
+ .orElse(pod.getSpec.getContainers.asScala.headOption)
+
+ container.filter(c => c.getResources.getLimits != null &&
+ c.getResources.getLimits.containsKey("memory")).foreach { c =>
+          val limit = Quantity.getAmountInBytes(c.getResources.getLimits.get("memory"))
+            .longValue()
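+          // Worked example with the defaults (threshold 0.9, factor 0.1): given a
+          // 1000000000-byte limit, a resize fires once usage exceeds 900000000 bytes
+          // and raises the limit to 1100000000 bytes.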
+ if (usage > limit * threshold) {
+ val newLimit = (limit * (1.0 + factor)).toLong
+ val newQuantity = new Quantity(newLimit.toString)
+
+            logInfo(log"Increase executor ${MDC(EXECUTOR_ID, execId)} container memory " +
+              log"from ${MDC(MEMORY_SIZE, limit)} to ${MDC(MEMORY_SIZE, newLimit)} " +
+              log"as usage ${MDC(MEMORY_SIZE, usage)} exceeded threshold.")
+
+ // Patch the pod to update both memory request and limit
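+          // The strategic-merge body sent to the "resize" subresource serializes to roughly:
+          //   {"spec":{"containers":[{"name":"...","resources":
+          //     {"limits":{"memory":"..."},"requests":{"memory":"..."}}}]}}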
+ try {
+ kubernetesClient.pods()
+ .inNamespace(namespace)
+ .withName(pod.getMetadata.getName)
+ .subresource("resize")
+                .patch(PatchContext.of(PatchType.STRATEGIC_MERGE), new PodBuilder()
+ .withNewMetadata()
+ .endMetadata()
+ .withNewSpec()
+ .addNewContainer()
+ .withName(c.getName)
+ .withNewResources()
+ .addToLimits("memory", newQuantity)
+ .addToRequests("memory", newQuantity)
+ .endResources()
+ .endContainer()
+ .endSpec()
+ .build())
+ } catch {
+ case e: Throwable =>
+ logInfo(log"Failed to update ${MDC(EXECUTOR_ID, execId)}", e)
+ }
+ } else {
+            logDebug(log"Executor ${MDC(EXECUTOR_ID, execId)} limit " +
+              log"${MDC(MEMORY_SIZE, limit)}, usage ${MDC(MEMORY_SIZE, usage)}")
+ }
+ }
+ }
+ } catch {
+ case e: Throwable =>
+ logDebug(log"Failed to handle ${MDC(EXECUTOR_ID, execId)}", e)
+ }
+ }
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePluginSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePluginSuite.scala
new file mode 100644
index 000000000000..649ccd8a945d
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePluginSuite.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.Collections
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.metrics.v1beta1.{ContainerMetrics, PodMetrics}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{MetricAPIGroupDSL, PodMetricOperation}
+import org.mockito.ArgumentMatchers.{any, anyString}
+import org.mockito.Mockito.{mock, never, times, verify, when}
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+
+class ExecutorResizePluginSuite
+ extends SparkFunSuite with BeforeAndAfter with PrivateMethodTester {
+
+ private val namespace = "test-namespace"
+ private val appId = "spark-test-app"
+
+ private var kubernetesClient: KubernetesClient = _
+ private var sparkContext: SparkContext = _
+ private var podOperations: PODS = _
+ private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+ private var labeledPods: LABELED_PODS = _
+ private var podList: PodList = _
+ private var topOperations: MetricAPIGroupDSL = _
+ private var podMetricOperations: PodMetricOperation = _
+
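+  // Handle for invoking the plugin's private checkAndIncreaseMemory(namespace, threshold, factor).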
+ private val _checkAndIncreaseMemory =
+ PrivateMethod[Unit](Symbol("checkAndIncreaseMemory"))
+
+ before {
+ kubernetesClient = mock(classOf[KubernetesClient])
+ sparkContext = mock(classOf[SparkContext])
+ podOperations = mock(classOf[PODS])
+ podsWithNamespace = mock(classOf[PODS_WITH_NAMESPACE])
+ labeledPods = mock(classOf[LABELED_PODS])
+ podList = mock(classOf[PodList])
+ topOperations = mock(classOf[MetricAPIGroupDSL])
+ podMetricOperations = mock(classOf[PodMetricOperation])
+
+ when(sparkContext.applicationId).thenReturn(appId)
+ when(kubernetesClient.pods()).thenReturn(podOperations)
+ when(podOperations.inNamespace(namespace)).thenReturn(podsWithNamespace)
+    when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, appId)).thenReturn(labeledPods)
+    when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+ when(labeledPods.list()).thenReturn(podList)
+ when(kubernetesClient.top()).thenReturn(topOperations)
+ when(topOperations.pods()).thenReturn(podMetricOperations)
+ }
+
+ private def createPlugin(): ExecutorResizeDriverPlugin = {
+ val plugin = new ExecutorResizeDriverPlugin()
+ // Use reflection to set private fields
+ val scField = plugin.getClass.getDeclaredField("sparkContext")
+ scField.setAccessible(true)
+ scField.set(plugin, sparkContext)
+
+ val clientField = plugin.getClass.getDeclaredField("kubernetesClient")
+ clientField.setAccessible(true)
+ clientField.set(plugin, kubernetesClient)
+
+ plugin
+ }
+
+ private def createPodWithMemoryLimit(
+ executorId: Long,
+ memoryLimit: String,
+ containerName: String = DEFAULT_EXECUTOR_CONTAINER_NAME): Pod = {
+ new PodBuilder()
+ .withNewMetadata()
+ .withName(s"spark-executor-$executorId")
+ .addToLabels(SPARK_APP_ID_LABEL, appId)
+ .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
+ .endMetadata()
+ .withNewSpec()
+ .addNewContainer()
+ .withName(containerName)
+ .withNewResources()
+ .addToLimits("memory", new Quantity(memoryLimit))
+ .endResources()
+ .endContainer()
+ .endSpec()
+ .build()
+ }
+
+ private def createPodMetrics(
+ podName: String,
+ memoryUsage: String,
+ containerName: String = DEFAULT_EXECUTOR_CONTAINER_NAME): PodMetrics = {
+ val containerMetrics = new ContainerMetrics()
+ containerMetrics.setName(containerName)
+    containerMetrics.setUsage(Map("memory" -> new Quantity(memoryUsage)).asJava)
+
+ val podMetrics = new PodMetrics()
+ podMetrics.setContainers(Collections.singletonList(containerMetrics))
+ podMetrics
+ }
+
+ test("Empty pod list should not trigger any action") {
+ val plugin = createPlugin()
+ when(podList.getItems).thenReturn(Collections.emptyList())
+
+ plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+ verify(podMetricOperations, never()).metrics(anyString(), anyString())
+ }
+
+ test("Pod without executor ID label should be skipped") {
+ val plugin = createPlugin()
+ val pod = new PodBuilder()
+ .withNewMetadata()
+ .withName("spark-executor-1")
+ .addToLabels(SPARK_APP_ID_LABEL, appId)
+ .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ // No SPARK_EXECUTOR_ID_LABEL
+ .endMetadata()
+ .build()
+
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+
+ plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+ verify(podMetricOperations, never()).metrics(anyString(), anyString())
+ }
+
+ test("Memory usage below threshold should not trigger resize") {
+ val plugin = createPlugin()
+ val pod = createPodWithMemoryLimit(1, "1000000000") // 1GB limit
+    val metrics = createPodMetrics("spark-executor-1", "500000000") // 500MB usage (50%)
+
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics)
+
+ val podResource = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource)
+
+ plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+ verify(podResource, never()).patch(any(), any(classOf[Pod]))
+ }
+
+ test("Memory usage above threshold should trigger resize") {
+ val plugin = createPlugin()
+ val pod = createPodWithMemoryLimit(1, "1000000000") // 1GB limit
+    val metrics = createPodMetrics("spark-executor-1", "950000000") // 950MB usage (95%)
+
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics)
+
+ val podResource = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource)
+ when(podResource.subresource(anyString())).thenReturn(podResource)
+
+ plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+ verify(podResource, times(1)).patch(any(), any(classOf[Pod]))
+ }
+
+ test("Memory usage exactly at threshold should not trigger resize") {
+ val plugin = createPlugin()
+ val pod = createPodWithMemoryLimit(1, "1000000000") // 1GB limit
+    val metrics = createPodMetrics("spark-executor-1", "900000000") // 900MB usage (90%)
+
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics)
+
+ val podResource = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource)
+
+ plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+ verify(podResource, never()).patch(any(), any(classOf[Pod]))
+ }
+
+ test("Pod without memory limit should be skipped") {
+ val plugin = createPlugin()
+ val pod = new PodBuilder()
+ .withNewMetadata()
+ .withName("spark-executor-1")
+ .addToLabels(SPARK_APP_ID_LABEL, appId)
+ .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .addToLabels(SPARK_EXECUTOR_ID_LABEL, "1")
+ .endMetadata()
+ .withNewSpec()
+ .addNewContainer()
+ .withName(DEFAULT_EXECUTOR_CONTAINER_NAME)
+ .withNewResources()
+ // No memory limit
+ .endResources()
+ .endContainer()
+ .endSpec()
+ .build()
+
+ val metrics = createPodMetrics("spark-executor-1", "500000000")
+
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics)
+
+ val podResource = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource)
+
+ plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+ verify(podResource, never()).patch(any(), any(classOf[Pod]))
+ }
+
+ test("Multiple pods with mixed memory usage") {
+ val plugin = createPlugin()
+ val pod1 = createPodWithMemoryLimit(1, "1000000000") // 1GB limit
+ val pod2 = createPodWithMemoryLimit(2, "1000000000") // 1GB limit
+
+    val metrics1 = createPodMetrics("spark-executor-1", "500000000") // 50% - below threshold
+    val metrics2 = createPodMetrics("spark-executor-2", "950000000") // 95% - above threshold
+
+ when(podList.getItems).thenReturn(List(pod1, pod2).asJava)
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics1)
+    when(podMetricOperations.metrics(namespace, "spark-executor-2")).thenReturn(metrics2)
+
+ val podResource1 = mock(classOf[SINGLE_POD])
+ val podResource2 = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource1)
+    when(podsWithNamespace.withName("spark-executor-2")).thenReturn(podResource2)
+ when(podResource2.subresource(anyString())).thenReturn(podResource2)
+
+ plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+ verify(podResource1, never()).patch(any(), any(classOf[Pod]))
+ verify(podResource2, times(1)).patch(any(), any(classOf[Pod]))
+ }
+
+ test("Lower threshold should trigger resize more aggressively") {
+ val plugin = createPlugin()
+ val pod = createPodWithMemoryLimit(1, "1000000000") // 1GB limit
+    val metrics = createPodMetrics("spark-executor-1", "600000000") // 60% usage
+
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics)
+
+ val podResource = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource)
+ when(podResource.subresource(anyString())).thenReturn(podResource)
+
+ // Use 50% threshold - 60% usage should trigger resize
+ plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.5, 0.1))
+
+ verify(podResource, times(1)).patch(any(), any(classOf[Pod]))
+ }
+
+ test("Fallback to first container when default container name not found") {
+ val plugin = createPlugin()
+ val customContainerName = "custom-executor"
+ val pod = createPodWithMemoryLimit(1, "1000000000", customContainerName)
+    val metrics = createPodMetrics("spark-executor-1", "950000000", customContainerName)
+
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics)
+
+ val podResource = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource)
+ when(podResource.subresource(anyString())).thenReturn(podResource)
+
+ plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+ verify(podResource, times(1)).patch(any(), any(classOf[Pod]))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]