This is an automated email from the ASF dual-hosted git repository. zuston pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push: new d5e689c32 [#2592] fix(spark): Ignore failure when reporting shuffle read metrics to driver (#2593) d5e689c32 is described below commit d5e689c32aaaa69465fe92d0c79a773321ffd1b9 Author: Junfan Zhang <zus...@apache.org> AuthorDate: Wed Aug 27 10:15:00 2025 +0800 [#2592] fix(spark): Ignore failure when reporting shuffle read metrics to driver (#2593) ### What changes were proposed in this pull request? Ignore failure when reporting shuffle read metrics to driver ### Why are the changes needed? fix #2592 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Needn't --- .../spark/shuffle/reader/RssShuffleReader.java | 52 ++++++++++++---------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java index b53d67f62..4113f0627 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java @@ -367,30 +367,34 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> { if (managerClientSupplier != null) { ShuffleManagerClient client = managerClientSupplier.get(); if (client != null) { - RssReportShuffleReadMetricResponse response = - client.reportShuffleReadMetric( - new RssReportShuffleReadMetricRequest( - context.stageId(), - shuffleId, - context.taskAttemptId(), - shuffleServerReadCostTracker.list().entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - x -> - new RssReportShuffleReadMetricRequest.TaskShuffleReadMetric( - x.getValue().getDurationMillis(), - x.getValue().getReadBytes(), - x.getValue().getMemoryReadDurationMillis(), - x.getValue().getMemoryReadBytes(), - x.getValue().getLocalfileReadDurationMillis(), - x.getValue().getLocalfileReadBytes(), - x.getValue().getHadoopReadLocalFileDurationMillis(), - x.getValue().getHadoopReadLocalFileBytes()))), - isShuffleReadFailed, - shuffleReadReason)); - if (response != null && response.getStatusCode() != StatusCode.SUCCESS) { - LOG.error("Errors on reporting shuffle read metrics to driver"); + try { + RssReportShuffleReadMetricResponse response = + client.reportShuffleReadMetric( + new RssReportShuffleReadMetricRequest( + context.stageId(), + shuffleId, + context.taskAttemptId(), + shuffleServerReadCostTracker.list().entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + x -> + new RssReportShuffleReadMetricRequest.TaskShuffleReadMetric( + x.getValue().getDurationMillis(), + x.getValue().getReadBytes(), + x.getValue().getMemoryReadDurationMillis(), + x.getValue().getMemoryReadBytes(), + x.getValue().getLocalfileReadDurationMillis(), + x.getValue().getLocalfileReadBytes(), + x.getValue().getHadoopReadLocalFileDurationMillis(), + x.getValue().getHadoopReadLocalFileBytes()))), + isShuffleReadFailed, + shuffleReadReason)); + if (response != null && response.getStatusCode() != StatusCode.SUCCESS) { + LOG.error("Errors on reporting shuffle read metrics to driver"); + } + } catch (Exception e) { + LOG.error("Errors on post shuffle read metric to driver", e); } } }