This is an automated email from the ASF dual-hosted git repository. zuston pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 9e5cd616a improvement(spark): Involve shuffle result report time into shuffle write time metrics (#2361) 9e5cd616a is described below commit 9e5cd616a51e06d9f4e99546253f692b4018fc09 Author: Junfan Zhang <zus...@apache.org> AuthorDate: Fri Feb 7 17:02:24 2025 +0800 improvement(spark): Involve shuffle result report time into shuffle write time metrics (#2361) ### What changes were proposed in this pull request? 1. Include the shuffle result report time in the shuffle write time metrics 2. Log the duration of each report to the shuffle server ### Why are the changes needed? To calculate shuffle write time precisely ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No tests needed. Co-authored-by: Junfan Zhang <zhangjun...@qiyi.com> --- .../spark/shuffle/writer/RssShuffleWriter.java | 7 ++++-- .../spark/shuffle/writer/RssShuffleWriter.java | 6 +++-- .../client/impl/ShuffleWriteClientImpl.java | 29 ++++++++++------------ 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java index aa4ff9f89..1cd8113c0 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java @@ -492,11 +492,14 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> { bitmapSplitNum, recordReportFailedShuffleservers, enableWriteFailureRetry); + long reportDuration = System.currentTimeMillis() - start; LOG.info( - "Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms", + "Reported all shuffle result for shuffleId[{}] task[{}] with bitmapNum[{}] cost {} ms", + shuffleId, taskAttemptId, bitmapSplitNum, - (System.currentTimeMillis() - start)); + reportDuration); + 
shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(reportDuration)); MapStatus mapStatus = MapStatus$.MODULE$.apply(blockManagerId, partitionLengths); return Option.apply(mapStatus); } else { diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java index 48cbf3efa..09d88c1ca 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java @@ -818,12 +818,14 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> { bitmapSplitNum, recordReportFailedShuffleservers, enableWriteFailureRetry); + long reportDuration = System.currentTimeMillis() - start; LOG.info( - "Report shuffle result for shuffleId[{}] task[{}] with bitmapNum[{}] cost {} ms", + "Reported all shuffle result for shuffleId[{}] task[{}] with bitmapNum[{}] cost {} ms", shuffleId, taskAttemptId, bitmapSplitNum, - (System.currentTimeMillis() - start)); + reportDuration); + shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(reportDuration)); // todo: we can replace the dummy host and port with the real shuffle server which we prefer // to read final BlockManagerId blockManagerId = diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index dfd3efe30..3d3bb925d 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -756,27 +756,24 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient { bitmapNum); ShuffleServerInfo ssi = entry.getKey(); try { + long start = System.currentTimeMillis(); RssReportShuffleResultResponse response = 
getShuffleServerClient(ssi).reportShuffleResult(request); if (response.getStatusCode() == StatusCode.SUCCESS) { LOG.info( - "Report shuffle result to " - + ssi - + " for appId[" - + appId - + "], shuffleId[" - + shuffleId - + "] successfully"); + "Reported shuffle result to {} for appId[{}], shuffleId[{}] successfully that cost {} ms", + ssi, + appId, + shuffleId, + System.currentTimeMillis() - start); } else { - LOG.warn( - "Report shuffle result to " - + ssi - + " for appId[" - + appId - + "], shuffleId[" - + shuffleId - + "] failed with " - + response.getStatusCode()); + LOG.info( + "Reported shuffle result to {} for appId[{}], shuffleId[{}] failed with [{}] that cost {} ms", + ssi, + appId, + shuffleId, + response.getStatusCode(), + System.currentTimeMillis() - start); recordFailedBlockIds(blockReportTracker, requestBlockIds); if (enableWriteFailureRetry) { // The failed Shuffle Server is recorded and corresponding exceptions are raised only