This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 42c5d9f79 [#2644] feat(spark): Involve shuffle failure into the event 
logs (#2645)
42c5d9f79 is described below

commit 42c5d9f79b3fc90b9bdd0a9058c0954dbc9d5510
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Oct 20 17:49:28 2025 +0800

    [#2644] feat(spark): Involve shuffle failure into the event logs (#2645)
    
    ### What changes were proposed in this pull request?
    
    Involve shuffle failure into the event logs
    
    ### Why are the changes needed?
    
    for #2644
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal job tests
---
 .../shuffle/events/TaskShuffleReadInfoEvent.java   |  9 +++++++-
 .../shuffle/events/TaskShuffleWriteInfoEvent.java  |  9 +++++++-
 .../shuffle/manager/ShuffleManagerGrpcService.java |  6 ++++--
 .../scala/org/apache/spark/UniffleListener.scala   | 20 +++++++++++++++++
 .../org/apache/spark/UniffleStatusStore.scala      | 25 ++++++++++++++++++++++
 .../scala/org/apache/spark/ui/ShufflePage.scala    | 14 +++++++++++-
 .../spark/shuffle/reader/RssShuffleReader.java     |  3 ++-
 .../spark/shuffle/writer/RssShuffleWriter.java     |  3 ++-
 .../request/RssReportShuffleReadMetricRequest.java |  6 +++++-
 .../RssReportShuffleWriteMetricRequest.java        |  6 +++++-
 proto/src/main/proto/Rss.proto                     |  2 ++
 11 files changed, 94 insertions(+), 9 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
index fb897ee72..5d03b9659 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
@@ -29,6 +29,7 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
   private boolean isShuffleReadFailed;
   private String failureReason;
   private ShuffleReadTimes shuffleReadTimes;
+  private long taskAttemptNumber;
 
   public TaskShuffleReadInfoEvent(
       int stageId,
@@ -37,7 +38,8 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
       Map<String, ShuffleReadMetric> metrics,
       boolean isShuffleReadFailed,
       String failureReason,
-      ShuffleReadTimes shuffleReadTimes) {
+      ShuffleReadTimes shuffleReadTimes,
+      long taskAttemptNumber) {
     this.stageId = stageId;
     this.shuffleId = shuffleId;
     this.taskId = taskId;
@@ -45,6 +47,7 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
     this.isShuffleReadFailed = isShuffleReadFailed;
     this.failureReason = failureReason;
     this.shuffleReadTimes = shuffleReadTimes;
+    this.taskAttemptNumber = taskAttemptNumber;
   }
 
   public int getStageId() {
@@ -74,4 +77,8 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
   public ShuffleReadTimes getShuffleReadTimes() {
     return shuffleReadTimes;
   }
+
+  public long getTaskAttemptNumber() {
+    return taskAttemptNumber;
+  }
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
index 77fff3eda..4b48cc342 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
@@ -28,6 +28,7 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
   private boolean isShuffleWriteFailed;
   private String failureReason;
   private long uncompressedByteSize;
+  private long taskAttemptNumber;
 
   public TaskShuffleWriteInfoEvent(
       int stageId,
@@ -37,7 +38,8 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
       ShuffleWriteTimes writeTimes,
       boolean isShuffleWriteFailed,
       String failureReason,
-      long uncompressedByteSize) {
+      long uncompressedByteSize,
+      long taskAttemptNumber) {
     this.stageId = stageId;
     this.shuffleId = shuffleId;
     this.taskId = taskId;
@@ -46,6 +48,7 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
     this.isShuffleWriteFailed = isShuffleWriteFailed;
     this.failureReason = failureReason;
     this.uncompressedByteSize = uncompressedByteSize;
+    this.taskAttemptNumber = taskAttemptNumber;
   }
 
   public int getStageId() {
@@ -79,4 +82,8 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
   public long getUncompressedByteSize() {
     return uncompressedByteSize;
   }
+
+  public long getTaskAttemptNumber() {
+    return taskAttemptNumber;
+  }
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 9a7b48eb4..6c14fe929 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -735,7 +735,8 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
             ShuffleWriteTimes.fromProto(request.getShuffleWriteTimes()),
             request.getIsTaskWriteFailed(),
             request.getShuffleWriteFailureReason(),
-            request.getUncompressedByteSize());
+            request.getUncompressedByteSize(),
+            request.getTaskAttemptNumber());
     RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
     RssProtos.ReportShuffleWriteMetricResponse reply =
         RssProtos.ReportShuffleWriteMetricResponse.newBuilder()
@@ -770,7 +771,8 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
                                 x.getValue().getHadoopByteSize()))),
             request.getIsTaskReadFailed(),
             request.getShuffleReadFailureReason(),
-            ShuffleReadTimes.fromProto(request.getShuffleReadTimes()));
+            ShuffleReadTimes.fromProto(request.getShuffleReadTimes()),
+            request.getTaskAttemptNumber());
     RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
     RssProtos.ReportShuffleReadMetricResponse reply =
         RssProtos.ReportShuffleReadMetricResponse.newBuilder()
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 94fd0cc81..92187912f 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -31,6 +31,9 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
 class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
   extends SparkListener with Logging {
 
+  private val writeTaskInfo = ShuffleTaskSummary(shuffleType = 
ShuffleType.WRITE)
+  private val readTaskInfo = ShuffleTaskSummary(shuffleType = ShuffleType.READ)
+
   private val aggregatedShuffleReadTimes = new ShuffleReadTimes()
   private val aggregatedShuffleWriteTimes = new ShuffleWriteTimes()
   private val aggregatedShuffleWriteMetric = new ConcurrentHashMap[String, 
AggregatedShuffleWriteMetric]
@@ -70,6 +73,8 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
       kvstore.write(
         AggregatedShuffleReadTimesUIData(aggregatedShuffleReadTimes)
       )
+      kvstore.write(writeTaskInfo)
+      kvstore.write(readTaskInfo)
     }
   }
 
@@ -122,6 +127,14 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
     }
     aggregatedShuffleWriteTimes.inc(event.getWriteTimes)
     totalUncompressedShuffleBytes.addAndGet(event.getUncompressedByteSize)
+
+    if (event.isShuffleWriteFailed) {
+      writeTaskInfo.failedTaskNumber += 1
+      if (event.getTaskAttemptNumber > 
writeTaskInfo.failedTaskMaxAttemptNumber) {
+        writeTaskInfo.failedTaskMaxAttemptNumber = event.getTaskAttemptNumber
+      }
+      writeTaskInfo.failureReasons.add(event.getFailureReason)
+    }
   }
 
   private def onTaskShuffleReadInfo(event: TaskShuffleReadInfoEvent): Unit = {
@@ -143,6 +156,13 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
       agg_metric.hadoopDurationMillis += rmetric.getHadoopDurationMillis
     }
     aggregatedShuffleReadTimes.merge(event.getShuffleReadTimes)
+
+    if (event.isShuffleReadFailed) { // summarize failed reads only, like the write path
+      readTaskInfo.failedTaskNumber += 1
+      readTaskInfo.failedTaskMaxAttemptNumber = // keep the highest failed attempt seen so far
+        math.max(readTaskInfo.failedTaskMaxAttemptNumber, event.getTaskAttemptNumber)
+      readTaskInfo.failureReasons.add(event.getFailureReason)
+    }
   }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index 098013e05..e95758297 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -26,12 +26,22 @@ import org.apache.uniffle.common.ShuffleReadTimes
 
 import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters.asScalaIteratorConverter
+import scala.collection.mutable
 
 class UniffleStatusStore(store: KVStore) {
   private def viewToSeq[T](view: KVStoreView[T]): Seq[T] = {
     Utils.tryWithResource(view.closeableIterator())(iter => 
iter.asScala.toList)
   }
 
+  def shuffleTaskSummary(shuffleType: ShuffleType): ShuffleTaskSummary = {
+    val kClass = classOf[ShuffleTaskSummary]
+    try {
+      store.read(kClass, s"${kClass.getName}_$shuffleType")
+    } catch {
+      case _: NoSuchElementException => new ShuffleTaskSummary(shuffleType = 
shuffleType)
+    }
+  }
+
   def uniffleProperties(): UniffleProperties = {
     val kClass = classOf[UniffleProperties]
     try {
@@ -184,4 +194,19 @@ case class ReassignInfoUIData(event: 
TaskReassignInfoEvent) {
   @JsonIgnore
   @KVIndex
   def id: String = classOf[ReassignInfoUIData].getName()
+}
+
+sealed abstract class ShuffleType private () // closed set: only READ and WRITE below exist
+object ShuffleType { // toString feeds the KVStore key of ShuffleTaskSummary, so it must be a fixed name
+  val READ: ShuffleType = new ShuffleType { override def toString: String = "READ" }
+  val WRITE: ShuffleType = new ShuffleType { override def toString: String = "WRITE" }
+}
+
+case class ShuffleTaskSummary(shuffleType: ShuffleType,
+                              var failureReasons: mutable.HashSet[String] = new mutable.HashSet[String](),
+                              var failedTaskNumber: Long = 0, // counter: the first failure must yield 1
+                              var failedTaskMaxAttemptNumber: Long = -1) { // -1 = no failed attempt yet
+  @JsonIgnore
+  @KVIndex
+  def id: String = s"${classOf[ShuffleTaskSummary].getName}_${shuffleType.toString}"
 }
\ No newline at end of file
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 46359e1e6..4667775f6 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui
 import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.events.ShuffleWriteTimes
 import org.apache.spark.util.Utils
-import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric, 
AggregatedShuffleWriteMetric, AggregatedTaskInfoUIData}
+import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric, 
AggregatedShuffleWriteMetric, AggregatedTaskInfoUIData, ShuffleType}
 
 import java.util.concurrent.ConcurrentHashMap
 import javax.servlet.http.HttpServletRequest
@@ -139,6 +139,10 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
     // reassign info
     val reassignInfo = runtimeStatusStore.reassignInfo().event
 
+    // task failure summary
+    val writeSummary = runtimeStatusStore.shuffleTaskSummary(shuffleType = 
ShuffleType.WRITE)
+    val readSummary = runtimeStatusStore.shuffleTaskSummary(shuffleType = 
ShuffleType.READ)
+
     // render build info
     val buildInfo = runtimeStatusStore.buildInfo()
     val buildInfoTableUI = UIUtils.listingTable(
@@ -344,6 +348,14 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
               </a>
               
partitionSplit={reassignInfo.isReassignTriggeredOnPartitionSplit}, 
blockSentFailure={reassignInfo.isReassignTriggeredOnBlockSendFailure}, 
stageRetry={reassignInfo.isReassignTriggeredOnStageRetry}
             </li>
+
+            <li>
+              <a>
+                <strong>Shuffle Task Failure Summary (failure 
write/read):</strong>
+              </a>
+              {writeSummary.failedTaskNumber > 0} / 
{readSummary.failedTaskNumber > 0}
+            </li>
+
           </ul>
         </div>
 
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 93584e124..b0a25efd8 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -414,7 +414,8 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
                                           
x.getValue().getHadoopReadLocalFileBytes()))),
                       isShuffleReadFailed,
                       shuffleReadReason,
-                      shuffleReadTimes));
+                      shuffleReadTimes,
+                      context.attemptNumber()));
           if (response != null && response.getStatusCode() != 
StatusCode.SUCCESS) {
             LOG.error("Errors on reporting shuffle read metrics to driver");
           }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index f1d910dc5..e6c48158e 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -980,7 +980,8 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
                         writeTimes,
                         isShuffleWriteFailed,
                         shuffleWriteFailureReason,
-                        bufferManager.getUncompressedDataLen()));
+                        bufferManager.getUncompressedDataLen(),
+                        taskContext.attemptNumber()));
             if (response.getStatusCode() != StatusCode.SUCCESS) {
               LOG.error(
                   "Errors on reporting shuffle write metrics to driver. 
status_code: {}",
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
index e88ea097d..e1fcb78c7 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
@@ -32,6 +32,7 @@ public class RssReportShuffleReadMetricRequest {
   private boolean isShuffleReadFailed;
   private Optional<String> shuffleReadReason;
   private ShuffleReadTimes shuffleReadTimes;
+  private long taskAttemptNumber;
 
   public RssReportShuffleReadMetricRequest(
       int stageId,
@@ -40,7 +41,8 @@ public class RssReportShuffleReadMetricRequest {
       Map<String, TaskShuffleReadMetric> metrics,
       boolean isShuffleReadFailed,
       Optional<String> shuffleReadReason,
-      ShuffleReadTimes shuffleReadTimes) {
+      ShuffleReadTimes shuffleReadTimes,
+      long taskAttemptNumber) {
     this.stageId = stageId;
     this.shuffleId = shuffleId;
     this.taskId = taskId;
@@ -48,6 +50,7 @@ public class RssReportShuffleReadMetricRequest {
     this.isShuffleReadFailed = isShuffleReadFailed;
     this.shuffleReadReason = shuffleReadReason;
     this.shuffleReadTimes = shuffleReadTimes;
+    this.taskAttemptNumber = taskAttemptNumber;
   }
 
   public RssProtos.ReportShuffleReadMetricRequest toProto() {
@@ -61,6 +64,7 @@ public class RssReportShuffleReadMetricRequest {
         .setIsTaskReadFailed(request.isShuffleReadFailed)
         .setShuffleReadFailureReason(request.shuffleReadReason.orElse(""))
         .setShuffleReadTimes(shuffleReadTimes.toProto())
+        .setTaskAttemptNumber(taskAttemptNumber)
         .putAllMetrics(
             request.metrics.entrySet().stream()
                 .collect(
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
index 5c3092ca1..83c0797f1 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
@@ -27,6 +27,7 @@ public class RssReportShuffleWriteMetricRequest {
   private int stageId;
   private int shuffleId;
   private long taskId;
+  private long taskAttemptNumber;
   private Map<String, TaskShuffleWriteMetric> metrics;
   private TaskShuffleWriteTimes writeTimes;
 
@@ -43,7 +44,8 @@ public class RssReportShuffleWriteMetricRequest {
       TaskShuffleWriteTimes writeTimes,
       boolean isShuffleWriteFailed,
       Optional<String> shuffleWriteFailureReason,
-      long uncompressedByteSize) {
+      long uncompressedByteSize,
+      long taskAttemptNumber) {
     this.stageId = stageId;
     this.shuffleId = shuffleId;
     this.taskId = taskId;
@@ -52,6 +54,7 @@ public class RssReportShuffleWriteMetricRequest {
     this.isShuffleWriteFailed = isShuffleWriteFailed;
     this.shuffleWriteFailureReason = shuffleWriteFailureReason;
     this.uncompressedByteSize = uncompressedByteSize;
+    this.taskAttemptNumber = taskAttemptNumber;
   }
 
   public RssProtos.ReportShuffleWriteMetricRequest toProto() {
@@ -66,6 +69,7 @@ public class RssReportShuffleWriteMetricRequest {
         .setIsTaskWriteFailed(isShuffleWriteFailed)
         .setShuffleWriteFailureReason(shuffleWriteFailureReason.orElse(""))
         .setUncompressedByteSize(request.uncompressedByteSize)
+        .setTaskAttemptNumber(taskAttemptNumber)
         .putAllMetrics(
             request.metrics.entrySet().stream()
                 .collect(
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 0da250345..fa869ac48 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -600,6 +600,7 @@ message ReportShuffleWriteMetricRequest {
   bool isTaskWriteFailed = 6;
   string shuffleWriteFailureReason = 7;
   int64 uncompressedByteSize = 8;
+  int64 taskAttemptNumber = 9;
 }
 
 message ShuffleWriteTimes {
@@ -648,6 +649,7 @@ message ReportShuffleReadMetricRequest {
   bool isTaskReadFailed = 5;
   string shuffleReadFailureReason = 6;
   ShuffleReadTimes shuffleReadTimes = 7;
+  int64 taskAttemptNumber = 8;
 }
 
 message ShuffleReadTimes {

Reply via email to