This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1bd746886 [#2592] fix(spark): Skip failure when reporting shuffle
write metrics to driver (#2629)
1bd746886 is described below
commit 1bd7468863d1c75424c9b31f77ba9b8166933afd
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Sep 25 10:30:40 2025 +0800
[#2592] fix(spark): Skip failure when reporting shuffle write metrics to
driver (#2629)
### What changes were proposed in this pull request?
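Skip failures when reporting shuffle write metrics to the driver. Any exception thrown while reporting is now caught and logged instead of being propagated from the writer. The error log for a non-SUCCESS response now also includes the returned status code.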
### Why are the changes needed?
Follow-up to the PR for #2592.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
Co-authored-by: Junfan Zhang <[email protected]>
---
.../spark/shuffle/writer/RssShuffleWriter.java | 32 +++++++++++++---------
1 file changed, 19 insertions(+), 13 deletions(-)
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 122f9548c..f1d910dc5 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -969,19 +969,25 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
bufferManager.getSortTime(),
bufferManager.getRequireMemoryTime(),
checkSendResultMills);
- RssReportShuffleWriteMetricResponse response =
- shuffleManagerClient.reportShuffleWriteMetric(
- new RssReportShuffleWriteMetricRequest(
- taskContext.stageId(),
- shuffleId,
- taskContext.taskAttemptId(),
-
bufferManager.getShuffleServerPushCostTracker().toMetric(),
- writeTimes,
- isShuffleWriteFailed,
- shuffleWriteFailureReason,
- bufferManager.getUncompressedDataLen()));
- if (response.getStatusCode() != StatusCode.SUCCESS) {
- LOG.error("Errors on reporting shuffle write metrics to driver");
+ try {
+ RssReportShuffleWriteMetricResponse response =
+ shuffleManagerClient.reportShuffleWriteMetric(
+ new RssReportShuffleWriteMetricRequest(
+ taskContext.stageId(),
+ shuffleId,
+ taskContext.taskAttemptId(),
+
bufferManager.getShuffleServerPushCostTracker().toMetric(),
+ writeTimes,
+ isShuffleWriteFailed,
+ shuffleWriteFailureReason,
+ bufferManager.getUncompressedDataLen()));
+ if (response.getStatusCode() != StatusCode.SUCCESS) {
+ LOG.error(
+ "Errors on reporting shuffle write metrics to driver.
status_code: {}",
+ response.getStatusCode());
+ }
+ } catch (Exception e) {
+ LOG.error("Errors on reporting shuffle write metrics to driver",
e);
}
}
}