This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 42c5d9f79 [#2644] feat(spark): Involve shuffle failure into the event logs (#2645)
42c5d9f79 is described below
commit 42c5d9f79b3fc90b9bdd0a9058c0954dbc9d5510
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Oct 20 17:49:28 2025 +0800
[#2644] feat(spark): Involve shuffle failure into the event logs (#2645)
### What changes were proposed in this pull request?
Involve shuffle failure into the event logs
### Why are the changes needed?
for #2644
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal job tests
---
.../shuffle/events/TaskShuffleReadInfoEvent.java | 9 +++++++-
.../shuffle/events/TaskShuffleWriteInfoEvent.java | 9 +++++++-
.../shuffle/manager/ShuffleManagerGrpcService.java | 6 ++++--
.../scala/org/apache/spark/UniffleListener.scala | 20 +++++++++++++++++
.../org/apache/spark/UniffleStatusStore.scala | 25 ++++++++++++++++++++++
.../scala/org/apache/spark/ui/ShufflePage.scala | 14 +++++++++++-
.../spark/shuffle/reader/RssShuffleReader.java | 3 ++-
.../spark/shuffle/writer/RssShuffleWriter.java | 3 ++-
.../request/RssReportShuffleReadMetricRequest.java | 6 +++++-
.../RssReportShuffleWriteMetricRequest.java | 6 +++++-
proto/src/main/proto/Rss.proto | 2 ++
11 files changed, 94 insertions(+), 9 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
index fb897ee72..5d03b9659 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
@@ -29,6 +29,7 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
private boolean isShuffleReadFailed;
private String failureReason;
private ShuffleReadTimes shuffleReadTimes;
+ private long taskAttemptNumber;
public TaskShuffleReadInfoEvent(
int stageId,
@@ -37,7 +38,8 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
Map<String, ShuffleReadMetric> metrics,
boolean isShuffleReadFailed,
String failureReason,
- ShuffleReadTimes shuffleReadTimes) {
+ ShuffleReadTimes shuffleReadTimes,
+ long taskAttemptNumber) {
this.stageId = stageId;
this.shuffleId = shuffleId;
this.taskId = taskId;
@@ -45,6 +47,7 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
this.isShuffleReadFailed = isShuffleReadFailed;
this.failureReason = failureReason;
this.shuffleReadTimes = shuffleReadTimes;
+ this.taskAttemptNumber = taskAttemptNumber;
}
public int getStageId() {
@@ -74,4 +77,8 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
public ShuffleReadTimes getShuffleReadTimes() {
return shuffleReadTimes;
}
+
+ public long getTaskAttemptNumber() {
+ return taskAttemptNumber;
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
index 77fff3eda..4b48cc342 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
@@ -28,6 +28,7 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
private boolean isShuffleWriteFailed;
private String failureReason;
private long uncompressedByteSize;
+ private long taskAttemptNumber;
public TaskShuffleWriteInfoEvent(
int stageId,
@@ -37,7 +38,8 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
ShuffleWriteTimes writeTimes,
boolean isShuffleWriteFailed,
String failureReason,
- long uncompressedByteSize) {
+ long uncompressedByteSize,
+ long taskAttemptNumber) {
this.stageId = stageId;
this.shuffleId = shuffleId;
this.taskId = taskId;
@@ -46,6 +48,7 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
this.isShuffleWriteFailed = isShuffleWriteFailed;
this.failureReason = failureReason;
this.uncompressedByteSize = uncompressedByteSize;
+ this.taskAttemptNumber = taskAttemptNumber;
}
public int getStageId() {
@@ -79,4 +82,8 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
public long getUncompressedByteSize() {
return uncompressedByteSize;
}
+
+ public long getTaskAttemptNumber() {
+ return taskAttemptNumber;
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 9a7b48eb4..6c14fe929 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -735,7 +735,8 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
ShuffleWriteTimes.fromProto(request.getShuffleWriteTimes()),
request.getIsTaskWriteFailed(),
request.getShuffleWriteFailureReason(),
- request.getUncompressedByteSize());
+ request.getUncompressedByteSize(),
+ request.getTaskAttemptNumber());
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
RssProtos.ReportShuffleWriteMetricResponse reply =
RssProtos.ReportShuffleWriteMetricResponse.newBuilder()
@@ -770,7 +771,8 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
x.getValue().getHadoopByteSize()))),
request.getIsTaskReadFailed(),
request.getShuffleReadFailureReason(),
- ShuffleReadTimes.fromProto(request.getShuffleReadTimes()));
+ ShuffleReadTimes.fromProto(request.getShuffleReadTimes()),
+ request.getTaskAttemptNumber());
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
RssProtos.ReportShuffleReadMetricResponse reply =
RssProtos.ReportShuffleReadMetricResponse.newBuilder()
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 94fd0cc81..92187912f 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -31,6 +31,9 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
extends SparkListener with Logging {
+ private val writeTaskInfo = ShuffleTaskSummary(shuffleType =
ShuffleType.WRITE)
+ private val readTaskInfo = ShuffleTaskSummary(shuffleType = ShuffleType.READ)
+
private val aggregatedShuffleReadTimes = new ShuffleReadTimes()
private val aggregatedShuffleWriteTimes = new ShuffleWriteTimes()
private val aggregatedShuffleWriteMetric = new ConcurrentHashMap[String,
AggregatedShuffleWriteMetric]
@@ -70,6 +73,8 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
kvstore.write(
AggregatedShuffleReadTimesUIData(aggregatedShuffleReadTimes)
)
+ kvstore.write(writeTaskInfo)
+ kvstore.write(readTaskInfo)
}
}
@@ -122,6 +127,14 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
}
aggregatedShuffleWriteTimes.inc(event.getWriteTimes)
totalUncompressedShuffleBytes.addAndGet(event.getUncompressedByteSize)
+
+ if (event.isShuffleWriteFailed) {
+ writeTaskInfo.failedTaskNumber += 1
+ if (event.getTaskAttemptNumber >
writeTaskInfo.failedTaskMaxAttemptNumber) {
+ writeTaskInfo.failedTaskMaxAttemptNumber = event.getTaskAttemptNumber
+ }
+ writeTaskInfo.failureReasons.add(event.getFailureReason)
+ }
}
private def onTaskShuffleReadInfo(event: TaskShuffleReadInfoEvent): Unit = {
@@ -143,6 +156,14 @@ class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
       agg_metric.hadoopDurationMillis += rmetric.getHadoopDurationMillis
     }
     aggregatedShuffleReadTimes.merge(event.getShuffleReadTimes)
+
+    if (event.isShuffleReadFailed) {
+      readTaskInfo.failedTaskNumber += 1
+      if (event.getTaskAttemptNumber > readTaskInfo.failedTaskMaxAttemptNumber) {
+        readTaskInfo.failedTaskMaxAttemptNumber = event.getTaskAttemptNumber
+      }
+      readTaskInfo.failureReasons.add(event.getFailureReason)
+    }
}
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index 098013e05..e95758297 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -26,12 +26,22 @@ import org.apache.uniffle.common.ShuffleReadTimes
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters.asScalaIteratorConverter
+import scala.collection.mutable
class UniffleStatusStore(store: KVStore) {
private def viewToSeq[T](view: KVStoreView[T]): Seq[T] = {
Utils.tryWithResource(view.closeableIterator())(iter =>
iter.asScala.toList)
}
+ def shuffleTaskSummary(shuffleType: ShuffleType): ShuffleTaskSummary = {
+ val kClass = classOf[ShuffleTaskSummary]
+ try {
+ store.read(kClass, s"${kClass.getName}_$shuffleType")
+ } catch {
+ case _: NoSuchElementException => new ShuffleTaskSummary(shuffleType =
shuffleType)
+ }
+ }
+
def uniffleProperties(): UniffleProperties = {
val kClass = classOf[UniffleProperties]
try {
@@ -184,4 +194,19 @@ case class ReassignInfoUIData(event:
TaskReassignInfoEvent) {
@JsonIgnore
@KVIndex
def id: String = classOf[ReassignInfoUIData].getName()
+}
+
+sealed trait ShuffleType
+object ShuffleType {
+  // case objects give a stable toString ("READ"/"WRITE"), so KVStore ids
+  // derived from it are deterministic across JVMs and event-log replays,
+  // unlike anonymous-subclass instances whose toString embeds an identity hash.
+  case object READ extends ShuffleType
+  case object WRITE extends ShuffleType
+}
+
+case class ShuffleTaskSummary(shuffleType: ShuffleType,
+    var failureReasons: mutable.HashSet[String] = new mutable.HashSet[String](),
+    var failedTaskNumber: Long = 0,
+    var failedTaskMaxAttemptNumber: Long = -1) {
+  @JsonIgnore
+  @KVIndex
+  def id: String = s"${classOf[ShuffleTaskSummary].getName}_${shuffleType.toString}"
+}
\ No newline at end of file
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 46359e1e6..4667775f6 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.events.ShuffleWriteTimes
import org.apache.spark.util.Utils
-import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric,
AggregatedShuffleWriteMetric, AggregatedTaskInfoUIData}
+import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric,
AggregatedShuffleWriteMetric, AggregatedTaskInfoUIData, ShuffleType}
import java.util.concurrent.ConcurrentHashMap
import javax.servlet.http.HttpServletRequest
@@ -139,6 +139,10 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
// reassign info
val reassignInfo = runtimeStatusStore.reassignInfo().event
+ // task failure summary
+ val writeSummary = runtimeStatusStore.shuffleTaskSummary(shuffleType =
ShuffleType.WRITE)
+ val readSummary = runtimeStatusStore.shuffleTaskSummary(shuffleType =
ShuffleType.READ)
+
// render build info
val buildInfo = runtimeStatusStore.buildInfo()
val buildInfoTableUI = UIUtils.listingTable(
@@ -344,6 +348,14 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
</a>
partitionSplit={reassignInfo.isReassignTriggeredOnPartitionSplit},
blockSentFailure={reassignInfo.isReassignTriggeredOnBlockSendFailure},
stageRetry={reassignInfo.isReassignTriggeredOnStageRetry}
</li>
+
+              <li>
+                <a>
+                  <strong>Shuffle Task Failure Summary (failed write/read tasks):</strong>
+                </a>
+                {writeSummary.failedTaskNumber} / {readSummary.failedTaskNumber}
+              </li>
+
</ul>
</div>
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 93584e124..b0a25efd8 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -414,7 +414,8 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
x.getValue().getHadoopReadLocalFileBytes()))),
isShuffleReadFailed,
shuffleReadReason,
- shuffleReadTimes));
+ shuffleReadTimes,
+ context.attemptNumber()));
if (response != null && response.getStatusCode() !=
StatusCode.SUCCESS) {
LOG.error("Errors on reporting shuffle read metrics to driver");
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index f1d910dc5..e6c48158e 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -980,7 +980,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
writeTimes,
isShuffleWriteFailed,
shuffleWriteFailureReason,
- bufferManager.getUncompressedDataLen()));
+ bufferManager.getUncompressedDataLen(),
+ taskContext.attemptNumber()));
if (response.getStatusCode() != StatusCode.SUCCESS) {
LOG.error(
"Errors on reporting shuffle write metrics to driver.
status_code: {}",
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
index e88ea097d..e1fcb78c7 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
@@ -32,6 +32,7 @@ public class RssReportShuffleReadMetricRequest {
private boolean isShuffleReadFailed;
private Optional<String> shuffleReadReason;
private ShuffleReadTimes shuffleReadTimes;
+ private long taskAttemptNumber;
public RssReportShuffleReadMetricRequest(
int stageId,
@@ -40,7 +41,8 @@ public class RssReportShuffleReadMetricRequest {
Map<String, TaskShuffleReadMetric> metrics,
boolean isShuffleReadFailed,
Optional<String> shuffleReadReason,
- ShuffleReadTimes shuffleReadTimes) {
+ ShuffleReadTimes shuffleReadTimes,
+ long taskAttemptNumber) {
this.stageId = stageId;
this.shuffleId = shuffleId;
this.taskId = taskId;
@@ -48,6 +50,7 @@ public class RssReportShuffleReadMetricRequest {
this.isShuffleReadFailed = isShuffleReadFailed;
this.shuffleReadReason = shuffleReadReason;
this.shuffleReadTimes = shuffleReadTimes;
+ this.taskAttemptNumber = taskAttemptNumber;
}
public RssProtos.ReportShuffleReadMetricRequest toProto() {
@@ -61,6 +64,7 @@ public class RssReportShuffleReadMetricRequest {
.setIsTaskReadFailed(request.isShuffleReadFailed)
.setShuffleReadFailureReason(request.shuffleReadReason.orElse(""))
.setShuffleReadTimes(shuffleReadTimes.toProto())
+ .setTaskAttemptNumber(taskAttemptNumber)
.putAllMetrics(
request.metrics.entrySet().stream()
.collect(
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
index 5c3092ca1..83c0797f1 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
@@ -27,6 +27,7 @@ public class RssReportShuffleWriteMetricRequest {
private int stageId;
private int shuffleId;
private long taskId;
+ private long taskAttemptNumber;
private Map<String, TaskShuffleWriteMetric> metrics;
private TaskShuffleWriteTimes writeTimes;
@@ -43,7 +44,8 @@ public class RssReportShuffleWriteMetricRequest {
TaskShuffleWriteTimes writeTimes,
boolean isShuffleWriteFailed,
Optional<String> shuffleWriteFailureReason,
- long uncompressedByteSize) {
+ long uncompressedByteSize,
+ long taskAttemptNumber) {
this.stageId = stageId;
this.shuffleId = shuffleId;
this.taskId = taskId;
@@ -52,6 +54,7 @@ public class RssReportShuffleWriteMetricRequest {
this.isShuffleWriteFailed = isShuffleWriteFailed;
this.shuffleWriteFailureReason = shuffleWriteFailureReason;
this.uncompressedByteSize = uncompressedByteSize;
+ this.taskAttemptNumber = taskAttemptNumber;
}
public RssProtos.ReportShuffleWriteMetricRequest toProto() {
@@ -66,6 +69,7 @@ public class RssReportShuffleWriteMetricRequest {
.setIsTaskWriteFailed(isShuffleWriteFailed)
.setShuffleWriteFailureReason(shuffleWriteFailureReason.orElse(""))
.setUncompressedByteSize(request.uncompressedByteSize)
+ .setTaskAttemptNumber(taskAttemptNumber)
.putAllMetrics(
request.metrics.entrySet().stream()
.collect(
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 0da250345..fa869ac48 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -600,6 +600,7 @@ message ReportShuffleWriteMetricRequest {
bool isTaskWriteFailed = 6;
string shuffleWriteFailureReason = 7;
int64 uncompressedByteSize = 8;
+ int64 taskAttemptNumber = 9;
}
message ShuffleWriteTimes {
@@ -648,6 +649,7 @@ message ReportShuffleReadMetricRequest {
bool isTaskReadFailed = 5;
string shuffleReadFailureReason = 6;
ShuffleReadTimes shuffleReadTimes = 7;
+ int64 taskAttemptNumber = 8;
}
message ShuffleReadTimes {