This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d5fc0a2b [#2709] fix(spark): Fix serialization error in Spark History 
UI (#2710)
2d5fc0a2b is described below

commit 2d5fc0a2b8ffb0bece50cff76a864bc3a992660d
Author: Zhen Wang <[email protected]>
AuthorDate: Tue Jan 13 14:15:23 2026 +0800

    [#2709] fix(spark): Fix serialization error in Spark History UI (#2710)
    
    ### What changes were proposed in this pull request?
    
    + Add cleaner ShuffleReadTimesSummary/ShuffleWriteTimesSummary entities
    + Make ShuffleType an enumeration
    
    ### Why are the changes needed?
    
    for #2709
    
    Fix the serialization error that occurs when 
`spark.history.store.hybridStore.enabled` is enabled
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested successfully in an internal Spark History Server environment.
    
    ---------
    
    Co-authored-by: Junfan Zhang <[email protected]>
---
 .../scala/org/apache/spark/UniffleListener.scala   |  6 +--
 .../org/apache/spark/UniffleStatusStore.scala      | 57 +++++++++++++++++----
 .../scala/org/apache/spark/ui/ShufflePage.scala    | 58 +++++++++++-----------
 spotbugs-exclude.xml                               |  4 ++
 4 files changed, 83 insertions(+), 42 deletions(-)

diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 4770b89ff..f5a504403 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -34,8 +34,8 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
   private val writeTaskInfo = ShuffleTaskSummary(shuffleType = 
ShuffleType.WRITE)
   private val readTaskInfo = ShuffleTaskSummary(shuffleType = ShuffleType.READ)
 
-  private val aggregatedShuffleReadTimes = new ShuffleReadTimes()
-  private val aggregatedShuffleWriteTimes = new ShuffleWriteTimes()
+  private val aggregatedShuffleReadTimes = ShuffleReadTimesSummary()
+  private val aggregatedShuffleWriteTimes = ShuffleWriteTimesSummary()
   private val aggregatedShuffleWriteMetric = new ConcurrentHashMap[String, 
AggregatedShuffleWriteMetric]
   private val aggregatedShuffleReadMetric = new ConcurrentHashMap[String, 
AggregatedShuffleReadMetric]
 
@@ -156,7 +156,7 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
       agg_metric.hadoopByteSize += rmetric.getHadoopByteSize
       agg_metric.hadoopDurationMillis += rmetric.getHadoopDurationMillis
     }
-    aggregatedShuffleReadTimes.merge(event.getShuffleReadTimes)
+    aggregatedShuffleReadTimes.inc(event.getShuffleReadTimes)
 
     val failureReason = event.getFailureReason
     if (StringUtils.isNotEmpty(failureReason)) {
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index e95758297..6fde10e70 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -33,7 +33,7 @@ class UniffleStatusStore(store: KVStore) {
     Utils.tryWithResource(view.closeableIterator())(iter => 
iter.asScala.toList)
   }
 
-  def shuffleTaskSummary(shuffleType: ShuffleType): ShuffleTaskSummary = {
+  def shuffleTaskSummary(shuffleType: ShuffleType.Value): ShuffleTaskSummary = 
{
     val kClass = classOf[ShuffleTaskSummary]
     try {
       store.read(kClass, s"${kClass.getName}_$shuffleType")
@@ -65,7 +65,7 @@ class UniffleStatusStore(store: KVStore) {
     try {
       store.read(kClass, kClass.getName)
     } catch {
-      case _: NoSuchElementException => AggregatedShuffleReadTimesUIData(new 
ShuffleReadTimes())
+      case _: NoSuchElementException => AggregatedShuffleReadTimesUIData(new 
ShuffleReadTimesSummary())
     }
   }
 
@@ -74,7 +74,7 @@ class UniffleStatusStore(store: KVStore) {
     try {
       store.read(kClass, kClass.getName)
     } catch {
-      case _: NoSuchElementException => AggregatedShuffleWriteTimesUIData(new 
ShuffleWriteTimes())
+      case _: NoSuchElementException => AggregatedShuffleWriteTimesUIData(new 
ShuffleWriteTimesSummary())
     }
   }
 
@@ -178,13 +178,52 @@ case class AggregatedTaskInfoUIData(cpuTimeMillis: Long,
   def id: String = classOf[AggregatedTaskInfoUIData].getName()
 }
 
-case class AggregatedShuffleWriteTimesUIData(times: ShuffleWriteTimes) {
+case class ShuffleReadTimesSummary(var fetch: Long = 0,
+                                   var backgroundFetch: Long = 0,
+                                   var crc: Long = 0,
+                                   var copy: Long = 0,
+                                   var deserialize: Long = 0,
+                                   var decompress: Long = 0,
+                                   var backgroundDecompress: Long = 0,
+                                   var total: Long = 0) {
+  def inc(times: ShuffleReadTimes): Unit = {
+    if (times == null) return
+    this.fetch += times.getFetch
+    this.crc += times.getCrc
+    this.copy += times.getCopy
+    this.deserialize += times.getDeserialize
+    this.decompress += times.getDecompress
+    this.backgroundDecompress += times.getBackgroundDecompress
+    this.backgroundFetch += times.getBackgroundFetch
+  }
+}
+
+case class ShuffleWriteTimesSummary(var copy: Long = 0,
+                                    var serialize: Long = 0,
+                                    var compress: Long = 0,
+                                    var sort: Long = 0,
+                                    var requireMemory: Long = 0,
+                                    var waitFinish: Long = 0,
+                                    var total: Long = 0) {
+  def inc(times: ShuffleWriteTimes): Unit = {
+    if (times == null) return
+    this.total += times.getTotal
+    this.copy += times.getCopy
+    this.serialize += times.getSerialize
+    this.compress += times.getCompress
+    this.sort += times.getSort
+    this.requireMemory += times.getRequireMemory
+    this.waitFinish += times.getWaitFinish
+  }
+}
+
+case class AggregatedShuffleWriteTimesUIData(times: ShuffleWriteTimesSummary) {
   @JsonIgnore
   @KVIndex
   def id: String = classOf[AggregatedShuffleWriteTimesUIData].getName()
 }
 
-case class AggregatedShuffleReadTimesUIData(times: ShuffleReadTimes) {
+case class AggregatedShuffleReadTimesUIData(times: ShuffleReadTimesSummary) {
   @JsonIgnore
   @KVIndex
   def id: String = classOf[AggregatedShuffleReadTimesUIData].getName()
@@ -196,13 +235,11 @@ case class ReassignInfoUIData(event: 
TaskReassignInfoEvent) {
   def id: String = classOf[ReassignInfoUIData].getName()
 }
 
-sealed abstract class ShuffleType private ()
-object ShuffleType {
-  val READ: ShuffleType = new ShuffleType {}
-  val WRITE: ShuffleType = new ShuffleType {}
+object ShuffleType extends Enumeration {
+  val READ, WRITE = Value
 }
 
-case class ShuffleTaskSummary(shuffleType: ShuffleType,
+case class ShuffleTaskSummary(shuffleType: ShuffleType.Value,
                               var failureReasons: mutable.HashSet[String] = 
new mutable.HashSet[String](),
                               var failedTaskNumber: Long = -1,
                               var failedTaskMaxAttemptNumber: Long = -1) {
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index ecdbd0c93..edfc1740b 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -162,30 +162,30 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
 
     // render shuffle read times
     val readTimes = runtimeStatusStore.shuffleReadTimes().times
-    val readTotal = if (readTimes.getTotal <= 0) -1 else readTimes.getTotal
+    val readTotal = if (readTimes.total <= 0) -1 else readTimes.total
     val readTimesUI = UIUtils.listingTable(
       Seq("Total", "Fetch", "Copy", "CRC", "Deserialize", "Decompress", 
"Background Decompress", "Background Fetch"),
       shuffleReadTimesRow,
       Seq(
         Seq(
           UIUtils.formatDuration(readTotal),
-          UIUtils.formatDuration(readTimes.getFetch),
-          UIUtils.formatDuration(readTimes.getCopy),
-          UIUtils.formatDuration(readTimes.getCrc),
-          UIUtils.formatDuration(readTimes.getDeserialize),
-          UIUtils.formatDuration(readTimes.getDecompress),
-          UIUtils.formatDuration(readTimes.getBackgroundDecompress),
-          UIUtils.formatDuration(readTimes.getBackgroundFetch),
+          UIUtils.formatDuration(readTimes.fetch),
+          UIUtils.formatDuration(readTimes.copy),
+          UIUtils.formatDuration(readTimes.crc),
+          UIUtils.formatDuration(readTimes.deserialize),
+          UIUtils.formatDuration(readTimes.decompress),
+          UIUtils.formatDuration(readTimes.backgroundDecompress),
+          UIUtils.formatDuration(readTimes.backgroundFetch),
         ),
         Seq(
           1,
-          readTimes.getFetch.toDouble / readTotal,
-          readTimes.getCopy.toDouble / readTotal,
-          readTimes.getCrc.toDouble / readTotal,
-          readTimes.getDeserialize.toDouble / readTotal,
-          readTimes.getDecompress.toDouble / readTotal,
-          readTimes.getBackgroundDecompress.toDouble / readTotal,
-          readTimes.getBackgroundFetch.toDouble / readTotal,
+          readTimes.fetch.toDouble / readTotal,
+          readTimes.copy.toDouble / readTotal,
+          readTimes.crc.toDouble / readTotal,
+          readTimes.deserialize.toDouble / readTotal,
+          readTimes.decompress.toDouble / readTotal,
+          readTimes.backgroundDecompress.toDouble / readTotal,
+          readTimes.backgroundFetch.toDouble / readTotal,
         ).map(x => roundToTwoDecimals(x).toString)
       ),
       fixedWidth = true
@@ -193,28 +193,28 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
 
     // render shuffle write times
     val writeTimes = runtimeStatusStore.shuffleWriteTimes().times
-    val writeTotal = if (writeTimes.getTotal <= 0) -1 else writeTimes.getTotal
+    val writeTotal = if (writeTimes.total <= 0) -1 else writeTimes.total
     val writeTimesUI = UIUtils.listingTable(
       Seq("Total Time", "Wait Finish Time", "Copy Time", "Serialize Time", 
"Compress Time", "Sort Time", "Require Memory Time"),
       shuffleWriteTimesRow,
       Seq(
         Seq(
-          UIUtils.formatDuration(writeTimes.getTotal),
-          UIUtils.formatDuration(writeTimes.getWaitFinish),
-          UIUtils.formatDuration(writeTimes.getCopy),
-          UIUtils.formatDuration(writeTimes.getSerialize),
-          UIUtils.formatDuration(writeTimes.getCompress),
-          UIUtils.formatDuration(writeTimes.getSort),
-          UIUtils.formatDuration(writeTimes.getRequireMemory),
+          UIUtils.formatDuration(writeTimes.total),
+          UIUtils.formatDuration(writeTimes.waitFinish),
+          UIUtils.formatDuration(writeTimes.copy),
+          UIUtils.formatDuration(writeTimes.serialize),
+          UIUtils.formatDuration(writeTimes.compress),
+          UIUtils.formatDuration(writeTimes.sort),
+          UIUtils.formatDuration(writeTimes.requireMemory),
         ),
         Seq(
           1.toDouble,
-          writeTimes.getWaitFinish.toDouble / writeTotal,
-          writeTimes.getCopy.toDouble / writeTotal,
-          writeTimes.getSerialize.toDouble / writeTotal,
-          writeTimes.getCompress.toDouble / writeTotal,
-          writeTimes.getSort.toDouble / writeTotal,
-          writeTimes.getRequireMemory.toDouble / writeTotal,
+          writeTimes.waitFinish.toDouble / writeTotal,
+          writeTimes.copy.toDouble / writeTotal,
+          writeTimes.serialize.toDouble / writeTotal,
+          writeTimes.compress.toDouble / writeTotal,
+          writeTimes.sort.toDouble / writeTotal,
+          writeTimes.requireMemory.toDouble / writeTotal,
         ).map(x => roundToTwoDecimals(x).toString)
       ),
       fixedWidth = true
diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml
index 9231414f9..29da06de4 100644
--- a/spotbugs-exclude.xml
+++ b/spotbugs-exclude.xml
@@ -22,4 +22,8 @@
   <Match>
     <Bug 
pattern="MS_EXPOSE_REP,EI_EXPOSE_REP,EI_EXPOSE_REP2,EI_EXPOSE_STATIC_REP2,THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION,THROWS_METHOD_THROWS_CLAUSE_THROWABLE,MS_PKGPROTECT,MS_CANNOT_BE_FINAL,SE_BAD_FIELD,SWL_SLEEP_WITH_LOCK_HELD,IS2_INCONSISTENT_SYNC"
 />
   </Match>
+  <Match>
+    <Bug pattern="NM_METHOD_NAMING_CONVENTION"/>
+    <Class name="org.apache.spark.ShuffleType"/>
+  </Match>
 </FindBugsFilter>

Reply via email to