This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3f69cff2f314 [SPARK-54505][UI][SHUFFLE] Correct the arguments order of
createMetrics call in makeNegative
3f69cff2f314 is described below
commit 3f69cff2f314fa93765d2184588dafb0a6526afe
Author: Eric Yang <[email protected]>
AuthorDate: Fri Nov 28 20:36:31 2025 +0800
[SPARK-54505][UI][SHUFFLE] Correct the arguments order of createMetrics
call in makeNegative
### What changes were proposed in this pull request?
In the method `org.apache.spark.status.LiveEntityHelpers.makeNegative`, it
calls `createMetrics` with a wrong arguments order which passes metric values
to wrong positions. The order is wrong starting from the
`shufflePushReadMetrics. corruptMergedBlockChunks`
### Why are the changes needed?
Passing argument values to wrong positions messes up the method call to
`createMetrics` and eventually negates metrics with wrong values.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a test case.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53218 from jiwen624/makeNegative-arg-order.
Authored-by: Eric Yang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit a75815e7b7e79a01cae9b595842aae5328e809be)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/status/LiveEntity.scala | 77 ++++++------
.../org/apache/spark/status/LiveEntitySuite.scala | 129 +++++++++++++++++++++
2 files changed, 172 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index efc670440bc6..3c4efd8a5ead 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -859,40 +859,49 @@ private[spark] object LiveEntityHelpers {
}
createMetrics(
- updateMetricValue(m.executorDeserializeTime),
- updateMetricValue(m.executorDeserializeCpuTime),
- updateMetricValue(m.executorRunTime),
- updateMetricValue(m.executorCpuTime),
- updateMetricValue(m.resultSize),
- updateMetricValue(m.jvmGcTime),
- updateMetricValue(m.resultSerializationTime),
- updateMetricValue(m.memoryBytesSpilled),
- updateMetricValue(m.diskBytesSpilled),
- updateMetricValue(m.peakExecutionMemory),
- updateMetricValue(m.inputMetrics.bytesRead),
- updateMetricValue(m.inputMetrics.recordsRead),
-
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.corruptMergedBlockChunks),
-
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.mergedFetchFallbackCount),
-
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBlocksFetched),
-
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBlocksFetched),
-
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedChunksFetched),
-
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedChunksFetched),
-
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBytesRead),
-
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBytesRead),
- updateMetricValue(m.shuffleReadMetrics.remoteReqsDuration),
-
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedReqsDuration),
- updateMetricValue(m.outputMetrics.bytesWritten),
- updateMetricValue(m.outputMetrics.recordsWritten),
- updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched),
- updateMetricValue(m.shuffleReadMetrics.localBlocksFetched),
- updateMetricValue(m.shuffleReadMetrics.fetchWaitTime),
- updateMetricValue(m.shuffleReadMetrics.remoteBytesRead),
- updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk),
- updateMetricValue(m.shuffleReadMetrics.localBytesRead),
- updateMetricValue(m.shuffleReadMetrics.recordsRead),
- updateMetricValue(m.shuffleWriteMetrics.bytesWritten),
- updateMetricValue(m.shuffleWriteMetrics.writeTime),
- updateMetricValue(m.shuffleWriteMetrics.recordsWritten))
+ executorDeserializeTime = updateMetricValue(m.executorDeserializeTime),
+ executorDeserializeCpuTime =
updateMetricValue(m.executorDeserializeCpuTime),
+ executorRunTime = updateMetricValue(m.executorRunTime),
+ executorCpuTime = updateMetricValue(m.executorCpuTime),
+ resultSize = updateMetricValue(m.resultSize),
+ jvmGcTime = updateMetricValue(m.jvmGcTime),
+ resultSerializationTime = updateMetricValue(m.resultSerializationTime),
+ memoryBytesSpilled = updateMetricValue(m.memoryBytesSpilled),
+ diskBytesSpilled = updateMetricValue(m.diskBytesSpilled),
+ peakExecutionMemory = updateMetricValue(m.peakExecutionMemory),
+ inputBytesRead = updateMetricValue(m.inputMetrics.bytesRead),
+ inputRecordsRead = updateMetricValue(m.inputMetrics.recordsRead),
+ outputBytesWritten = updateMetricValue(m.outputMetrics.bytesWritten),
+ outputRecordsWritten = updateMetricValue(m.outputMetrics.recordsWritten),
+ shuffleRemoteBlocksFetched =
updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched),
+ shuffleLocalBlocksFetched =
updateMetricValue(m.shuffleReadMetrics.localBlocksFetched),
+ shuffleFetchWaitTime =
updateMetricValue(m.shuffleReadMetrics.fetchWaitTime),
+ shuffleRemoteBytesRead =
updateMetricValue(m.shuffleReadMetrics.remoteBytesRead),
+ shuffleRemoteBytesReadToDisk =
updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk),
+ shuffleLocalBytesRead =
updateMetricValue(m.shuffleReadMetrics.localBytesRead),
+ shuffleRecordsRead = updateMetricValue(m.shuffleReadMetrics.recordsRead),
+ shuffleCorruptMergedBlockChunks =
+
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.corruptMergedBlockChunks),
+ shuffleMergedFetchFallbackCount =
+
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.mergedFetchFallbackCount),
+ shuffleMergedRemoteBlocksFetched =
+
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBlocksFetched),
+ shuffleMergedLocalBlocksFetched =
+
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBlocksFetched),
+ shuffleMergedRemoteChunksFetched =
+
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedChunksFetched),
+ shuffleMergedLocalChunksFetched =
+
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedChunksFetched),
+ shuffleMergedRemoteBytesRead =
+
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBytesRead),
+ shuffleMergedLocalBytesRead =
+
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBytesRead),
+ shuffleRemoteReqsDuration =
updateMetricValue(m.shuffleReadMetrics.remoteReqsDuration),
+ shuffleMergedRemoteReqsDuration =
+
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedReqsDuration),
+ shuffleBytesWritten =
updateMetricValue(m.shuffleWriteMetrics.bytesWritten),
+ shuffleWriteTime = updateMetricValue(m.shuffleWriteMetrics.writeTime),
+ shuffleRecordsWritten =
updateMetricValue(m.shuffleWriteMetrics.recordsWritten))
}
private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int):
v1.TaskMetrics = {
diff --git a/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala
b/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala
index 35e8a62c93c9..bed822f0b457 100644
--- a/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala
@@ -66,6 +66,135 @@ class LiveEntitySuite extends SparkFunSuite {
assert(accuInfo.value == "[1,2,3,4,5,... 5 more items]")
}
+ test("makeNegative correctly negates all metrics with proper argument
order") {
+ import LiveEntityHelpers._
+
+ val originalMetrics = createMetrics(
+ executorDeserializeTime = 1L,
+ executorDeserializeCpuTime = 2L,
+ executorRunTime = 3L,
+ executorCpuTime = 4L,
+ resultSize = 5L,
+ jvmGcTime = 6L,
+ resultSerializationTime = 7L,
+ memoryBytesSpilled = 8L,
+ diskBytesSpilled = 9L,
+ peakExecutionMemory = 10L,
+ inputBytesRead = 11L,
+ inputRecordsRead = 12L,
+ outputBytesWritten = 13L,
+ outputRecordsWritten = 14L,
+ shuffleRemoteBlocksFetched = 15L,
+ shuffleLocalBlocksFetched = 16L,
+ shuffleFetchWaitTime = 17L,
+ shuffleRemoteBytesRead = 18L,
+ shuffleRemoteBytesReadToDisk = 19L,
+ shuffleLocalBytesRead = 20L,
+ shuffleRecordsRead = 21L,
+ shuffleCorruptMergedBlockChunks = 22L,
+ shuffleMergedFetchFallbackCount = 23L,
+ shuffleMergedRemoteBlocksFetched = 24L,
+ shuffleMergedLocalBlocksFetched = 25L,
+ shuffleMergedRemoteChunksFetched = 26L,
+ shuffleMergedLocalChunksFetched = 27L,
+ shuffleMergedRemoteBytesRead = 28L,
+ shuffleMergedLocalBytesRead = 29L,
+ shuffleRemoteReqsDuration = 30L,
+ shuffleMergedRemoteReqsDuration = 31L,
+ shuffleBytesWritten = 32L,
+ shuffleWriteTime = 33L,
+ shuffleRecordsWritten = 34L
+ )
+
+ val negatedMetrics = makeNegative(originalMetrics)
+
+ def expectedNegated(v: Long): Long = v * -1L - 1L
+
+ // Verify all fields are correctly negated
+ assert(negatedMetrics.executorDeserializeTime === expectedNegated(1L))
+ assert(negatedMetrics.executorDeserializeCpuTime === expectedNegated(2L))
+ assert(negatedMetrics.executorRunTime === expectedNegated(3L))
+ assert(negatedMetrics.executorCpuTime === expectedNegated(4L))
+ assert(negatedMetrics.resultSize === expectedNegated(5L))
+ assert(negatedMetrics.jvmGcTime === expectedNegated(6L))
+ assert(negatedMetrics.resultSerializationTime === expectedNegated(7L))
+ assert(negatedMetrics.memoryBytesSpilled === expectedNegated(8L))
+ assert(negatedMetrics.diskBytesSpilled === expectedNegated(9L))
+ assert(negatedMetrics.peakExecutionMemory === expectedNegated(10L))
+
+ // Verify input metrics
+ assert(negatedMetrics.inputMetrics.bytesRead === expectedNegated(11L))
+ assert(negatedMetrics.inputMetrics.recordsRead === expectedNegated(12L))
+
+ // Verify output metrics (these were in wrong position in current master)
+ assert(negatedMetrics.outputMetrics.bytesWritten === expectedNegated(13L),
+ "outputMetrics.bytesWritten should be correctly negated")
+ assert(negatedMetrics.outputMetrics.recordsWritten ===
expectedNegated(14L),
+ "outputMetrics.recordsWritten should be correctly negated")
+
+ // Verify shuffle read metrics (these were in wrong position in current
master)
+ assert(negatedMetrics.shuffleReadMetrics.remoteBlocksFetched ===
expectedNegated(15L),
+ "shuffleReadMetrics.remoteBlocksFetched should be correctly negated")
+ assert(negatedMetrics.shuffleReadMetrics.localBlocksFetched ===
expectedNegated(16L),
+ "shuffleReadMetrics.localBlocksFetched should be correctly negated")
+ assert(negatedMetrics.shuffleReadMetrics.fetchWaitTime ===
expectedNegated(17L),
+ "shuffleReadMetrics.fetchWaitTime should be correctly negated")
+ assert(negatedMetrics.shuffleReadMetrics.remoteBytesRead ===
expectedNegated(18L),
+ "shuffleReadMetrics.remoteBytesRead should be correctly negated")
+ assert(negatedMetrics.shuffleReadMetrics.remoteBytesReadToDisk ===
expectedNegated(19L),
+ "shuffleReadMetrics.remoteBytesReadToDisk should be correctly negated")
+ assert(negatedMetrics.shuffleReadMetrics.localBytesRead ===
expectedNegated(20L),
+ "shuffleReadMetrics.localBytesRead should be correctly negated")
+ assert(negatedMetrics.shuffleReadMetrics.recordsRead ===
expectedNegated(21L),
+ "shuffleReadMetrics.recordsRead should be correctly negated")
+
+ // Verify shuffle push read metrics (these were in wrong position in
current master)
+
assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.corruptMergedBlockChunks
===
+ expectedNegated(22L),
+ "shufflePushReadMetrics.corruptMergedBlockChunks should be correctly
negated")
+
assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.mergedFetchFallbackCount
===
+ expectedNegated(23L),
+ "shufflePushReadMetrics.mergedFetchFallbackCount should be correctly
negated")
+
assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBlocksFetched
===
+ expectedNegated(24L),
+ "shufflePushReadMetrics.remoteMergedBlocksFetched should be correctly
negated")
+
assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.localMergedBlocksFetched
===
+ expectedNegated(25L),
+ "shufflePushReadMetrics.localMergedBlocksFetched should be correctly
negated")
+
assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedChunksFetched
===
+ expectedNegated(26L),
+ "shufflePushReadMetrics.remoteMergedChunksFetched should be correctly
negated")
+
assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.localMergedChunksFetched
===
+ expectedNegated(27L),
+ "shufflePushReadMetrics.localMergedChunksFetched should be correctly
negated")
+
assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBytesRead
===
+ expectedNegated(28L),
+ "shufflePushReadMetrics.remoteMergedBytesRead should be correctly
negated")
+
assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.localMergedBytesRead
===
+ expectedNegated(29L),
+ "shufflePushReadMetrics.localMergedBytesRead should be correctly
negated")
+ assert(negatedMetrics.shuffleReadMetrics.remoteReqsDuration ===
expectedNegated(30L),
+ "shuffleReadMetrics.remoteReqsDuration should be correctly negated")
+
assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedReqsDuration
===
+ expectedNegated(31L),
+ "shufflePushReadMetrics.remoteMergedReqsDuration should be correctly
negated")
+
+ // Verify shuffle write metrics
+ assert(negatedMetrics.shuffleWriteMetrics.bytesWritten ===
expectedNegated(32L))
+ assert(negatedMetrics.shuffleWriteMetrics.writeTime ===
expectedNegated(33L))
+ assert(negatedMetrics.shuffleWriteMetrics.recordsWritten ===
expectedNegated(34L))
+
+ // Verify zero handling: 0 should become -1
+ val zeroMetrics = createMetrics(default = 0L)
+ val negatedZeroMetrics = makeNegative(zeroMetrics)
+ assert(negatedZeroMetrics.executorDeserializeTime === -1L,
+ "Zero value should be converted to -1")
+ assert(negatedZeroMetrics.inputMetrics.bytesRead === -1L,
+ "Zero input metric should be converted to -1")
+ assert(negatedZeroMetrics.outputMetrics.bytesWritten === -1L,
+ "Zero output metric should be converted to -1")
+ }
+
private def checkSize(seq: Seq[_], expected: Int): Unit = {
assert(seq.length === expected)
var count = 0
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]