This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new d5e689c32 [#2592] fix(spark): Ignore failure when reporting shuffle read metrics to driver (#2593)
d5e689c32 is described below

commit d5e689c32aaaa69465fe92d0c79a773321ffd1b9
Author: Junfan Zhang <zus...@apache.org>
AuthorDate: Wed Aug 27 10:15:00 2025 +0800

    [#2592] fix(spark): Ignore failure when reporting shuffle read metrics to driver (#2593)
    
    ### What changes were proposed in this pull request?
    
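    Ignore failures when reporting shuffle read metrics to the driver. The report call in RssShuffleReader is now wrapped in a try/catch, so an exception thrown while reporting is logged instead of propagating.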
    
    ### Why are the changes needed?
    
    fix #2592
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Not needed.
---
 .../spark/shuffle/reader/RssShuffleReader.java     | 52 ++++++++++++----------
 1 file changed, 28 insertions(+), 24 deletions(-)

diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index b53d67f62..4113f0627 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -367,30 +367,34 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
     if (managerClientSupplier != null) {
       ShuffleManagerClient client = managerClientSupplier.get();
       if (client != null) {
-        RssReportShuffleReadMetricResponse response =
-            client.reportShuffleReadMetric(
-                new RssReportShuffleReadMetricRequest(
-                    context.stageId(),
-                    shuffleId,
-                    context.taskAttemptId(),
-                    shuffleServerReadCostTracker.list().entrySet().stream()
-                        .collect(
-                            Collectors.toMap(
-                                Map.Entry::getKey,
-                                x ->
-                                    new 
RssReportShuffleReadMetricRequest.TaskShuffleReadMetric(
-                                        x.getValue().getDurationMillis(),
-                                        x.getValue().getReadBytes(),
-                                        
x.getValue().getMemoryReadDurationMillis(),
-                                        x.getValue().getMemoryReadBytes(),
-                                        
x.getValue().getLocalfileReadDurationMillis(),
-                                        x.getValue().getLocalfileReadBytes(),
-                                        
x.getValue().getHadoopReadLocalFileDurationMillis(),
-                                        
x.getValue().getHadoopReadLocalFileBytes()))),
-                    isShuffleReadFailed,
-                    shuffleReadReason));
-        if (response != null && response.getStatusCode() != 
StatusCode.SUCCESS) {
-          LOG.error("Errors on reporting shuffle read metrics to driver");
+        try {
+          RssReportShuffleReadMetricResponse response =
+              client.reportShuffleReadMetric(
+                  new RssReportShuffleReadMetricRequest(
+                      context.stageId(),
+                      shuffleId,
+                      context.taskAttemptId(),
+                      shuffleServerReadCostTracker.list().entrySet().stream()
+                          .collect(
+                              Collectors.toMap(
+                                  Map.Entry::getKey,
+                                  x ->
+                                      new 
RssReportShuffleReadMetricRequest.TaskShuffleReadMetric(
+                                          x.getValue().getDurationMillis(),
+                                          x.getValue().getReadBytes(),
+                                          
x.getValue().getMemoryReadDurationMillis(),
+                                          x.getValue().getMemoryReadBytes(),
+                                          
x.getValue().getLocalfileReadDurationMillis(),
+                                          x.getValue().getLocalfileReadBytes(),
+                                          
x.getValue().getHadoopReadLocalFileDurationMillis(),
+                                          
x.getValue().getHadoopReadLocalFileBytes()))),
+                      isShuffleReadFailed,
+                      shuffleReadReason));
+          if (response != null && response.getStatusCode() != 
StatusCode.SUCCESS) {
+            LOG.error("Errors on reporting shuffle read metrics to driver");
+          }
+        } catch (Exception e) {
+          LOG.error("Errors on post shuffle read metric to driver", e);
         }
       }
     }

Reply via email to