This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 0636e3ca4 [CELEBORN-654][SPARK] SortBasedShuffleWriter does not require mapStatusRecords in Spark 3
0636e3ca4 is described below

commit 0636e3ca409beff9ed893a9b4e54f19d60b80435
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jun 9 09:43:08 2023 +0800

    [CELEBORN-654][SPARK] SortBasedShuffleWriter does not require mapStatusRecords in Spark 3
    
    ### What changes were proposed in this pull request?
    
    `mapStatusRecords` is required in Spark 2 for constructing `MapStatus` when AQE is enabled, but it is not needed in Spark 3, so this PR removes it from the Spark 3 writer to save memory and compute resources.
    
    This PR also simplifies the `for` loops.
    
    ### Why are the changes needed?
    
    To remove an unnecessary per-partition array, which saves memory and compute resources, and to clean up the code.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Passed GitHub Actions (GA) CI.
    
    Closes #1564 from pan3793/CELEBORN-654.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../spark/shuffle/celeborn/SortBasedShuffleWriter.java  | 14 ++++++--------
 .../spark/shuffle/celeborn/SortBasedShuffleWriter.java  | 17 ++++++-----------
 2 files changed, 12 insertions(+), 19 deletions(-)

diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index edbfa529f..af3207e83 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -323,10 +323,10 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     }
     long pushStartTime = System.nanoTime();
     if (pipelined) {
-      for (int i = 0; i < pushers.length; i++) {
-        pushers[i].waitPushFinish();
-        pushers[i].pushData();
-        pushers[i].close();
+      for (SortBasedPusher pusher : pushers) {
+        pusher.waitPushFinish();
+        pusher.pushData();
+        pusher.close();
       }
     } else {
       currentPusher.pushData();
@@ -344,13 +344,11 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   }
 
   private void updateMapStatus() {
-    long recordsWritten = 0;
-    for (int i = 0; i < partitioner.numPartitions(); i++) {
+    for (int i = 0; i < tmpRecords.length; i++) {
       mapStatusRecords[i] += tmpRecords[i];
-      recordsWritten += tmpRecords[i];
+      writeMetrics.incRecordsWritten(tmpRecords[i]);
       tmpRecords[i] = 0;
     }
-    writeMetrics.incRecordsWritten(recordsWritten);
   }
 
   @Override
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 070aa828f..ec92b0e46 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -83,7 +83,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   private final SerializationStream serOutputStream;
 
   private final LongAdder[] mapStatusLengths;
-  private final long[] mapStatusRecords;
   private final long[] tmpRecords;
 
   /**
@@ -124,7 +123,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     serOutputStream = serializer.serializeStream(serBuffer);
 
     this.mapStatusLengths = new LongAdder[numPartitions];
-    this.mapStatusRecords = new long[numPartitions];
     for (int i = 0; i < numPartitions; i++) {
       this.mapStatusLengths[i] = new LongAdder();
     }
@@ -331,10 +329,10 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     }
     long pushStartTime = System.nanoTime();
     if (pipelined) {
-      for (int i = 0; i < pushers.length; i++) {
-        pushers[i].waitPushFinish();
-        pushers[i].pushData();
-        pushers[i].close();
+      for (SortBasedPusher pusher : pushers) {
+        pusher.waitPushFinish();
+        pusher.pushData();
+        pusher.close();
       }
     } else {
       currentPusher.pushData();
@@ -354,13 +352,10 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   }
 
   private void updateMapStatus() {
-    long recordsWritten = 0;
-    for (int i = 0; i < partitioner.numPartitions(); i++) {
-      mapStatusRecords[i] += tmpRecords[i];
-      recordsWritten += tmpRecords[i];
+    for (int i = 0; i < tmpRecords.length; i++) {
+      writeMetrics.incRecordsWritten(tmpRecords[i]);
       tmpRecords[i] = 0;
     }
-    writeMetrics.incRecordsWritten(recordsWritten);
   }
 
   @Override

Reply via email to