This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 717427553 [CELEBORN-1914] incWriteTime when ShuffleWriter invoke 
pushGiantRecord
717427553 is described below

commit 71742755337fa0407e4e789d66b2c36b80019d1e
Author: TheodoreLx <[email protected] >
AuthorDate: Tue Mar 18 15:38:36 2025 +0800

    [CELEBORN-1914] incWriteTime when ShuffleWriter invoke pushGiantRecord
    
    ### What changes were proposed in this pull request?
    
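    Call `writeMetrics.incWriteTime` when the ShuffleWriter invokes `pushGiantRecord`, so that the time spent pushing a giant record is counted in the shuffle write time.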
    
    ### Why are the changes needed?
    When the ShuffleWriter calls pushGiantRecord, the task thread must wait
    synchronously for the push to complete, just as it does when pushing the
    last part of the data in the close method. The time spent in
    pushGiantRecord therefore needs to be included in writeTime.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
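    Unit tests: CelebornShuffleWriterSuiteBase now also asserts that the shuffle write metrics report a writeTime greater than zero.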
    
    Closes #3155 from TheodoreLx/fix-write-time.
    
    Authored-by: TheodoreLx <[email protected] >
    Signed-off-by: mingji <[email protected]>
---
 .../java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | 3 +++
 .../java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | 3 +++
 .../apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java  | 1 +
 3 files changed, 7 insertions(+)

diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 4c5e6739b..0202f2f5f 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -277,6 +277,7 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
 
   protected void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) 
throws IOException {
     logger.debug("Push giant record, size {}.", numBytes);
+    long start = System.nanoTime();
     int bytesWritten =
         shuffleClient.pushData(
             shuffleId,
@@ -288,8 +289,10 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
             numBytes,
             numMappers,
             numPartitions);
+    long delta = System.nanoTime() - start;
     mapStatusLengths[partitionId].add(bytesWritten);
     writeMetrics.incBytesWritten(bytesWritten);
+    writeMetrics.incWriteTime(delta);
   }
 
   private int getOrUpdateOffset(int partitionId, int serializedRecordSize)
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 5717910ee..3346deb2a 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -346,6 +346,7 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
 
   private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) 
throws IOException {
     logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
+    long start = System.nanoTime();
     int bytesWritten =
         shuffleClient.pushData(
             shuffleId,
@@ -357,8 +358,10 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
             numBytes,
             numMappers,
             numPartitions);
+    long delta = System.nanoTime() - start;
     mapStatusLengths[partitionId].add(bytesWritten);
     writeMetrics.incBytesWritten(bytesWritten);
+    writeMetrics.incWriteTime(delta);
   }
 
   private void cleanupPusher() throws IOException {
diff --git a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java
index 33eedd9d4..62d31ee6d 100644
--- a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java
+++ b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java
@@ -268,6 +268,7 @@ public abstract class CelebornShuffleWriterSuiteBase {
     ShuffleWriteMetrics metrics = 
taskContext.taskMetrics().shuffleWriteMetrics();
     assertEquals(metrics.recordsWritten(), total.intValue());
     assertEquals(metrics.bytesWritten(), tempFile.length());
+    assertTrue(metrics.writeTime() > 0);
 
     try (FileInputStream fis = new FileInputStream(tempFile)) {
       Iterator it = 
newSerializerInstance(serializer).deserializeStream(fis).asKeyValueIterator();

Reply via email to