This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 6b64b1de9 [CELEBORN-648][SPARK] Improve perf of SendBufferPool and
logs about memory
6b64b1de9 is described below
commit 6b64b1de9c69d01ea459159e66e7c9acd1aeb47e
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jun 9 09:45:27 2023 +0800
[CELEBORN-648][SPARK] Improve perf of SendBufferPool and logs about memory
### What changes were proposed in this pull request?
- Replace index-based item access with an iterator for LinkedList.
- Always try to remove a buffer if SendBufferPool does not have a matched
candidate; this change reduces the total buffer number from `capacity+N-1` to
`capacity` in the worst case.
- Some logs and code polish.
### Why are the changes needed?
Improve performance and logs, reduce memory consumption.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
Closes #1560 from pan3793/CELEBORN-648.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../spark/shuffle/celeborn/SendBufferPool.java | 19 ++++++++++++-------
.../shuffle/celeborn/ShuffleInMemorySorter.java | 5 +----
.../spark/shuffle/celeborn/SortBasedPusher.java | 22 +++++++++++++---------
.../shuffle/celeborn/HashBasedShuffleWriter.java | 5 +----
.../shuffle/celeborn/SortBasedShuffleWriter.java | 8 +++++---
.../shuffle/celeborn/HashBasedShuffleWriter.java | 6 +-----
.../shuffle/celeborn/SortBasedShuffleWriter.java | 20 ++++++++++----------
7 files changed, 43 insertions(+), 42 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
index 3608af48f..f568cc28c 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle.celeborn;
+import java.util.Iterator;
import java.util.LinkedList;
public class SendBufferPool {
@@ -36,23 +37,27 @@ public class SendBufferPool {
private final int capacity;
// numPartitions -> buffers
- private LinkedList<byte[][]> buffers;
+ private final LinkedList<byte[][]> buffers;
- public SendBufferPool(int capacity) {
+ private SendBufferPool(int capacity) {
+ assert capacity > 0;
this.capacity = capacity;
buffers = new LinkedList<>();
}
public synchronized byte[][] acquireBuffer(int numPartitions) {
- for (int i = 0; i < buffers.size(); i++) {
- if (buffers.get(i).length == numPartitions) {
- return buffers.remove(i);
+ Iterator<byte[][]> iterator = buffers.iterator();
+ while (iterator.hasNext()) {
+ byte[][] candidate = iterator.next();
+ if (candidate.length == numPartitions) {
+ iterator.remove();
+ return candidate;
}
}
- if (buffers.size() == capacity) {
+ if (buffers.size() > 0) {
buffers.removeFirst();
}
- return null;
+ return new byte[numPartitions][];
}
public synchronized void returnBuffer(byte[][] buffer) {
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
index 1ac823f2b..57d569eba 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
@@ -87,9 +87,7 @@ public class ShuffleInMemorySorter {
public void expandPointerArray(LongArray newArray) {
if (array != null) {
if (newArray.size() < array.size()) {
- // checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError("Not enough memory to grow pointer
array");
- // checkstyle.on: RegexpSinglelineJava
}
Platform.copyMemory(
array.getBaseObject(),
@@ -166,8 +164,7 @@ public class ShuffleInMemorySorter {
return new ShuffleSorterIterator(0, array, 0);
}
- int offset = 0;
- offset =
+ int offset =
RadixSort.sort(
array,
pos,
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
index 44b277066..80bb6faa9 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
@@ -67,7 +67,7 @@ public class SortBasedPusher extends MemoryConsumer {
Consumer<Integer> afterPush;
LongAdder[] mapStatusLengths;
// this lock is shared between different SortBasedPushers to synchronize
pushData
- Object sharedPushLock;
+ final Object sharedPushLock;
volatile boolean asyncPushing = false;
int[] shuffledPartitions = null;
int[] inversedShuffledPartitions = null;
@@ -223,10 +223,10 @@ public class SortBasedPusher extends MemoryConsumer {
if (getUsed() > pushSortMemoryThreshold
&& pageCursor + bytes8K > currentPage.getBaseOffset() +
currentPage.size()) {
logger.info(
- "Memory Used across threshold, need to trigger push. Memory: "
- + getUsed()
- + ", currentPage size: "
- + currentPage.size());
+ "Memory used {} exceeds threshold {}, need to trigger push.
currentPage size: {}",
+ Utils.bytesToString(getUsed()),
+ Utils.bytesToString(pushSortMemoryThreshold),
+ Utils.bytesToString(currentPage.size()));
return false;
}
@@ -314,13 +314,15 @@ public class SortBasedPusher extends MemoryConsumer {
array = allocateArray(used / 8 * 2);
} catch (TooLargePageException e) {
// The pointer array is too big to fix in a single page, spill.
- logger.info("Pushdata in growPointerArrayIfNecessary, memory used " +
getUsed());
+ logger.info(
+ "Pushdata in growPointerArrayIfNecessary, memory used {}",
+ Utils.bytesToString(getUsed()));
pushData();
- } catch (SparkOutOfMemoryError e) {
+ } catch (SparkOutOfMemoryError rethrow) {
// should have trigger spilling
if (inMemSorter.numRecords() > 0) {
- logger.error("Unable to grow the pointer array");
- throw e;
+ logger.error("OOM, unable to grow the pointer array");
+ throw rethrow;
}
// The new array could not be allocated, but that is not an issue as
it is longer needed,
// as all records were spilled.
@@ -424,6 +426,8 @@ public class SortBasedPusher extends MemoryConsumer {
}
}
+ // SPARK-29310 opens it to public in Spark 3.0, it's necessary to keep
compatible with Spark 2
+ @Override
public long getUsed() {
return super.getUsed();
}
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index dcb46106d..f93eee8d5 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -95,7 +95,7 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private volatile boolean stopping = false;
private DataPusher dataPusher;
- private boolean unsafeRowFastWrite;
+ private final boolean unsafeRowFastWrite;
// In order to facilitate the writing of unit test code, ShuffleClient needs
to be passed in as
// parameters. By the way, simplify the passed parameters.
@@ -137,9 +137,6 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
this.sendBufferPool = sendBufferPool;
sendBuffers = sendBufferPool.acquireBuffer(numPartitions);
- if (sendBuffers == null) {
- sendBuffers = new byte[numPartitions][];
- }
sendOffsets = new int[numPartitions];
try {
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index af3207e83..d6abbe496 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.util.Utils;
@Private
public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
@@ -296,7 +297,7 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes)
throws IOException {
- logger.debug("Push giant record, size {}.", numBytes);
+ logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
long pushStartTime = System.nanoTime();
int bytesWritten =
rssShuffleClient.pushData(
@@ -317,9 +318,10 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private void close() throws IOException {
if (pipelined) {
- logger.info("Memory used {}", (pushers[0].getUsed() +
pushers[1].getUsed()));
+ logger.info(
+ "Memory used {}", Utils.bytesToString((pushers[0].getUsed() +
pushers[1].getUsed())));
} else {
- logger.info("Memory used {}", currentPusher.getUsed());
+ logger.info("Memory used {}",
Utils.bytesToString(currentPusher.getUsed()));
}
long pushStartTime = System.nanoTime();
if (pipelined) {
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 7b32c47b5..496c91ee8 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -103,7 +103,7 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private StructType schema;
private boolean isColumnarShuffle = false;
- private boolean unsafeRowFastWrite;
+ private final boolean unsafeRowFastWrite;
// In order to facilitate the writing of unit test code, ShuffleClient needs
to be passed in as
// parameters. By the way, simplify the passed parameters.
@@ -143,10 +143,6 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
this.sendBufferPool = sendBufferPool;
sendBuffers = sendBufferPool.acquireBuffer(numPartitions);
- if (sendBuffers == null) {
- logger.info("Acquire failed");
- sendBuffers = new byte[numPartitions][];
- }
sendOffsets = new int[numPartitions];
try {
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index ec92b0e46..0cc7114cf 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -21,8 +21,6 @@ import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.LongAdder;
-import javax.annotation.Nullable;
-
import scala.Option;
import scala.Product2;
import scala.reflect.ClassTag;
@@ -50,6 +48,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.util.Utils;
@Private
public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
@@ -74,10 +73,10 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
// this lock is shared between different SortBasedPushers to synchronize
pushData
private final Object sharedPushLock = new Object();
private final boolean pipelined;
- private SortBasedPusher[] pushers = new SortBasedPusher[2];
+ private final SortBasedPusher[] pushers = new SortBasedPusher[2];
private SortBasedPusher currentPusher;
-
- @Nullable private long peakMemoryUsedBytes = 0;
+ // TODO it isn't be updated after initialization
+ private long peakMemoryUsedBytes = 0;
private final OpenByteArrayOutputStream serBuffer;
private final SerializationStream serOutputStream;
@@ -92,7 +91,7 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
*/
private volatile boolean stopping = false;
- private boolean unsafeRowFastWrite;
+ private final boolean unsafeRowFastWrite;
// In order to facilitate the writing of unit test code, ShuffleClient needs
to be passed in as
// parameters. By the way, simplify the passed parameters.
@@ -302,7 +301,7 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes)
throws IOException {
- logger.debug("Push giant record, size {}.", numBytes);
+ logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
long pushStartTime = System.nanoTime();
int bytesWritten =
rssShuffleClient.pushData(
@@ -323,9 +322,10 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private void close() throws IOException {
if (pipelined) {
- logger.info("Memory used {}", (pushers[0].getUsed() +
pushers[1].getUsed()));
+ logger.info(
+ "Memory used {}", Utils.bytesToString((pushers[0].getUsed() +
pushers[1].getUsed())));
} else {
- logger.info("Memory used {}", currentPusher.getUsed());
+ logger.info("Memory used {}",
Utils.bytesToString(currentPusher.getUsed()));
}
long pushStartTime = System.nanoTime();
if (pipelined) {
@@ -388,6 +388,6 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
// Added in SPARK-32917, for Spark 3.2 and above
public long[] getPartitionLengths() {
throw new UnsupportedOperationException(
- "RSS is not compatible with Spark push mode, please set
spark.shuffle.push.enabled to false");
+ "Celeborn is not compatible with push-based shuffle, please set
spark.shuffle.push.enabled to false");
}
}