This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b64b1de9 [CELEBORN-648][SPARK] Improve perf of SendBufferPool and 
logs about memory
6b64b1de9 is described below

commit 6b64b1de9c69d01ea459159e66e7c9acd1aeb47e
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jun 9 09:45:27 2023 +0800

    [CELEBORN-648][SPARK] Improve perf of SendBufferPool and logs about memory
    
    ### What changes were proposed in this pull request?
    
    - Replace index-based item access with an iterator for LinkedList.
    - Always try to remove a buffer if SendBufferPool does not have a matched 
candidate, this change makes the total buffer number from `capacity+N-1` to 
`capacity` in worst cases.
    - Some logs and code polish.
    
    ### Why are the changes needed?
    
    Improve performance and logs, reduce memory consumption.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass GA.
    
    Closes #1560 from pan3793/CELEBORN-648.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../spark/shuffle/celeborn/SendBufferPool.java     | 19 ++++++++++++-------
 .../shuffle/celeborn/ShuffleInMemorySorter.java    |  5 +----
 .../spark/shuffle/celeborn/SortBasedPusher.java    | 22 +++++++++++++---------
 .../shuffle/celeborn/HashBasedShuffleWriter.java   |  5 +----
 .../shuffle/celeborn/SortBasedShuffleWriter.java   |  8 +++++---
 .../shuffle/celeborn/HashBasedShuffleWriter.java   |  6 +-----
 .../shuffle/celeborn/SortBasedShuffleWriter.java   | 20 ++++++++++----------
 7 files changed, 43 insertions(+), 42 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
index 3608af48f..f568cc28c 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle.celeborn;
 
+import java.util.Iterator;
 import java.util.LinkedList;
 
 public class SendBufferPool {
@@ -36,23 +37,27 @@ public class SendBufferPool {
   private final int capacity;
 
   // numPartitions -> buffers
-  private LinkedList<byte[][]> buffers;
+  private final LinkedList<byte[][]> buffers;
 
-  public SendBufferPool(int capacity) {
+  private SendBufferPool(int capacity) {
+    assert capacity > 0;
     this.capacity = capacity;
     buffers = new LinkedList<>();
   }
 
   public synchronized byte[][] acquireBuffer(int numPartitions) {
-    for (int i = 0; i < buffers.size(); i++) {
-      if (buffers.get(i).length == numPartitions) {
-        return buffers.remove(i);
+    Iterator<byte[][]> iterator = buffers.iterator();
+    while (iterator.hasNext()) {
+      byte[][] candidate = iterator.next();
+      if (candidate.length == numPartitions) {
+        iterator.remove();
+        return candidate;
       }
     }
-    if (buffers.size() == capacity) {
+    if (buffers.size() > 0) {
       buffers.removeFirst();
     }
-    return null;
+    return new byte[numPartitions][];
   }
 
   public synchronized void returnBuffer(byte[][] buffer) {
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
index 1ac823f2b..57d569eba 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
@@ -87,9 +87,7 @@ public class ShuffleInMemorySorter {
   public void expandPointerArray(LongArray newArray) {
     if (array != null) {
       if (newArray.size() < array.size()) {
-        // checkstyle.off: RegexpSinglelineJava
         throw new SparkOutOfMemoryError("Not enough memory to grow pointer 
array");
-        // checkstyle.on: RegexpSinglelineJava
       }
       Platform.copyMemory(
           array.getBaseObject(),
@@ -166,8 +164,7 @@ public class ShuffleInMemorySorter {
       return new ShuffleSorterIterator(0, array, 0);
     }
 
-    int offset = 0;
-    offset =
+    int offset =
         RadixSort.sort(
             array,
             pos,
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
index 44b277066..80bb6faa9 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
@@ -67,7 +67,7 @@ public class SortBasedPusher extends MemoryConsumer {
   Consumer<Integer> afterPush;
   LongAdder[] mapStatusLengths;
   // this lock is shared between different SortBasedPushers to synchronize 
pushData
-  Object sharedPushLock;
+  final Object sharedPushLock;
   volatile boolean asyncPushing = false;
   int[] shuffledPartitions = null;
   int[] inversedShuffledPartitions = null;
@@ -223,10 +223,10 @@ public class SortBasedPusher extends MemoryConsumer {
     if (getUsed() > pushSortMemoryThreshold
         && pageCursor + bytes8K > currentPage.getBaseOffset() + 
currentPage.size()) {
       logger.info(
-          "Memory Used across threshold, need to trigger push. Memory: "
-              + getUsed()
-              + ", currentPage size: "
-              + currentPage.size());
+          "Memory used {} exceeds threshold {}, need to trigger push. 
currentPage size: {}",
+          Utils.bytesToString(getUsed()),
+          Utils.bytesToString(pushSortMemoryThreshold),
+          Utils.bytesToString(currentPage.size()));
       return false;
     }
 
@@ -314,13 +314,15 @@ public class SortBasedPusher extends MemoryConsumer {
         array = allocateArray(used / 8 * 2);
       } catch (TooLargePageException e) {
        // The pointer array is too big to fit in a single page, spill.
-        logger.info("Pushdata in growPointerArrayIfNecessary, memory used " + 
getUsed());
+        logger.info(
+            "Pushdata in growPointerArrayIfNecessary, memory used {}",
+            Utils.bytesToString(getUsed()));
         pushData();
-      } catch (SparkOutOfMemoryError e) {
+      } catch (SparkOutOfMemoryError rethrow) {
         // should have trigger spilling
         if (inMemSorter.numRecords() > 0) {
-          logger.error("Unable to grow the pointer array");
-          throw e;
+          logger.error("OOM, unable to grow the pointer array");
+          throw rethrow;
         }
        // The new array could not be allocated, but that is not an issue as 
it is no longer needed,
        // as all records were spilled.
@@ -424,6 +426,8 @@ public class SortBasedPusher extends MemoryConsumer {
     }
   }
 
+  // SPARK-29310 opens it to public in Spark 3.0, it's necessary to keep 
compatible with Spark 2
+  @Override
   public long getUsed() {
     return super.getUsed();
   }
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index dcb46106d..f93eee8d5 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -95,7 +95,7 @@ public class HashBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   private volatile boolean stopping = false;
 
   private DataPusher dataPusher;
-  private boolean unsafeRowFastWrite;
+  private final boolean unsafeRowFastWrite;
 
   // In order to facilitate the writing of unit test code, ShuffleClient needs 
to be passed in as
   // parameters. By the way, simplify the passed parameters.
@@ -137,9 +137,6 @@ public class HashBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
 
     this.sendBufferPool = sendBufferPool;
     sendBuffers = sendBufferPool.acquireBuffer(numPartitions);
-    if (sendBuffers == null) {
-      sendBuffers = new byte[numPartitions][];
-    }
     sendOffsets = new int[numPartitions];
 
     try {
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index af3207e83..d6abbe496 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.util.Utils;
 
 @Private
 public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
@@ -296,7 +297,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   }
 
   private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) 
throws IOException {
-    logger.debug("Push giant record, size {}.", numBytes);
+    logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
     long pushStartTime = System.nanoTime();
     int bytesWritten =
         rssShuffleClient.pushData(
@@ -317,9 +318,10 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
 
   private void close() throws IOException {
     if (pipelined) {
-      logger.info("Memory used {}", (pushers[0].getUsed() + 
pushers[1].getUsed()));
+      logger.info(
+          "Memory used {}", Utils.bytesToString((pushers[0].getUsed() + 
pushers[1].getUsed())));
     } else {
-      logger.info("Memory used {}", currentPusher.getUsed());
+      logger.info("Memory used {}", 
Utils.bytesToString(currentPusher.getUsed()));
     }
     long pushStartTime = System.nanoTime();
     if (pipelined) {
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 7b32c47b5..496c91ee8 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -103,7 +103,7 @@ public class HashBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   private StructType schema;
 
   private boolean isColumnarShuffle = false;
-  private boolean unsafeRowFastWrite;
+  private final boolean unsafeRowFastWrite;
 
   // In order to facilitate the writing of unit test code, ShuffleClient needs 
to be passed in as
   // parameters. By the way, simplify the passed parameters.
@@ -143,10 +143,6 @@ public class HashBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
 
     this.sendBufferPool = sendBufferPool;
     sendBuffers = sendBufferPool.acquireBuffer(numPartitions);
-    if (sendBuffers == null) {
-      logger.info("Acquire failed");
-      sendBuffers = new byte[numPartitions][];
-    }
     sendOffsets = new int[numPartitions];
 
     try {
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index ec92b0e46..0cc7114cf 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.LongAdder;
 
-import javax.annotation.Nullable;
-
 import scala.Option;
 import scala.Product2;
 import scala.reflect.ClassTag;
@@ -50,6 +48,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.util.Utils;
 
 @Private
 public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
@@ -74,10 +73,10 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   // this lock is shared between different SortBasedPushers to synchronize 
pushData
   private final Object sharedPushLock = new Object();
   private final boolean pipelined;
-  private SortBasedPusher[] pushers = new SortBasedPusher[2];
+  private final SortBasedPusher[] pushers = new SortBasedPusher[2];
   private SortBasedPusher currentPusher;
-
-  @Nullable private long peakMemoryUsedBytes = 0;
+  // TODO it isn't updated after initialization
+  private long peakMemoryUsedBytes = 0;
 
   private final OpenByteArrayOutputStream serBuffer;
   private final SerializationStream serOutputStream;
@@ -92,7 +91,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
    */
   private volatile boolean stopping = false;
 
-  private boolean unsafeRowFastWrite;
+  private final boolean unsafeRowFastWrite;
 
   // In order to facilitate the writing of unit test code, ShuffleClient needs 
to be passed in as
   // parameters. By the way, simplify the passed parameters.
@@ -302,7 +301,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   }
 
   private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) 
throws IOException {
-    logger.debug("Push giant record, size {}.", numBytes);
+    logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
     long pushStartTime = System.nanoTime();
     int bytesWritten =
         rssShuffleClient.pushData(
@@ -323,9 +322,10 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
 
   private void close() throws IOException {
     if (pipelined) {
-      logger.info("Memory used {}", (pushers[0].getUsed() + 
pushers[1].getUsed()));
+      logger.info(
+          "Memory used {}", Utils.bytesToString((pushers[0].getUsed() + 
pushers[1].getUsed())));
     } else {
-      logger.info("Memory used {}", currentPusher.getUsed());
+      logger.info("Memory used {}", 
Utils.bytesToString(currentPusher.getUsed()));
     }
     long pushStartTime = System.nanoTime();
     if (pipelined) {
@@ -388,6 +388,6 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   // Added in SPARK-32917, for Spark 3.2 and above
   public long[] getPartitionLengths() {
     throw new UnsupportedOperationException(
-        "RSS is not compatible with Spark push mode, please set 
spark.shuffle.push.enabled to false");
+        "Celeborn is not compatible with push-based shuffle, please set 
spark.shuffle.push.enabled to false");
   }
 }

Reply via email to