This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 2bda175b1 [CELEBORN-673][SPARK][PERF] Improve the perf of sort-based 
shuffle write
2bda175b1 is described below

commit 2bda175b1761b6985ce155299e5b505ee342be94
Author: Fu Chen <[email protected]>
AuthorDate: Tue Jun 13 19:07:04 2023 +0800

    [CELEBORN-673][SPARK][PERF] Improve the perf of sort-based shuffle write
    
    ### What changes were proposed in this pull request?
    
    1. `SQLShuffleWriteMetricsReporter#incWriteTime` is a performance killer; 
stop calling it for every inserted record
    2. simplify the `incWriteTime` logic for handling large records, so that it 
also includes the time required for memory copying
    
    ### Why are the changes needed?
    
    flame graph and stage duration before:
    
    ![截屏2023-06-13 下午3 30 
53](https://github.com/apache/incubator-celeborn/assets/8537877/5fb0a242-82d1-4348-aeaa-4af75a012308)
    
    ![截屏2023-06-13 下午3 31 
26](https://github.com/apache/incubator-celeborn/assets/8537877/3ded2f16-1c17-4120-8d10-31ea7b5182a2)
    
    flame graph and stage duration after:
    
    ![截屏2023-06-13 下午3 33 
08](https://github.com/apache/incubator-celeborn/assets/8537877/fbe45cf2-4d23-4d6c-a476-64338e1610f1)
    
    ![截屏2023-06-13 下午3 33 
59](https://github.com/apache/incubator-celeborn/assets/8537877/9129d771-ad36-42e9-86b7-e454d2f8e0b0)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, only perf improvement
    
    ### How was this patch tested?
    
    tested locally.
    
    Closes #1585 from cfmcgrady/shuffle-metrics.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 86cbf7a3596303fea207e55e7ecb696d724d5409)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../spark/shuffle/celeborn/SortBasedShuffleWriter.java | 14 ++++++++------
 .../spark/shuffle/celeborn/SortBasedShuffleWriter.java | 18 +++++++++---------
 2 files changed, 17 insertions(+), 15 deletions(-)

diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index d6abbe496..33292f3a7 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -200,6 +200,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     SQLMetric dataSize =
         SparkUtils.getUnsafeRowSerializerDataSizeMetric((UnsafeRowSerializer) 
dep.serializer());
 
+    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<Integer, UnsafeRow> record = records.next();
       final int partitionId = record._1();
@@ -211,6 +212,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         dataSize.add(serializedRecordSize);
       }
 
+      long insertAndPushStartTime = System.nanoTime();
       if (serializedRecordSize > pushBufferMaxSize) {
         byte[] giantBuffer = new byte[serializedRecordSize];
         Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, 
Integer.reverseBytes(rowSize));
@@ -222,7 +224,6 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             rowSize);
         pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
       } else {
-        long insertStartTime = System.nanoTime();
         boolean success =
             currentPusher.insertRecord(
                 row.getBaseObject(), row.getBaseOffset(), rowSize, 
partitionId, true);
@@ -235,10 +236,11 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             throw new IOException("Unable to push after switching pusher!");
           }
         }
-        writeMetrics.incWriteTime(System.nanoTime() - insertStartTime);
       }
+      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
+    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private void pushAndSwitch() throws IOException {
@@ -254,6 +256,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   private void write0(scala.collection.Iterator iterator) throws IOException {
     final scala.collection.Iterator<Product2<K, ?>> records = iterator;
 
+    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<K, ?> record = records.next();
       final K key = record._1();
@@ -266,10 +269,10 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       final int serializedRecordSize = serBuffer.size();
       assert (serializedRecordSize > 0);
 
+      long insertAndPushStartTime = System.nanoTime();
       if (serializedRecordSize > pushBufferMaxSize) {
         pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
       } else {
-        long insertStartTime = System.nanoTime();
         boolean success =
             currentPusher.insertRecord(
                 serBuffer.getBuf(),
@@ -290,15 +293,15 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             throw new IOException("Unable to push after switching pusher!");
           }
         }
-        writeMetrics.incWriteTime(System.nanoTime() - insertStartTime);
       }
+      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
+    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) 
throws IOException {
     logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
-    long pushStartTime = System.nanoTime();
     int bytesWritten =
         rssShuffleClient.pushData(
             appId,
@@ -313,7 +316,6 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             numPartitions);
     mapStatusLengths[partitionId].add(bytesWritten);
     writeMetrics.incBytesWritten(bytesWritten);
-    writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
   }
 
   private void close() throws IOException {
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 47d8eb41e..a62e2c21a 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -222,6 +222,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = 
iterator;
 
     SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) 
dep.serializer());
+    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<Integer, UnsafeRow> record = records.next();
       final int partitionId = record._1();
@@ -234,6 +235,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         dataSize.add(serializedRecordSize);
       }
 
+      long insertAndPushStartTime = System.nanoTime();
       if (serializedRecordSize > pushBufferMaxSize) {
         byte[] giantBuffer = new byte[serializedRecordSize];
         Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, 
Integer.reverseBytes(rowSize));
@@ -245,7 +247,6 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             rowSize);
         pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
       } else {
-        long insertStartTime = System.nanoTime();
         boolean success =
             currentPusher.insertRecord(
                 row.getBaseObject(), row.getBaseOffset(), rowSize, 
partitionId, true);
@@ -258,10 +259,11 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             throw new CelebornIOException("Unable to push after switching 
pusher!");
           }
         }
-        writeMetrics.incWriteTime(System.nanoTime() - insertStartTime);
       }
+      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
+    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private void pushAndSwitch() throws IOException {
@@ -277,6 +279,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   private void write0(scala.collection.Iterator iterator) throws IOException {
     final scala.collection.Iterator<Product2<K, ?>> records = iterator;
 
+    long shuffleWriteTimeSum = 0L;
     while (records.hasNext()) {
       final Product2<K, ?> record = records.next();
       final K key = record._1();
@@ -289,10 +292,10 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       final int serializedRecordSize = serBuffer.size();
       assert (serializedRecordSize > 0);
 
+      long insertAndPushStartTime = System.nanoTime();
       if (serializedRecordSize > pushBufferMaxSize) {
         pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
       } else {
-        long insertStartTime = System.nanoTime();
         boolean success =
             currentPusher.insertRecord(
                 serBuffer.getBuf(),
@@ -313,15 +316,15 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             throw new IOException("Unable to push after switching pusher!");
           }
         }
-        writeMetrics.incWriteTime(System.nanoTime() - insertStartTime);
       }
+      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
       tmpRecords[partitionId] += 1;
     }
+    writeMetrics.incWriteTime(shuffleWriteTimeSum);
   }
 
   private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) 
throws IOException {
     logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
-    long pushStartTime = System.nanoTime();
     int bytesWritten =
         rssShuffleClient.pushData(
             appId,
@@ -336,7 +339,6 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             numPartitions);
     mapStatusLengths[partitionId].add(bytesWritten);
     writeMetrics.incBytesWritten(bytesWritten);
-    writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
   }
 
   private void close() throws IOException {
@@ -357,11 +359,9 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       currentPusher.pushData();
       currentPusher.close();
     }
-    writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
 
-    long pushMergedDataTime = System.nanoTime();
     rssShuffleClient.pushMergedData(appId, shuffleId, mapId, 
taskContext.attemptNumber());
-    writeMetrics.incWriteTime(System.nanoTime() - pushMergedDataTime);
+    writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
 
     updateMapStatus();
 

Reply via email to