This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 10a958b3 [SPARK-37049][K8S] executorIdleTimeout should check `creationTimestamp` instead of `startTime`
10a958b3 is described below

commit 10a958b3480dd536c869247fe3e83e823a533331
Author: Weiwei Yang <wy...@cloudera.com>
AuthorDate: Tue Oct 19 22:42:06 2021 -0700

[SPARK-37049][K8S] executorIdleTimeout should check `creationTimestamp` instead of `startTime`

SPARK-33099 added support for respecting `spark.dynamicAllocation.executorIdleTimeout` in `ExecutorPodsAllocator`. However, when it checks whether a pending executor pod has timed out, it checks against the pod's [startTime](https://github.com/kubernetes/api/blob/2a5dae08c42b1e8fdc1379432d8898efece65363/core/v1/types.go#L3664-L3667), see code [here](https://github.com/apache/spark/blob/c2ba498ff678ddda034cedf45cc17fbeefe922fd/resource-managers/kubernetes/core/src/main/scala/org/apache/spark [...]

This can be reproduced locally by running the following job:

```
${SPARK_HOME}/bin/spark-submit \
  --master k8s://http://localhost:8001 --deploy-mode cluster --name spark-group-example \
  --class org.apache.spark.examples.GroupByTest \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.namespace=spark-test \
  --conf spark.kubernetes.executor.request.cores=1 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  --conf spark.shuffle.service.enabled=false \
  --conf spark.kubernetes.container.image=local/spark:3.3.0 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.3.0-SNAPSHOT.jar \
  1000 1000 100 1000
```

The local cluster doesn't have enough resources to run more than 4 executors, so the rest of the executor pods will be pending.
The job builds up a task backlog, which triggers requests for more executors from K8s:

```
21/10/19 22:51:45 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1 running: 0.
21/10/19 22:51:51 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 1.
21/10/19 22:51:52 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes for ResourceProfile Id: 0, target: 4 running: 2.
21/10/19 22:51:53 INFO ExecutorPodsAllocator: Going to request 4 executors from Kubernetes for ResourceProfile Id: 0, target: 8 running: 4.
...
21/10/19 22:52:14 INFO ExecutorPodsAllocator: Deleting 39 excess pod requests (23,59,32,41,50,68,35,44,17,8,53,62,26,71,11,56,29,38,47,20,65,5,14,46,64,73,55,49,40,67,58,13,22,31,7,16,52,70,43).
21/10/19 22:52:18 INFO ExecutorPodsAllocator: Deleting 28 excess pod requests (25,34,61,37,10,19,28,60,69,63,45,54,72,36,18,9,27,21,57,12,48,30,39,66,15,42,24,33).
```

At `22:51:45`, it starts to request executors, and at `22:52:14` it starts to delete excess executor pods. That is only 29s, but `spark.dynamicAllocation.executorIdleTimeout` is set to 60s, so the config was not honored.

### What changes were proposed in this pull request?

Change the check from using the pod's `startTime` to `creationTimestamp`. [creationTimestamp](https://github.com/kubernetes/apimachinery/blob/e6c90c4366be1504309a6aafe0d816856450f36a/pkg/apis/meta/v1/types.go#L193-L201) is the timestamp when a pod gets created on K8s:

```
// CreationTimestamp is a timestamp representing the server time when this object was
// created. It is not guaranteed to be set in happens-before order across separate operations.
// Clients may not set this value. It is represented in RFC3339 form and is in UTC.
```

[startTime](https://github.com/kubernetes/api/blob/2a5dae08c42b1e8fdc1379432d8898efece65363/core/v1/types.go#L3664-L3667) is the timestamp when the pod gets started:

```
// RFC 3339 date and time at which the object was acknowledged by the Kubelet.
// This is before the Kubelet pulled the container image(s) for the pod.
// +optional
```

A pending pod's `startTime` is empty. Here is an example of a pending pod:

```
NAMESPACE   NAME                  READY   STATUS    RESTARTS   AGE
default     pending-pod-example   0/1     Pending   0          2s

kubectl get pod pending-pod-example -o yaml | grep creationTimestamp
---> creationTimestamp: "2021-10-19T16:17:52Z"

// pending pod has no startTime
kubectl get pod pending-pod-example -o yaml | grep startTime
---> // empty

// running pod has startTime set to the timestamp when the pod gets started
kubectl get pod coredns-558bd4d5db-6qrtx -n kube-system -o yaml | grep startTime
        f:startTime: {}
---> startTime: "2021-08-04T23:44:44Z"
```

### Why are the changes needed?

This fixes the issue that `spark.dynamicAllocation.executorIdleTimeout` is currently not honored for pending executor pods.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

The PR includes UT changes that cover this issue.

Closes #34319 from yangwwei/SPARK-37049.
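
To make the failure mode above concrete, here is a minimal sketch of the timeout decision, assuming a missing or unparseable timestamp falls through to `true` as in the allocator's catch branch. `PodTimes`, `IdleTimeoutSketch`, and `isTimedOut` are hypothetical names used only for illustration; they are not part of Spark or the fabric8 client, and the log timestamps are treated as UTC purely for the arithmetic.

```scala
import java.time.Instant
import scala.util.Try

// Hypothetical stand-in for the two pod fields discussed above.
// This is NOT the fabric8 Pod type used by ExecutorPodsAllocator.
final case class PodTimes(creationTimestamp: Option[String], startTime: Option[String])

object IdleTimeoutSketch {
  // True when the timestamp is older than timeoutMs, or when it is
  // missing or unparseable (assumed to mirror the catch branch, which returns true).
  def isTimedOut(timestamp: Option[String], nowMs: Long, timeoutMs: Long): Boolean =
    timestamp.flatMap(ts => Try(Instant.parse(ts).toEpochMilli).toOption) match {
      case Some(t) => nowMs - t > timeoutMs
      case None    => true
    }

  def main(args: Array[String]): Unit = {
    val created = Instant.parse("2021-10-19T22:51:45Z")
    // A pending pod: creationTimestamp is set, startTime is empty.
    val pending = PodTimes(Some(created.toString), None)
    val nowMs = created.plusSeconds(29).toEpochMilli
    val timeoutMs = 60000L // 60s, the executorIdleTimeout value in the report

    // Check against startTime: empty, so the pod is treated as timed out after 29s.
    println(isTimedOut(pending.startTime, nowMs, timeoutMs))
    // Check against creationTimestamp: 29s < 60s, so the pod is kept.
    println(isTimedOut(pending.creationTimestamp, nowMs, timeoutMs))
  }
}
```

Under these assumptions the first check prints `true` and the second prints `false`, which matches the 29s deletion seen in the logs and the behavior expected once `creationTimestamp` is used.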
Authored-by: Weiwei Yang <wy...@cloudera.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
(cherry picked from commit 041cd5d7d15ec4184ae51a8a10a26bef05bd261f)
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala      | 8 ++++----
 .../spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala | 4 +++-
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index cee5360..dc79f22 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -453,11 +453,11 @@ private[spark] class ExecutorPodsAllocator(
 
   private def isExecutorIdleTimedOut(state: ExecutorPodState, currentTime: Long): Boolean = {
     try {
-      val startTime = Instant.parse(state.pod.getStatus.getStartTime).toEpochMilli()
-      currentTime - startTime > executorIdleTimeout
+      val creationTime = Instant.parse(state.pod.getMetadata.getCreationTimestamp).toEpochMilli()
+      currentTime - creationTime > executorIdleTimeout
     } catch {
-      case _: Exception =>
-        logDebug(s"Cannot get startTime of pod ${state.pod}")
+      case e: Exception =>
+        logError(s"Cannot get the creationTimestamp of the pod: ${state.pod}", e)
         true
     }
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
index 78f11f9..de9da0d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
@@ -64,9 +64,11 @@ object ExecutorLifecycleTestUtils {
 
   def pendingExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
     new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
+      .editOrNewMetadata()
+        .withCreationTimestamp(Instant.now.toString)
+        .endMetadata()
       .editOrNewStatus()
         .withPhase("pending")
-        .withStartTime(Instant.now.toString)
         .endStatus()
       .build()
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org