This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 7b67f23af0e5 [SPARK-51666][CORE] Fix sparkStageCompleted executorRunTime metric calculation
7b67f23af0e5 is described below

commit 7b67f23af0e51addda2f57389a025f8da5a352e4
Author: Weichen Xu <[email protected]>
AuthorDate: Tue Apr 1 16:09:24 2025 +0800

    [SPARK-51666][CORE] Fix sparkStageCompleted executorRunTime metric calculation
    
    ### What changes were proposed in this pull request?
    
    Fix sparkStageCompleted executorRunTime metric calculation:
    
    When a Spark task uses multiple CPUs, the CPU-seconds metric should
    capture the total execution time across all of those CPUs. For
    example, if a stage sets cpus-per-task to 48 and a task runs for
    10 seconds, the total CPU-seconds for that task should be
    10 seconds × 1 task × 48 CPUs = 480 CPU-seconds. If another task
    uses only 1 CPU, its total is 10 seconds × 1 CPU = 10 CPU-seconds.
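    
    As a minimal sketch of this accounting (the helper below is
    illustrative only, not a Spark API):
    
    ```scala
    // Hypothetical helper (not Spark code): CPU-seconds scale the task's
    // wall-clock run time by the number of CPUs allocated to it.
    def cpuSeconds(wallClockSeconds: Long, numTasks: Long, cpusPerTask: Long): Long =
      wallClockSeconds * numTasks * cpusPerTask
    
    cpuSeconds(10, 1, 48) // 480 CPU-seconds for the 48-CPU task
    cpuSeconds(10, 1, 1)  // 10 CPU-seconds for the single-CPU task
    ```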
    
    This fix is important because Spark supports stage-level scheduling
    (tasks in different stages can be configured with different numbers
    of CPUs); without it, the metric calculation is wrong whenever
    stage-level scheduling is used.
    
    ### Why are the changes needed?
    
    Bugfix
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50459 from WeichenXu123/SPARK-51666.
    
    Authored-by: Weichen Xu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 4e5ed454fb292bc22cbdb6fc69b7de322e0afeff)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 96dfbd5c853e..e77f54f64134 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -545,7 +545,8 @@ private[spark] class Executor(
         t.metrics.setExecutorRunTime(TimeUnit.NANOSECONDS.toMillis(
          // SPARK-32898: it's possible that a task is killed when taskStartTimeNs has the initial
          // value(=0) still. In this case, the executorRunTime should be considered as 0.
-          if (taskStartTimeNs > 0) System.nanoTime() - taskStartTimeNs else 0))
+          if (taskStartTimeNs > 0) (System.nanoTime() - taskStartTimeNs) * taskDescription.cpus
+          else 0))
         t.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
       })
 
@@ -702,7 +703,8 @@ private[spark] class Executor(
          (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
        // We need to subtract Task.run()'s deserialization time to avoid double-counting
         task.metrics.setExecutorRunTime(TimeUnit.NANOSECONDS.toMillis(
-          (taskFinishNs - taskStartTimeNs) - task.executorDeserializeTimeNs))
+          (taskFinishNs - taskStartTimeNs) * taskDescription.cpus
+            - task.executorDeserializeTimeNs))
         task.metrics.setExecutorCpuTime(
           (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
         task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
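
For readers skimming the diff, a self-contained sketch of the corrected calculation (the parameter names are illustrative and only mirror the fields used in Executor.scala):

```scala
import java.util.concurrent.TimeUnit

// Sketch of the fixed executorRunTime accounting: the elapsed wall-clock
// time is scaled by the task's CPU allocation before subtracting the
// deserialization time that Task.run() already accounted for.
def executorRunTimeMillis(
    taskFinishNs: Long,
    taskStartTimeNs: Long,
    executorDeserializeTimeNs: Long,
    cpus: Int): Long = {
  TimeUnit.NANOSECONDS.toMillis(
    (taskFinishNs - taskStartTimeNs) * cpus - executorDeserializeTimeNs)
}
```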


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
