This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fca621ce157 [SPARK-55432][K8S] Support built-in K8s `ExecutorResizePlugin`
1fca621ce157 is described below

commit 1fca621ce157d8507f38b248cbeac222b231a7a0
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Mon Feb 9 03:04:49 2026 -0800

    [SPARK-55432][K8S] Support built-in K8s `ExecutorResizePlugin`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support a built-in K8s executor in-place vertical scaling
    plugin, `ExecutorResizePlugin`. The driver-side plugin periodically polls
    executor pod memory metrics and, when usage exceeds a configurable
    threshold of the current memory limit, patches the pod's `resize`
    subresource to raise the memory request and limit by a configurable factor.
    
    ### Why are the changes needed?
    
    Since `In-Place Pod Resize` graduated to `Stable` in K8s v1.35, Apache
    Spark users can take advantage of the feature with this built-in plugin,
    enabled as sketched below.

    - https://kubernetes.io/blog/2025/12/19/kubernetes-v1-35-in-place-pod-resize-ga/
    - https://kubernetes.io/docs/tasks/configure-pod-container/resize-container-resources/
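
    As an illustrative sketch (the option values are examples; only the config
    keys and the plugin class come from this patch), the plugin could be
    enabled via:

        --conf spark.plugins=org.apache.spark.scheduler.cluster.k8s.ExecutorResizePlugin \
        --conf spark.kubernetes.executor.resizeInterval=1m \
        --conf spark.kubernetes.executor.resizeThreshold=0.9 \
        --conf spark.kubernetes.executor.resizeFactor=0.1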
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, because this built-in plugin is not enabled by default.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test suite.
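
    The suite can also be run locally with something like the following
    (illustrative; the sbt project name is assumed):

        build/sbt -Pkubernetes "kubernetes/testOnly *ExecutorResizePluginSuite"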
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: `Gemini 3 Pro (High)` on `Antigravity`
    
    Closes #54215 from dongjoon-hyun/SPARK-55432.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/deploy/k8s/Config.scala |  24 ++
 .../cluster/k8s/ExecutorResizePlugin.scala         | 171 ++++++++++++
 .../cluster/k8s/ExecutorResizePluginSuite.scala    | 290 +++++++++++++++++++++
 3 files changed, 485 insertions(+)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 3f131874fc5e..011b05f6542a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -238,6 +238,30 @@ private[spark] object Config extends Logging {
       .checkValue(_ >= 0, "The minimum number of tasks should be 
non-negative.")
       .createWithDefault(0)
 
+  val EXECUTOR_RESIZE_INTERVAL =
+    ConfigBuilder("spark.kubernetes.executor.resizeInterval")
+      .doc("Interval between executor resize operations. To disable, set 0 
(default)")
+      .version("4.2.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ >= 0, "Interval should be non-negative")
+      .createWithDefault(0)
+
+  val EXECUTOR_RESIZE_THRESHOLD =
+    ConfigBuilder("spark.kubernetes.executor.resizeThreshold")
+      .doc("The threshold to resize.")
+      .version("4.2.0")
+      .doubleConf
+      .checkValue(v => 0 < v && v < 1, "The threshold should be in (0, 1)")
+      .createWithDefault(0.9)
+
+  val EXECUTOR_RESIZE_FACTOR =
+    ConfigBuilder("spark.kubernetes.executor.resizeFactor")
+      .doc("The factor to resize.")
+      .version("4.2.0")
+      .doubleConf
+      .checkValue(v => 0 < v && v <= 1, "The factor should be in (0, 1]")
+      .createWithDefault(0.1)
+
   val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver"
   val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor"
   val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePlugin.scala
new file mode 100644
index 000000000000..77fd37dd538b
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePlugin.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.{Map => JMap}
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.{PodBuilder, Quantity}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{EXECUTOR_ID, MEMORY_SIZE}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Spark plugin to monitor executor pod memory usage and increase the memory limit
+ * if the usage exceeds a threshold.
+ */
+class ExecutorResizePlugin extends SparkPlugin {
+  override def driverPlugin(): DriverPlugin = new ExecutorResizeDriverPlugin()
+
+  override def executorPlugin(): ExecutorPlugin = null
+}
+
+class ExecutorResizeDriverPlugin extends DriverPlugin with Logging {
+  private var sparkContext: SparkContext = _
+  private var kubernetesClient: KubernetesClient = _
+
+  private val periodicService: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("executor-resize-plugin")
+
+  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+    // Read via the typed config entries so the defaults stay in sync with Config.scala.
+    val interval = sc.conf.get(EXECUTOR_RESIZE_INTERVAL)
+    val threshold = sc.conf.get(EXECUTOR_RESIZE_THRESHOLD)
+    val factor = sc.conf.get(EXECUTOR_RESIZE_FACTOR)
+    val namespace = sc.conf.get(KUBERNETES_NAMESPACE)
+
+    sparkContext = sc
+
+    try {
+      kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
+        sc.conf.get(KUBERNETES_DRIVER_MASTER_URL),
+        Option(namespace),
+        KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+        SparkKubernetesClientFactory.ClientType.Driver,
+        sc.conf,
+        None)
+
+      // A non-positive interval disables the periodic resize check.
+      if (interval > 0) {
+        periodicService.scheduleAtFixedRate(() => {
+          try {
+            checkAndIncreaseMemory(namespace, threshold, factor)
+          } catch {
+            case e: Throwable => logError("Error in memory check thread", e)
+          }
+        }, interval, interval, TimeUnit.SECONDS)
+      }
+    } catch {
+      case e: Exception =>
+        logError("Failed to initialize", e)
+    }
+
+    Map.empty[String, String].asJava
+  }
+
+  override def shutdown(): Unit = {
+    periodicService.shutdown()
+    if (kubernetesClient != null) {
+      kubernetesClient.close()
+    }
+  }
+
+  private def checkAndIncreaseMemory(namespace: String, threshold: Double, factor: Double): Unit = {
+    val appId = sparkContext.applicationId
+
+    // Get all running executor pods for this application
+    val pods = kubernetesClient.pods()
+      .inNamespace(namespace)
+      .withLabel(SPARK_APP_ID_LABEL, appId)
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+      .list()
+      .getItems.asScala
+
+    pods.filter(_.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) != null).foreach { pod =>
+      val execId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
+      try {
+        val metrics = kubernetesClient.top().pods().metrics(namespace, pod.getMetadata.getName)
+
+        val containerMetrics = metrics.getContainers.asScala
+          .find(_.getName == DEFAULT_EXECUTOR_CONTAINER_NAME)
+          .orElse(metrics.getContainers.asScala.headOption)
+
+        containerMetrics.filter(_.getUsage.get("memory") != null).foreach { cm =>
+          val usageQuantity = cm.getUsage.get("memory")
+          val usage = Quantity.getAmountInBytes(usageQuantity).longValue()
+
+          // Identify the Spark container in the Pod spec to get the limit
+          val container = pod.getSpec.getContainers.asScala
+            .find(_.getName == DEFAULT_EXECUTOR_CONTAINER_NAME)
+            .orElse(pod.getSpec.getContainers.asScala.headOption)
+
+          container.filter(c => c.getResources.getLimits != null &&
+              c.getResources.getLimits.containsKey("memory")).foreach { c =>
+            val limit = Quantity.getAmountInBytes(c.getResources.getLimits.get("memory"))
+                .longValue()
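+            // Illustrative arithmetic (assumed values): with a 1000000000-byte limit,
+            // threshold 0.9, and factor 0.1, usage above 900000000 bytes resizes the
+            // limit to (1000000000 * 1.1).toLong = 1100000000 bytes.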
+            if (usage > limit * threshold) {
+              val newLimit = (limit * (1.0 + factor)).toLong
+              val newQuantity = new Quantity(newLimit.toString)
+
+              logInfo(log"Increase executor ${MDC(EXECUTOR_ID, execId)} 
container memory " +
+                log"from ${MDC(MEMORY_SIZE, limit)} to ${MDC(MEMORY_SIZE, 
newLimit)} " +
+                log"as usage ${MDC(MEMORY_SIZE, usage)} exceeded threshold.")
+
+              // Patch the pod's "resize" subresource to update both memory request and limit in place.
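+              // The strategic-merge patch body built below is roughly (illustrative):
+              //   {"spec":{"containers":[{"name":"<name>","resources":
+              //     {"limits":{"memory":"<newLimit>"},"requests":{"memory":"<newLimit>"}}}]}}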
+              try {
+                kubernetesClient.pods()
+                  .inNamespace(namespace)
+                  .withName(pod.getMetadata.getName)
+                  .subresource("resize")
+                  .patch(PatchContext.of(PatchType.STRATEGIC_MERGE), new PodBuilder()
+                    .withNewMetadata()
+                    .endMetadata()
+                    .withNewSpec()
+                    .addNewContainer()
+                    .withName(c.getName)
+                    .withNewResources()
+                    .addToLimits("memory", newQuantity)
+                    .addToRequests("memory", newQuantity)
+                    .endResources()
+                    .endContainer()
+                    .endSpec()
+                    .build())
+              } catch {
+                case e: Throwable =>
+                  logInfo(log"Failed to update ${MDC(EXECUTOR_ID, execId)}", e)
+              }
+            } else {
+              logDebug(log"Executor ${MDC(EXECUTOR_ID, execId)} limit " +
+                log"${MDC(MEMORY_SIZE, limit)}, usage ${MDC(MEMORY_SIZE, 
usage)}")
+            }
+          }
+        }
+      } catch {
+        case e: Throwable =>
+          logDebug(log"Failed to handle ${MDC(EXECUTOR_ID, execId)}", e)
+      }
+    }
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePluginSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePluginSuite.scala
new file mode 100644
index 000000000000..649ccd8a945d
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorResizePluginSuite.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.Collections
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.metrics.v1beta1.{ContainerMetrics, PodMetrics}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{MetricAPIGroupDSL, PodMetricOperation}
+import org.mockito.ArgumentMatchers.{any, anyString}
+import org.mockito.Mockito.{mock, never, times, verify, when}
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+
+class ExecutorResizePluginSuite
+    extends SparkFunSuite with BeforeAndAfter with PrivateMethodTester {
+
+  private val namespace = "test-namespace"
+  private val appId = "spark-test-app"
+
+  private var kubernetesClient: KubernetesClient = _
+  private var sparkContext: SparkContext = _
+  private var podOperations: PODS = _
+  private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+  private var labeledPods: LABELED_PODS = _
+  private var podList: PodList = _
+  private var topOperations: MetricAPIGroupDSL = _
+  private var podMetricOperations: PodMetricOperation = _
+
+  private val _checkAndIncreaseMemory =
+    PrivateMethod[Unit](Symbol("checkAndIncreaseMemory"))
+
+  before {
+    kubernetesClient = mock(classOf[KubernetesClient])
+    sparkContext = mock(classOf[SparkContext])
+    podOperations = mock(classOf[PODS])
+    podsWithNamespace = mock(classOf[PODS_WITH_NAMESPACE])
+    labeledPods = mock(classOf[LABELED_PODS])
+    podList = mock(classOf[PodList])
+    topOperations = mock(classOf[MetricAPIGroupDSL])
+    podMetricOperations = mock(classOf[PodMetricOperation])
+
+    when(sparkContext.applicationId).thenReturn(appId)
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.inNamespace(namespace)).thenReturn(podsWithNamespace)
+    when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, appId)).thenReturn(labeledPods)
+    when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+    when(labeledPods.list()).thenReturn(podList)
+    when(kubernetesClient.top()).thenReturn(topOperations)
+    when(topOperations.pods()).thenReturn(podMetricOperations)
+  }
+
+  private def createPlugin(): ExecutorResizeDriverPlugin = {
+    val plugin = new ExecutorResizeDriverPlugin()
+    // Use reflection to set private fields
+    val scField = plugin.getClass.getDeclaredField("sparkContext")
+    scField.setAccessible(true)
+    scField.set(plugin, sparkContext)
+
+    val clientField = plugin.getClass.getDeclaredField("kubernetesClient")
+    clientField.setAccessible(true)
+    clientField.set(plugin, kubernetesClient)
+
+    plugin
+  }
+
+  private def createPodWithMemoryLimit(
+      executorId: Long,
+      memoryLimit: String,
+      containerName: String = DEFAULT_EXECUTOR_CONTAINER_NAME): Pod = {
+    new PodBuilder()
+      .withNewMetadata()
+        .withName(s"spark-executor-$executorId")
+        .addToLabels(SPARK_APP_ID_LABEL, appId)
+        .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+        .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
+      .endMetadata()
+      .withNewSpec()
+        .addNewContainer()
+          .withName(containerName)
+          .withNewResources()
+            .addToLimits("memory", new Quantity(memoryLimit))
+          .endResources()
+        .endContainer()
+      .endSpec()
+      .build()
+  }
+
+  private def createPodMetrics(
+      podName: String,
+      memoryUsage: String,
+      containerName: String = DEFAULT_EXECUTOR_CONTAINER_NAME): PodMetrics = {
+    val containerMetrics = new ContainerMetrics()
+    containerMetrics.setName(containerName)
+    containerMetrics.setUsage(Map("memory" -> new 
Quantity(memoryUsage)).asJava)
+
+    val podMetrics = new PodMetrics()
+    podMetrics.setContainers(Collections.singletonList(containerMetrics))
+    podMetrics
+  }
+
+  test("Empty pod list should not trigger any action") {
+    val plugin = createPlugin()
+    when(podList.getItems).thenReturn(Collections.emptyList())
+
+    plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+    verify(podMetricOperations, never()).metrics(anyString(), anyString())
+  }
+
+  test("Pod without executor ID label should be skipped") {
+    val plugin = createPlugin()
+    val pod = new PodBuilder()
+      .withNewMetadata()
+        .withName("spark-executor-1")
+        .addToLabels(SPARK_APP_ID_LABEL, appId)
+        .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+        // No SPARK_EXECUTOR_ID_LABEL
+      .endMetadata()
+      .build()
+
+    when(podList.getItems).thenReturn(Collections.singletonList(pod))
+
+    plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+    verify(podMetricOperations, never()).metrics(anyString(), anyString())
+  }
+
+  test("Memory usage below threshold should not trigger resize") {
+    val plugin = createPlugin()
+    val pod = createPodWithMemoryLimit(1, "1000000000") // 1GB limit
+    val metrics = createPodMetrics("spark-executor-1", "500000000") // 500MB 
usage (50%)
+
+    when(podList.getItems).thenReturn(Collections.singletonList(pod))
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics)
+
+    val podResource = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource)
+
+    plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+    verify(podResource, never()).patch(any(), any(classOf[Pod]))
+  }
+
+  test("Memory usage above threshold should trigger resize") {
+    val plugin = createPlugin()
+    val pod = createPodWithMemoryLimit(1, "1000000000") // 1GB limit
+    val metrics = createPodMetrics("spark-executor-1", "950000000") // 950MB 
usage (95%)
+
+    when(podList.getItems).thenReturn(Collections.singletonList(pod))
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics)
+
+    val podResource = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource)
+    when(podResource.subresource(anyString())).thenReturn(podResource)
+
+    plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+    verify(podResource, times(1)).patch(any(), any(classOf[Pod]))
+  }
+
+  test("Memory usage exactly at threshold should not trigger resize") {
+    val plugin = createPlugin()
+    val pod = createPodWithMemoryLimit(1, "1000000000") // 1GB limit
+    val metrics = createPodMetrics("spark-executor-1", "900000000") // 900MB usage (90%)
+
+    when(podList.getItems).thenReturn(Collections.singletonList(pod))
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics)
+
+    val podResource = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource)
+
+    plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+    verify(podResource, never()).patch(any(), any(classOf[Pod]))
+  }
+
+  test("Pod without memory limit should be skipped") {
+    val plugin = createPlugin()
+    val pod = new PodBuilder()
+      .withNewMetadata()
+        .withName("spark-executor-1")
+        .addToLabels(SPARK_APP_ID_LABEL, appId)
+        .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+        .addToLabels(SPARK_EXECUTOR_ID_LABEL, "1")
+      .endMetadata()
+      .withNewSpec()
+        .addNewContainer()
+          .withName(DEFAULT_EXECUTOR_CONTAINER_NAME)
+          .withNewResources()
+            // No memory limit
+          .endResources()
+        .endContainer()
+      .endSpec()
+      .build()
+
+    val metrics = createPodMetrics("spark-executor-1", "500000000")
+
+    when(podList.getItems).thenReturn(Collections.singletonList(pod))
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics)
+
+    val podResource = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource)
+
+    plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+    verify(podResource, never()).patch(any(), any(classOf[Pod]))
+  }
+
+  test("Multiple pods with mixed memory usage") {
+    val plugin = createPlugin()
+    val pod1 = createPodWithMemoryLimit(1, "1000000000") // 1GB limit
+    val pod2 = createPodWithMemoryLimit(2, "1000000000") // 1GB limit
+
+    val metrics1 = createPodMetrics("spark-executor-1", "500000000") // 50% - below threshold
+    val metrics2 = createPodMetrics("spark-executor-2", "950000000") // 95% - above threshold
+
+    when(podList.getItems).thenReturn(List(pod1, pod2).asJava)
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics1)
+    when(podMetricOperations.metrics(namespace, "spark-executor-2")).thenReturn(metrics2)
+
+    val podResource1 = mock(classOf[SINGLE_POD])
+    val podResource2 = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource1)
+    when(podsWithNamespace.withName("spark-executor-2")).thenReturn(podResource2)
+    when(podResource2.subresource(anyString())).thenReturn(podResource2)
+
+    plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+    verify(podResource1, never()).patch(any(), any(classOf[Pod]))
+    verify(podResource2, times(1)).patch(any(), any(classOf[Pod]))
+  }
+
+  test("Lower threshold should trigger resize more aggressively") {
+    val plugin = createPlugin()
+    val pod = createPodWithMemoryLimit(1, "1000000000") // 1GB limit
+    val metrics = createPodMetrics("spark-executor-1", "600000000") // 60% usage
+
+    when(podList.getItems).thenReturn(Collections.singletonList(pod))
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics)
+
+    val podResource = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource)
+    when(podResource.subresource(anyString())).thenReturn(podResource)
+
+    // Use 50% threshold - 60% usage should trigger resize
+    plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.5, 0.1))
+
+    verify(podResource, times(1)).patch(any(), any(classOf[Pod]))
+  }
+
+  test("Fallback to first container when default container name not found") {
+    val plugin = createPlugin()
+    val customContainerName = "custom-executor"
+    val pod = createPodWithMemoryLimit(1, "1000000000", customContainerName)
+    val metrics = createPodMetrics("spark-executor-1", "950000000", customContainerName)
+
+    when(podList.getItems).thenReturn(Collections.singletonList(pod))
+    when(podMetricOperations.metrics(namespace, "spark-executor-1")).thenReturn(metrics)
+
+    val podResource = mock(classOf[SINGLE_POD])
+    when(podsWithNamespace.withName("spark-executor-1")).thenReturn(podResource)
+    when(podResource.subresource(anyString())).thenReturn(podResource)
+
+    plugin.invokePrivate(_checkAndIncreaseMemory(namespace, 0.9, 0.1))
+
+    verify(podResource, times(1)).patch(any(), any(classOf[Pod]))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
