This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 2d5fc0a2b [#2709] fix(spark): Fix serialization error in Spark History
UI (#2710)
2d5fc0a2b is described below
commit 2d5fc0a2b8ffb0bece50cff76a864bc3a992660d
Author: Zhen Wang <[email protected]>
AuthorDate: Tue Jan 13 14:15:23 2026 +0800
[#2709] fix(spark): Fix serialization error in Spark History UI (#2710)
### What changes were proposed in this pull request?
+ Add cleaner ShuffleReadTimesSummary/ShuffleWriteTimesSummary entities
+ Make ShuffleType an enumeration
### Why are the changes needed?
for #2709
Fix serialization error when `spark.history.store.hybridStore.enabled` is
enabled
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Successfully tested in internal spark history server environment.
---------
Co-authored-by: Junfan Zhang <[email protected]>
---
.../scala/org/apache/spark/UniffleListener.scala | 6 +--
.../org/apache/spark/UniffleStatusStore.scala | 57 +++++++++++++++++----
.../scala/org/apache/spark/ui/ShufflePage.scala | 58 +++++++++++-----------
spotbugs-exclude.xml | 4 ++
4 files changed, 83 insertions(+), 42 deletions(-)
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 4770b89ff..f5a504403 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -34,8 +34,8 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
private val writeTaskInfo = ShuffleTaskSummary(shuffleType =
ShuffleType.WRITE)
private val readTaskInfo = ShuffleTaskSummary(shuffleType = ShuffleType.READ)
- private val aggregatedShuffleReadTimes = new ShuffleReadTimes()
- private val aggregatedShuffleWriteTimes = new ShuffleWriteTimes()
+ private val aggregatedShuffleReadTimes = ShuffleReadTimesSummary()
+ private val aggregatedShuffleWriteTimes = ShuffleWriteTimesSummary()
private val aggregatedShuffleWriteMetric = new ConcurrentHashMap[String,
AggregatedShuffleWriteMetric]
private val aggregatedShuffleReadMetric = new ConcurrentHashMap[String,
AggregatedShuffleReadMetric]
@@ -156,7 +156,7 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
agg_metric.hadoopByteSize += rmetric.getHadoopByteSize
agg_metric.hadoopDurationMillis += rmetric.getHadoopDurationMillis
}
- aggregatedShuffleReadTimes.merge(event.getShuffleReadTimes)
+ aggregatedShuffleReadTimes.inc(event.getShuffleReadTimes)
val failureReason = event.getFailureReason
if (StringUtils.isNotEmpty(failureReason)) {
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index e95758297..6fde10e70 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -33,7 +33,7 @@ class UniffleStatusStore(store: KVStore) {
Utils.tryWithResource(view.closeableIterator())(iter =>
iter.asScala.toList)
}
- def shuffleTaskSummary(shuffleType: ShuffleType): ShuffleTaskSummary = {
+ def shuffleTaskSummary(shuffleType: ShuffleType.Value): ShuffleTaskSummary =
{
val kClass = classOf[ShuffleTaskSummary]
try {
store.read(kClass, s"${kClass.getName}_$shuffleType")
@@ -65,7 +65,7 @@ class UniffleStatusStore(store: KVStore) {
try {
store.read(kClass, kClass.getName)
} catch {
- case _: NoSuchElementException => AggregatedShuffleReadTimesUIData(new
ShuffleReadTimes())
+ case _: NoSuchElementException => AggregatedShuffleReadTimesUIData(new
ShuffleReadTimesSummary())
}
}
@@ -74,7 +74,7 @@ class UniffleStatusStore(store: KVStore) {
try {
store.read(kClass, kClass.getName)
} catch {
- case _: NoSuchElementException => AggregatedShuffleWriteTimesUIData(new
ShuffleWriteTimes())
+ case _: NoSuchElementException => AggregatedShuffleWriteTimesUIData(new
ShuffleWriteTimesSummary())
}
}
@@ -178,13 +178,52 @@ case class AggregatedTaskInfoUIData(cpuTimeMillis: Long,
def id: String = classOf[AggregatedTaskInfoUIData].getName()
}
-case class AggregatedShuffleWriteTimesUIData(times: ShuffleWriteTimes) {
+case class ShuffleReadTimesSummary(var fetch: Long = 0,
+ var backgroundFetch: Long = 0,
+ var crc: Long = 0,
+ var copy: Long = 0,
+ var deserialize: Long = 0,
+ var decompress: Long = 0,
+ var backgroundDecompress: Long = 0,
+ var total: Long = 0) {
+ def inc(times: ShuffleReadTimes): Unit = {
+ if (times == null) return
+ this.fetch += times.getFetch
+ this.crc += times.getCrc
+ this.copy += times.getCopy
+ this.deserialize += times.getDeserialize
+ this.decompress += times.getDecompress
+ this.backgroundDecompress += times.getBackgroundDecompress
+ this.backgroundFetch += times.getBackgroundFetch
+ }
+}
+
+case class ShuffleWriteTimesSummary(var copy: Long = 0,
+ var serialize: Long = 0,
+ var compress: Long = 0,
+ var sort: Long = 0,
+ var requireMemory: Long = 0,
+ var waitFinish: Long = 0,
+ var total: Long = 0) {
+ def inc(times: ShuffleWriteTimes): Unit = {
+ if (times == null) return
+ this.total += times.getTotal
+ this.copy += times.getCopy
+ this.serialize += times.getSerialize
+ this.compress += times.getCompress
+ this.sort += times.getSort
+ this.requireMemory += times.getRequireMemory
+ this.waitFinish += times.getWaitFinish
+ }
+}
+
+case class AggregatedShuffleWriteTimesUIData(times: ShuffleWriteTimesSummary) {
@JsonIgnore
@KVIndex
def id: String = classOf[AggregatedShuffleWriteTimesUIData].getName()
}
-case class AggregatedShuffleReadTimesUIData(times: ShuffleReadTimes) {
+case class AggregatedShuffleReadTimesUIData(times: ShuffleReadTimesSummary) {
@JsonIgnore
@KVIndex
def id: String = classOf[AggregatedShuffleReadTimesUIData].getName()
@@ -196,13 +235,11 @@ case class ReassignInfoUIData(event:
TaskReassignInfoEvent) {
def id: String = classOf[ReassignInfoUIData].getName()
}
-sealed abstract class ShuffleType private ()
-object ShuffleType {
- val READ: ShuffleType = new ShuffleType {}
- val WRITE: ShuffleType = new ShuffleType {}
+object ShuffleType extends Enumeration {
+ val READ, WRITE = Value
}
-case class ShuffleTaskSummary(shuffleType: ShuffleType,
+case class ShuffleTaskSummary(shuffleType: ShuffleType.Value,
var failureReasons: mutable.HashSet[String] =
new mutable.HashSet[String](),
var failedTaskNumber: Long = -1,
var failedTaskMaxAttemptNumber: Long = -1) {
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index ecdbd0c93..edfc1740b 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -162,30 +162,30 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
// render shuffle read times
val readTimes = runtimeStatusStore.shuffleReadTimes().times
- val readTotal = if (readTimes.getTotal <= 0) -1 else readTimes.getTotal
+ val readTotal = if (readTimes.total <= 0) -1 else readTimes.total
val readTimesUI = UIUtils.listingTable(
Seq("Total", "Fetch", "Copy", "CRC", "Deserialize", "Decompress",
"Background Decompress", "Background Fetch"),
shuffleReadTimesRow,
Seq(
Seq(
UIUtils.formatDuration(readTotal),
- UIUtils.formatDuration(readTimes.getFetch),
- UIUtils.formatDuration(readTimes.getCopy),
- UIUtils.formatDuration(readTimes.getCrc),
- UIUtils.formatDuration(readTimes.getDeserialize),
- UIUtils.formatDuration(readTimes.getDecompress),
- UIUtils.formatDuration(readTimes.getBackgroundDecompress),
- UIUtils.formatDuration(readTimes.getBackgroundFetch),
+ UIUtils.formatDuration(readTimes.fetch),
+ UIUtils.formatDuration(readTimes.copy),
+ UIUtils.formatDuration(readTimes.crc),
+ UIUtils.formatDuration(readTimes.deserialize),
+ UIUtils.formatDuration(readTimes.decompress),
+ UIUtils.formatDuration(readTimes.backgroundDecompress),
+ UIUtils.formatDuration(readTimes.backgroundFetch),
),
Seq(
1,
- readTimes.getFetch.toDouble / readTotal,
- readTimes.getCopy.toDouble / readTotal,
- readTimes.getCrc.toDouble / readTotal,
- readTimes.getDeserialize.toDouble / readTotal,
- readTimes.getDecompress.toDouble / readTotal,
- readTimes.getBackgroundDecompress.toDouble / readTotal,
- readTimes.getBackgroundFetch.toDouble / readTotal,
+ readTimes.fetch.toDouble / readTotal,
+ readTimes.copy.toDouble / readTotal,
+ readTimes.crc.toDouble / readTotal,
+ readTimes.deserialize.toDouble / readTotal,
+ readTimes.decompress.toDouble / readTotal,
+ readTimes.backgroundDecompress.toDouble / readTotal,
+ readTimes.backgroundFetch.toDouble / readTotal,
).map(x => roundToTwoDecimals(x).toString)
),
fixedWidth = true
@@ -193,28 +193,28 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
// render shuffle write times
val writeTimes = runtimeStatusStore.shuffleWriteTimes().times
- val writeTotal = if (writeTimes.getTotal <= 0) -1 else writeTimes.getTotal
+ val writeTotal = if (writeTimes.total <= 0) -1 else writeTimes.total
val writeTimesUI = UIUtils.listingTable(
Seq("Total Time", "Wait Finish Time", "Copy Time", "Serialize Time",
"Compress Time", "Sort Time", "Require Memory Time"),
shuffleWriteTimesRow,
Seq(
Seq(
- UIUtils.formatDuration(writeTimes.getTotal),
- UIUtils.formatDuration(writeTimes.getWaitFinish),
- UIUtils.formatDuration(writeTimes.getCopy),
- UIUtils.formatDuration(writeTimes.getSerialize),
- UIUtils.formatDuration(writeTimes.getCompress),
- UIUtils.formatDuration(writeTimes.getSort),
- UIUtils.formatDuration(writeTimes.getRequireMemory),
+ UIUtils.formatDuration(writeTimes.total),
+ UIUtils.formatDuration(writeTimes.waitFinish),
+ UIUtils.formatDuration(writeTimes.copy),
+ UIUtils.formatDuration(writeTimes.serialize),
+ UIUtils.formatDuration(writeTimes.compress),
+ UIUtils.formatDuration(writeTimes.sort),
+ UIUtils.formatDuration(writeTimes.requireMemory),
),
Seq(
1.toDouble,
- writeTimes.getWaitFinish.toDouble / writeTotal,
- writeTimes.getCopy.toDouble / writeTotal,
- writeTimes.getSerialize.toDouble / writeTotal,
- writeTimes.getCompress.toDouble / writeTotal,
- writeTimes.getSort.toDouble / writeTotal,
- writeTimes.getRequireMemory.toDouble / writeTotal,
+ writeTimes.waitFinish.toDouble / writeTotal,
+ writeTimes.copy.toDouble / writeTotal,
+ writeTimes.serialize.toDouble / writeTotal,
+ writeTimes.compress.toDouble / writeTotal,
+ writeTimes.sort.toDouble / writeTotal,
+ writeTimes.requireMemory.toDouble / writeTotal,
).map(x => roundToTwoDecimals(x).toString)
),
fixedWidth = true
diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml
index 9231414f9..29da06de4 100644
--- a/spotbugs-exclude.xml
+++ b/spotbugs-exclude.xml
@@ -22,4 +22,8 @@
<Match>
<Bug
pattern="MS_EXPOSE_REP,EI_EXPOSE_REP,EI_EXPOSE_REP2,EI_EXPOSE_STATIC_REP2,THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION,THROWS_METHOD_THROWS_CLAUSE_THROWABLE,MS_PKGPROTECT,MS_CANNOT_BE_FINAL,SE_BAD_FIELD,SWL_SLEEP_WITH_LOCK_HELD,IS2_INCONSISTENT_SYNC"
/>
</Match>
+ <Match>
+ <Bug pattern="NM_METHOD_NAMING_CONVENTION"/>
+ <Class name="org.apache.spark.ShuffleType"/>
+ </Match>
</FindBugsFilter>