This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e5cd616a improvement(spark): Involve shuffle result report time into 
shuffle write time metrics (#2361)
9e5cd616a is described below

commit 9e5cd616a51e06d9f4e99546253f692b4018fc09
Author: Junfan Zhang <zus...@apache.org>
AuthorDate: Fri Feb 7 17:02:24 2025 +0800

    improvement(spark): Involve shuffle result report time into shuffle write 
time metrics (#2361)
    
    ### What changes were proposed in this pull request?
    
    1. Include the shuffle result report time in the shuffle write time metrics
    2. Log the duration of every shuffle result report sent to the shuffle servers
    
    ### Why are the changes needed?
    
    To precisely calculate shuffle write time
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Not needed.
    
    Co-authored-by: Junfan Zhang <zhangjun...@qiyi.com>
---
 .../spark/shuffle/writer/RssShuffleWriter.java     |  7 ++++--
 .../spark/shuffle/writer/RssShuffleWriter.java     |  6 +++--
 .../client/impl/ShuffleWriteClientImpl.java        | 29 ++++++++++------------
 3 files changed, 22 insertions(+), 20 deletions(-)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index aa4ff9f89..1cd8113c0 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -492,11 +492,14 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             bitmapSplitNum,
             recordReportFailedShuffleservers,
             enableWriteFailureRetry);
+        long reportDuration = System.currentTimeMillis() - start;
         LOG.info(
-            "Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms",
+            "Reported all shuffle result for shuffleId[{}] task[{}] with 
bitmapNum[{}] cost {} ms",
+            shuffleId,
             taskAttemptId,
             bitmapSplitNum,
-            (System.currentTimeMillis() - start));
+            reportDuration);
+        
shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(reportDuration));
         MapStatus mapStatus = MapStatus$.MODULE$.apply(blockManagerId, 
partitionLengths);
         return Option.apply(mapStatus);
       } else {
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 48cbf3efa..09d88c1ca 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -818,12 +818,14 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             bitmapSplitNum,
             recordReportFailedShuffleservers,
             enableWriteFailureRetry);
+        long reportDuration = System.currentTimeMillis() - start;
         LOG.info(
-            "Report shuffle result for shuffleId[{}] task[{}] with 
bitmapNum[{}] cost {} ms",
+            "Reported all shuffle result for shuffleId[{}] task[{}] with 
bitmapNum[{}] cost {} ms",
             shuffleId,
             taskAttemptId,
             bitmapSplitNum,
-            (System.currentTimeMillis() - start));
+            reportDuration);
+        
shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(reportDuration));
         // todo: we can replace the dummy host and port with the real shuffle 
server which we prefer
         // to read
         final BlockManagerId blockManagerId =
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index dfd3efe30..3d3bb925d 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -756,27 +756,24 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
               bitmapNum);
       ShuffleServerInfo ssi = entry.getKey();
       try {
+        long start = System.currentTimeMillis();
         RssReportShuffleResultResponse response =
             getShuffleServerClient(ssi).reportShuffleResult(request);
         if (response.getStatusCode() == StatusCode.SUCCESS) {
           LOG.info(
-              "Report shuffle result to "
-                  + ssi
-                  + " for appId["
-                  + appId
-                  + "], shuffleId["
-                  + shuffleId
-                  + "] successfully");
+              "Reported shuffle result to {} for appId[{}], shuffleId[{}] 
successfully that cost {} ms",
+              ssi,
+              appId,
+              shuffleId,
+              System.currentTimeMillis() - start);
         } else {
-          LOG.warn(
-              "Report shuffle result to "
-                  + ssi
-                  + " for appId["
-                  + appId
-                  + "], shuffleId["
-                  + shuffleId
-                  + "] failed with "
-                  + response.getStatusCode());
+          LOG.info(
+              "Reported shuffle result to {} for appId[{}], shuffleId[{}] 
failed with [{}] that cost {} ms",
+              ssi,
+              appId,
+              shuffleId,
+              response.getStatusCode(),
+              System.currentTimeMillis() - start);
           recordFailedBlockIds(blockReportTracker, requestBlockIds);
           if (enableWriteFailureRetry) {
             // The failed Shuffle Server is recorded and corresponding 
exceptions are raised only

Reply via email to