This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3f69cff2f314 [SPARK-54505][UI][SHUFFLE] Correct the argument order of the createMetrics call in makeNegative
3f69cff2f314 is described below

commit 3f69cff2f314fa93765d2184588dafb0a6526afe
Author: Eric Yang <[email protected]>
AuthorDate: Fri Nov 28 20:36:31 2025 +0800

    [SPARK-54505][UI][SHUFFLE] Correct the argument order of the createMetrics call in makeNegative
    
    ### What changes were proposed in this pull request?
    The method `org.apache.spark.status.LiveEntityHelpers.makeNegative` calls `createMetrics` with its arguments in the wrong order, so metric values are passed to the wrong positions. The order is wrong starting from `shufflePushReadMetrics.corruptMergedBlockChunks`.
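
    Because every parameter of `createMetrics` is a `Long`, a positional call compiles even when values are shifted into the wrong slots; naming each argument removes that failure mode. Below is a minimal sketch of the hazard, using a hypothetical three-parameter `mkMetrics` stand-in rather than the real `createMetrics` signature:

    ```scala
    // Hypothetical stand-in for createMetrics: every parameter is a Long,
    // so the compiler cannot catch a positional mix-up.
    object ArgOrderSketch {
      final case class Metrics(bytesRead: Long, recordsRead: Long, fetchWaitTime: Long)

      def mkMetrics(bytesRead: Long, recordsRead: Long, fetchWaitTime: Long): Metrics =
        Metrics(bytesRead, recordsRead, fetchWaitTime)

      def main(args: Array[String]): Unit = {
        // Positional call: compiles, but the last two values land in the
        // wrong parameters, the same failure mode as the bug fixed here.
        val wrong = mkMetrics(11L, 17L, 12L)
        assert(wrong.recordsRead == 17L) // silently swapped

        // Named arguments bind each value to the intended parameter,
        // regardless of the order in which they are written.
        val right = mkMetrics(bytesRead = 11L, fetchWaitTime = 17L, recordsRead = 12L)
        assert(right.recordsRead == 12L && right.fetchWaitTime == 17L)
      }
    }
    ```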
    
    ### Why are the changes needed?
    Passing argument values in the wrong positions corrupts the `createMetrics` call and ultimately negates the wrong values for those metrics.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a test case.
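
    For reference, the test pins down the negation convention `makeNegative` applies to every metric: a value v becomes v * -1 - 1, so even a zero metric ends up negative (0 becomes -1). A standalone sketch of that property (not Spark code):

    ```scala
    // Sketch of the negation convention asserted by the new test.
    object NegateSketch {
      def negate(v: Long): Long = v * -1L - 1L

      def main(args: Array[String]): Unit = {
        assert(negate(0L) == -1L)   // zero still maps to a negative value
        assert(negate(34L) == -35L) // ordinary values are negated with a -1 offset
      }
    }
    ```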
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #53218 from jiwen624/makeNegative-arg-order.
    
    Authored-by: Eric Yang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit a75815e7b7e79a01cae9b595842aae5328e809be)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../scala/org/apache/spark/status/LiveEntity.scala |  77 ++++++------
 .../org/apache/spark/status/LiveEntitySuite.scala  | 129 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index efc670440bc6..3c4efd8a5ead 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -859,40 +859,49 @@ private[spark] object LiveEntityHelpers {
     }
 
     createMetrics(
-      updateMetricValue(m.executorDeserializeTime),
-      updateMetricValue(m.executorDeserializeCpuTime),
-      updateMetricValue(m.executorRunTime),
-      updateMetricValue(m.executorCpuTime),
-      updateMetricValue(m.resultSize),
-      updateMetricValue(m.jvmGcTime),
-      updateMetricValue(m.resultSerializationTime),
-      updateMetricValue(m.memoryBytesSpilled),
-      updateMetricValue(m.diskBytesSpilled),
-      updateMetricValue(m.peakExecutionMemory),
-      updateMetricValue(m.inputMetrics.bytesRead),
-      updateMetricValue(m.inputMetrics.recordsRead),
-      updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.corruptMergedBlockChunks),
-      updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.mergedFetchFallbackCount),
-      updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBlocksFetched),
-      updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBlocksFetched),
-      updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedChunksFetched),
-      updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedChunksFetched),
-      updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBytesRead),
-      updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBytesRead),
-      updateMetricValue(m.shuffleReadMetrics.remoteReqsDuration),
-      updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedReqsDuration),
-      updateMetricValue(m.outputMetrics.bytesWritten),
-      updateMetricValue(m.outputMetrics.recordsWritten),
-      updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched),
-      updateMetricValue(m.shuffleReadMetrics.localBlocksFetched),
-      updateMetricValue(m.shuffleReadMetrics.fetchWaitTime),
-      updateMetricValue(m.shuffleReadMetrics.remoteBytesRead),
-      updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk),
-      updateMetricValue(m.shuffleReadMetrics.localBytesRead),
-      updateMetricValue(m.shuffleReadMetrics.recordsRead),
-      updateMetricValue(m.shuffleWriteMetrics.bytesWritten),
-      updateMetricValue(m.shuffleWriteMetrics.writeTime),
-      updateMetricValue(m.shuffleWriteMetrics.recordsWritten))
+      executorDeserializeTime = updateMetricValue(m.executorDeserializeTime),
+      executorDeserializeCpuTime = updateMetricValue(m.executorDeserializeCpuTime),
+      executorRunTime = updateMetricValue(m.executorRunTime),
+      executorCpuTime = updateMetricValue(m.executorCpuTime),
+      resultSize = updateMetricValue(m.resultSize),
+      jvmGcTime = updateMetricValue(m.jvmGcTime),
+      resultSerializationTime = updateMetricValue(m.resultSerializationTime),
+      memoryBytesSpilled = updateMetricValue(m.memoryBytesSpilled),
+      diskBytesSpilled = updateMetricValue(m.diskBytesSpilled),
+      peakExecutionMemory = updateMetricValue(m.peakExecutionMemory),
+      inputBytesRead = updateMetricValue(m.inputMetrics.bytesRead),
+      inputRecordsRead = updateMetricValue(m.inputMetrics.recordsRead),
+      outputBytesWritten = updateMetricValue(m.outputMetrics.bytesWritten),
+      outputRecordsWritten = updateMetricValue(m.outputMetrics.recordsWritten),
+      shuffleRemoteBlocksFetched = updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched),
+      shuffleLocalBlocksFetched = updateMetricValue(m.shuffleReadMetrics.localBlocksFetched),
+      shuffleFetchWaitTime = updateMetricValue(m.shuffleReadMetrics.fetchWaitTime),
+      shuffleRemoteBytesRead = updateMetricValue(m.shuffleReadMetrics.remoteBytesRead),
+      shuffleRemoteBytesReadToDisk = updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk),
+      shuffleLocalBytesRead = updateMetricValue(m.shuffleReadMetrics.localBytesRead),
+      shuffleRecordsRead = updateMetricValue(m.shuffleReadMetrics.recordsRead),
+      shuffleCorruptMergedBlockChunks =
+        updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.corruptMergedBlockChunks),
+      shuffleMergedFetchFallbackCount =
+        updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.mergedFetchFallbackCount),
+      shuffleMergedRemoteBlocksFetched =
+        updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBlocksFetched),
+      shuffleMergedLocalBlocksFetched =
+        updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBlocksFetched),
+      shuffleMergedRemoteChunksFetched =
+        updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedChunksFetched),
+      shuffleMergedLocalChunksFetched =
+        updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedChunksFetched),
+      shuffleMergedRemoteBytesRead =
+        updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBytesRead),
+      shuffleMergedLocalBytesRead =
+        updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBytesRead),
+      shuffleRemoteReqsDuration = updateMetricValue(m.shuffleReadMetrics.remoteReqsDuration),
+      shuffleMergedRemoteReqsDuration =
+        updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedReqsDuration),
+      shuffleBytesWritten = updateMetricValue(m.shuffleWriteMetrics.bytesWritten),
+      shuffleWriteTime = updateMetricValue(m.shuffleWriteMetrics.writeTime),
+      shuffleRecordsWritten = updateMetricValue(m.shuffleWriteMetrics.recordsWritten))
   }
 
   private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int): v1.TaskMetrics = {
diff --git a/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala b/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala
index 35e8a62c93c9..bed822f0b457 100644
--- a/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala
@@ -66,6 +66,135 @@ class LiveEntitySuite extends SparkFunSuite {
     assert(accuInfo.value == "[1,2,3,4,5,... 5 more items]")
   }
 
+  test("makeNegative correctly negates all metrics with proper argument 
order") {
+    import LiveEntityHelpers._
+
+    val originalMetrics = createMetrics(
+      executorDeserializeTime = 1L,
+      executorDeserializeCpuTime = 2L,
+      executorRunTime = 3L,
+      executorCpuTime = 4L,
+      resultSize = 5L,
+      jvmGcTime = 6L,
+      resultSerializationTime = 7L,
+      memoryBytesSpilled = 8L,
+      diskBytesSpilled = 9L,
+      peakExecutionMemory = 10L,
+      inputBytesRead = 11L,
+      inputRecordsRead = 12L,
+      outputBytesWritten = 13L,
+      outputRecordsWritten = 14L,
+      shuffleRemoteBlocksFetched = 15L,
+      shuffleLocalBlocksFetched = 16L,
+      shuffleFetchWaitTime = 17L,
+      shuffleRemoteBytesRead = 18L,
+      shuffleRemoteBytesReadToDisk = 19L,
+      shuffleLocalBytesRead = 20L,
+      shuffleRecordsRead = 21L,
+      shuffleCorruptMergedBlockChunks = 22L,
+      shuffleMergedFetchFallbackCount = 23L,
+      shuffleMergedRemoteBlocksFetched = 24L,
+      shuffleMergedLocalBlocksFetched = 25L,
+      shuffleMergedRemoteChunksFetched = 26L,
+      shuffleMergedLocalChunksFetched = 27L,
+      shuffleMergedRemoteBytesRead = 28L,
+      shuffleMergedLocalBytesRead = 29L,
+      shuffleRemoteReqsDuration = 30L,
+      shuffleMergedRemoteReqsDuration = 31L,
+      shuffleBytesWritten = 32L,
+      shuffleWriteTime = 33L,
+      shuffleRecordsWritten = 34L
+    )
+
+    val negatedMetrics = makeNegative(originalMetrics)
+
+    def expectedNegated(v: Long): Long = v * -1L - 1L
+
+    // Verify all fields are correctly negated
+    assert(negatedMetrics.executorDeserializeTime === expectedNegated(1L))
+    assert(negatedMetrics.executorDeserializeCpuTime === expectedNegated(2L))
+    assert(negatedMetrics.executorRunTime === expectedNegated(3L))
+    assert(negatedMetrics.executorCpuTime === expectedNegated(4L))
+    assert(negatedMetrics.resultSize === expectedNegated(5L))
+    assert(negatedMetrics.jvmGcTime === expectedNegated(6L))
+    assert(negatedMetrics.resultSerializationTime === expectedNegated(7L))
+    assert(negatedMetrics.memoryBytesSpilled === expectedNegated(8L))
+    assert(negatedMetrics.diskBytesSpilled === expectedNegated(9L))
+    assert(negatedMetrics.peakExecutionMemory === expectedNegated(10L))
+
+    // Verify input metrics
+    assert(negatedMetrics.inputMetrics.bytesRead === expectedNegated(11L))
+    assert(negatedMetrics.inputMetrics.recordsRead === expectedNegated(12L))
+
+    // Verify output metrics (these were in wrong position in current master)
+    assert(negatedMetrics.outputMetrics.bytesWritten === expectedNegated(13L),
+      "outputMetrics.bytesWritten should be correctly negated")
+    assert(negatedMetrics.outputMetrics.recordsWritten === expectedNegated(14L),
+      "outputMetrics.recordsWritten should be correctly negated")
+
+    // Verify shuffle read metrics (these were in wrong position in current master)
+    assert(negatedMetrics.shuffleReadMetrics.remoteBlocksFetched === expectedNegated(15L),
+      "shuffleReadMetrics.remoteBlocksFetched should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.localBlocksFetched === expectedNegated(16L),
+      "shuffleReadMetrics.localBlocksFetched should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.fetchWaitTime === expectedNegated(17L),
+      "shuffleReadMetrics.fetchWaitTime should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.remoteBytesRead === expectedNegated(18L),
+      "shuffleReadMetrics.remoteBytesRead should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.remoteBytesReadToDisk === expectedNegated(19L),
+      "shuffleReadMetrics.remoteBytesReadToDisk should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.localBytesRead === expectedNegated(20L),
+      "shuffleReadMetrics.localBytesRead should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.recordsRead === expectedNegated(21L),
+      "shuffleReadMetrics.recordsRead should be correctly negated")
+
+    // Verify shuffle push read metrics (these were in wrong position in current master)
+    assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.corruptMergedBlockChunks ===
+      expectedNegated(22L),
+      "shufflePushReadMetrics.corruptMergedBlockChunks should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.mergedFetchFallbackCount ===
+      expectedNegated(23L),
+      "shufflePushReadMetrics.mergedFetchFallbackCount should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBlocksFetched ===
+      expectedNegated(24L),
+      "shufflePushReadMetrics.remoteMergedBlocksFetched should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.localMergedBlocksFetched ===
+      expectedNegated(25L),
+      "shufflePushReadMetrics.localMergedBlocksFetched should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedChunksFetched ===
+      expectedNegated(26L),
+      "shufflePushReadMetrics.remoteMergedChunksFetched should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.localMergedChunksFetched ===
+      expectedNegated(27L),
+      "shufflePushReadMetrics.localMergedChunksFetched should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBytesRead ===
+      expectedNegated(28L),
+      "shufflePushReadMetrics.remoteMergedBytesRead should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.localMergedBytesRead ===
+      expectedNegated(29L),
+      "shufflePushReadMetrics.localMergedBytesRead should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.remoteReqsDuration === expectedNegated(30L),
+      "shuffleReadMetrics.remoteReqsDuration should be correctly negated")
+    assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedReqsDuration ===
+      expectedNegated(31L),
+      "shufflePushReadMetrics.remoteMergedReqsDuration should be correctly negated")
+
+    // Verify shuffle write metrics
+    assert(negatedMetrics.shuffleWriteMetrics.bytesWritten === expectedNegated(32L))
+    assert(negatedMetrics.shuffleWriteMetrics.writeTime === expectedNegated(33L))
+    assert(negatedMetrics.shuffleWriteMetrics.recordsWritten === expectedNegated(34L))
+
+    // Verify zero handling: 0 should become -1
+    val zeroMetrics = createMetrics(default = 0L)
+    val negatedZeroMetrics = makeNegative(zeroMetrics)
+    assert(negatedZeroMetrics.executorDeserializeTime === -1L,
+      "Zero value should be converted to -1")
+    assert(negatedZeroMetrics.inputMetrics.bytesRead === -1L,
+      "Zero input metric should be converted to -1")
+    assert(negatedZeroMetrics.outputMetrics.bytesWritten === -1L,
+      "Zero output metric should be converted to -1")
+  }
+
   private def checkSize(seq: Seq[_], expected: Int): Unit = {
     assert(seq.length === expected)
     var count = 0


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
