This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 77e468161 [CELEBORN-891] Remove pipeline feature for sort based writer
77e468161 is described below
commit 77e468161d5ed828530c6832610cb366c6f224da
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jan 1 10:42:17 2024 +0800
[CELEBORN-891] Remove pipeline feature for sort based writer
### What changes were proposed in this pull request?
Remove pipeline feature for sort based writer
### Why are the changes needed?
The pipeline feature was added as part of CELEBORN-295 for performance reasons.
Eventually, an unresolvable issue that would crash the JVM was identified in
https://github.com/apache/incubator-celeborn/pull/1807, and after discussion,
we decided to delete this feature.
### Does this PR introduce _any_ user-facing change?
No. The pipeline feature is disabled by default, so there are no changes for
users who use the default settings.
### How was this patch tested?
Pass GA.
Closes #2196 from pan3793/CELEBORN-891.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../spark/shuffle/celeborn/SortBasedPusher.java | 158 +++++++--------------
.../shuffle/celeborn/SortBasedPusherSuiteJ.java | 2 -
.../shuffle/celeborn/SortBasedShuffleWriter.java | 134 +++++------------
.../shuffle/celeborn/SparkShuffleManager.java | 24 ----
.../celeborn/SortBasedShuffleWriterSuiteJ.java | 1 -
.../celeborn/CelebornShuffleManagerSuite.scala | 2 -
.../shuffle/celeborn/SortBasedShuffleWriter.java | 133 +++++------------
.../shuffle/celeborn/SparkShuffleManager.java | 25 ----
.../celeborn/SortBasedShuffleWriterSuiteJ.java | 2 +-
.../org/apache/celeborn/common/util/JavaUtils.java | 6 -
.../org/apache/celeborn/common/CelebornConf.scala | 16 +--
docs/configuration/client.md | 3 +-
.../tests/spark/CelebornPipelineSortSuite.scala | 60 --------
...lineSortSuite.scala => CelebornSortSuite.scala} | 3 +-
14 files changed, 124 insertions(+), 445 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
index aff8e7ce3..3b051c3e7 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
@@ -19,7 +19,6 @@ package org.apache.spark.shuffle.celeborn;
import java.io.IOException;
import java.util.LinkedList;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
@@ -69,12 +68,8 @@ public class SortBasedPusher extends MemoryConsumer {
private final int numPartitions;
private final Consumer<Integer> afterPush;
private final LongAdder[] mapStatusLengths;
- // this lock is shared between different SortBasedPushers to synchronize
pushData
- private final Object sharedPushLock;
- private volatile boolean asyncPushing = false;
private int[] shuffledPartitions = null;
private int[] inversedShuffledPartitions = null;
- private final ExecutorService executorService;
private final SendBufferPool sendBufferPool;
public SortBasedPusher(
@@ -91,8 +86,6 @@ public class SortBasedPusher extends MemoryConsumer {
Consumer<Integer> afterPush,
LongAdder[] mapStatusLengths,
long pushSortMemoryThreshold,
- Object sharedPushLock,
- ExecutorService executorService,
SendBufferPool sendBufferPool) {
super(
memoryManager,
@@ -144,83 +137,74 @@ public class SortBasedPusher extends MemoryConsumer {
int initialSize = Math.min((int) pushSortMemoryThreshold / 8, 1024 * 1024);
this.inMemSorter = new ShuffleInMemorySorter(this, initialSize);
this.peakMemoryUsedBytes = getMemoryUsage();
- this.sharedPushLock = sharedPushLock;
- this.executorService = executorService;
}
public long pushData() throws IOException {
- // pushData should be synchronized between pushers
- synchronized (sharedPushLock) {
- final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
- inMemSorter.getSortedIterator();
-
- byte[] dataBuf = new byte[pushBufferMaxSize];
- int offSet = 0;
- int currentPartition = -1;
- while (sortedRecords.hasNext()) {
- sortedRecords.loadNext();
- final int partition =
- shuffledPartitions != null
- ?
inversedShuffledPartitions[sortedRecords.packedRecordPointer.getPartitionId()]
- : sortedRecords.packedRecordPointer.getPartitionId();
- if (partition != currentPartition) {
- if (currentPartition == -1) {
- currentPartition = partition;
- } else {
- int bytesWritten =
- shuffleClient.mergeData(
- shuffleId,
- mapId,
- attemptNumber,
- currentPartition,
- dataBuf,
- 0,
- offSet,
- numMappers,
- numPartitions);
- mapStatusLengths[currentPartition].add(bytesWritten);
- afterPush.accept(bytesWritten);
- currentPartition = partition;
- offSet = 0;
- }
- }
- final long recordPointer =
sortedRecords.packedRecordPointer.getRecordPointer();
- final Object recordPage = taskMemoryManager.getPage(recordPointer);
- final long recordOffsetInPage =
taskMemoryManager.getOffsetInPage(recordPointer);
- int recordSize = UnsafeAlignedOffset.getSize(recordPage,
recordOffsetInPage);
-
- if (offSet + recordSize > dataBuf.length) {
- try {
- dataPusher.addTask(partition, dataBuf, offSet);
- } catch (InterruptedException e) {
- TaskInterruptedHelper.throwTaskKillException();
- }
+ final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
+ inMemSorter.getSortedIterator();
+
+ byte[] dataBuf = new byte[pushBufferMaxSize];
+ int offSet = 0;
+ int currentPartition = -1;
+ while (sortedRecords.hasNext()) {
+ sortedRecords.loadNext();
+ final int partition =
+ shuffledPartitions != null
+ ?
inversedShuffledPartitions[sortedRecords.packedRecordPointer.getPartitionId()]
+ : sortedRecords.packedRecordPointer.getPartitionId();
+ if (partition != currentPartition) {
+ if (currentPartition == -1) {
+ currentPartition = partition;
+ } else {
+ int bytesWritten =
+ shuffleClient.mergeData(
+ shuffleId,
+ mapId,
+ attemptNumber,
+ currentPartition,
+ dataBuf,
+ 0,
+ offSet,
+ numMappers,
+ numPartitions);
+ mapStatusLengths[currentPartition].add(bytesWritten);
+ afterPush.accept(bytesWritten);
+ currentPartition = partition;
offSet = 0;
}
-
- long recordReadPosition = recordOffsetInPage + UAO_SIZE;
- Platform.copyMemory(
- recordPage,
- recordReadPosition,
- dataBuf,
- Platform.BYTE_ARRAY_OFFSET + offSet,
- recordSize);
- offSet += recordSize;
}
- if (offSet > 0) {
+ final long recordPointer =
sortedRecords.packedRecordPointer.getRecordPointer();
+ final Object recordPage = taskMemoryManager.getPage(recordPointer);
+ final long recordOffsetInPage =
taskMemoryManager.getOffsetInPage(recordPointer);
+ int recordSize = UnsafeAlignedOffset.getSize(recordPage,
recordOffsetInPage);
+
+ if (offSet + recordSize > dataBuf.length) {
try {
- dataPusher.addTask(currentPartition, dataBuf, offSet);
+ dataPusher.addTask(partition, dataBuf, offSet);
} catch (InterruptedException e) {
TaskInterruptedHelper.throwTaskKillException();
}
+ offSet = 0;
}
- long freedBytes = freeMemory();
- inMemSorter.freeMemory();
- taskContext.taskMetrics().incMemoryBytesSpilled(freedBytes);
-
- return freedBytes;
+ long recordReadPosition = recordOffsetInPage + UAO_SIZE;
+ Platform.copyMemory(
+ recordPage, recordReadPosition, dataBuf, Platform.BYTE_ARRAY_OFFSET
+ offSet, recordSize);
+ offSet += recordSize;
}
+ if (offSet > 0) {
+ try {
+ dataPusher.addTask(currentPartition, dataBuf, offSet);
+ } catch (InterruptedException e) {
+ TaskInterruptedHelper.throwTaskKillException();
+ }
+ }
+
+ long freedBytes = freeMemory();
+ inMemSorter.freeMemory();
+ taskContext.taskMetrics().incMemoryBytesSpilled(freedBytes);
+
+ return freedBytes;
}
public boolean insertRecord(
@@ -271,38 +255,6 @@ public class SortBasedPusher extends MemoryConsumer {
return true;
}
- public void triggerPush() throws IOException {
- asyncPushing = true;
- dataPusher.checkException();
- executorService.submit(
- () -> {
- try {
- pushData();
- asyncPushing = false;
- } catch (IOException ie) {
- dataPusher.setException(ie);
- }
- });
- }
-
- /**
- * Since this method and pushData() are synchronized When this method
returns, it means pushData
- * has released lock
- *
- * @throws IOException
- */
- public void waitPushFinish() throws IOException {
- dataPusher.checkException();
- while (asyncPushing) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- logger.error("SortBasedPusher thread interrupted while waiting push
finished.");
- TaskInterruptedHelper.throwTaskKillException();
- }
- }
- }
-
private void growPointerArrayIfNecessary() throws IOException {
assert (inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
index bc86f9db7..0962c98c4 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
@@ -97,8 +97,6 @@ public class SortBasedPusherSuiteJ {
/*afterPush=*/ null,
/*mapStatusLengths=*/ null,
/*pushSortMemoryThreshold=*/ Utils.byteStringAsBytes("1m"),
- /*sharedPushLock=*/ null,
- /*executorService=*/ null,
SendBufferPool.get(4, 30, 60));
// default page size == 2 MiB
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index daf251047..0985c9b69 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -18,7 +18,6 @@
package org.apache.spark.shuffle.celeborn;
import java.io.IOException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.LongAdder;
import scala.Option;
@@ -70,9 +69,7 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private final long pushBufferMaxSize;
private final Object globalPushLock = new Object();
- private final boolean pipelined;
- private SortBasedPusher[] pushers = new SortBasedPusher[2];
- private SortBasedPusher currentPusher;
+ private SortBasedPusher pusher;
private long peakMemoryUsedBytes = 0;
private final OpenByteArrayOutputStream serBuffer;
@@ -100,7 +97,6 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
TaskContext taskContext,
CelebornConf conf,
ShuffleClient client,
- ExecutorService executorService,
SendBufferPool sendBufferPool)
throws IOException {
this.mapId = taskContext.partitionId();
@@ -126,50 +122,23 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
tmpRecords = new long[numPartitions];
pushBufferMaxSize = conf.clientPushBufferMaxSize();
- pipelined = conf.clientPushSortPipelineEnabled();
-
- if (pipelined) {
- for (int i = 0; i < pushers.length; i++) {
- pushers[i] =
- new SortBasedPusher(
- taskContext.taskMemoryManager(),
- shuffleClient,
- taskContext,
- shuffleId,
- mapId,
- taskContext.attemptNumber(),
- taskContext.taskAttemptId(),
- numMappers,
- numPartitions,
- conf,
- writeMetrics::incBytesWritten,
- mapStatusLengths,
- conf.clientPushSortMemoryThreshold() / 2,
- globalPushLock,
- executorService,
- sendBufferPool);
- }
- currentPusher = pushers[0];
- } else {
- currentPusher =
- new SortBasedPusher(
- taskContext.taskMemoryManager(),
- shuffleClient,
- taskContext,
- shuffleId,
- mapId,
- taskContext.attemptNumber(),
- taskContext.taskAttemptId(),
- numMappers,
- numPartitions,
- conf,
- writeMetrics::incBytesWritten,
- mapStatusLengths,
- conf.clientPushSortMemoryThreshold(),
- globalPushLock,
- null,
- sendBufferPool);
- }
+
+ pusher =
+ new SortBasedPusher(
+ taskContext.taskMemoryManager(),
+ shuffleClient,
+ taskContext,
+ shuffleId,
+ mapId,
+ taskContext.attemptNumber(),
+ taskContext.taskAttemptId(),
+ numMappers,
+ numPartitions,
+ conf,
+ writeMetrics::incBytesWritten,
+ mapStatusLengths,
+ conf.clientPushSortMemoryThreshold(),
+ sendBufferPool);
}
@Override
@@ -224,12 +193,12 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
} else {
boolean success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
row.getBaseObject(), row.getBaseOffset(), rowSize,
partitionId, true);
if (!success) {
- pushAndSwitch();
+ doPush();
success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
row.getBaseObject(), row.getBaseOffset(), rowSize,
partitionId, true);
if (!success) {
throw new IOException("Unable to push after switching pusher!");
@@ -240,37 +209,16 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
}
- private void pushAndSwitch() throws IOException {
+ private void doPush() throws IOException {
long start = System.nanoTime();
- if (pipelined) {
- currentPusher.triggerPush();
- currentPusher = (currentPusher == pushers[0] ? pushers[1] : pushers[0]);
- currentPusher.waitPushFinish();
- } else {
- currentPusher.pushData();
- }
+ pusher.pushData();
writeMetrics.incWriteTime(System.nanoTime() - start);
}
private void updatePeakMemoryUsed() {
- // sorter can be null if this writer is closed
- if (pipelined) {
- for (int i = 0; i < pushers.length; i++) {
-
- if (pushers[i] != null) {
- long mem = pushers[i].getPeakMemoryUsedBytes();
- if (mem > peakMemoryUsedBytes) {
- peakMemoryUsedBytes = mem;
- }
- }
- }
- } else {
- if (currentPusher != null) {
- long mem = currentPusher.getPeakMemoryUsedBytes();
- if (mem > peakMemoryUsedBytes) {
- peakMemoryUsedBytes = mem;
- }
- }
+ long mem = pusher.getPeakMemoryUsedBytes();
+ if (mem > peakMemoryUsedBytes) {
+ peakMemoryUsedBytes = mem;
}
}
@@ -299,16 +247,16 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
} else {
boolean success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
serBuffer.getBuf(),
Platform.BYTE_ARRAY_OFFSET,
serializedRecordSize,
partitionId,
false);
if (!success) {
- pushAndSwitch();
+ doPush();
success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
serBuffer.getBuf(),
Platform.BYTE_ARRAY_OFFSET,
serializedRecordSize,
@@ -341,23 +289,10 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void close() throws IOException {
- if (pipelined) {
- logger.info(
- "Memory used {}", Utils.bytesToString((pushers[0].getUsed() +
pushers[1].getUsed())));
- } else {
- logger.info("Memory used {}",
Utils.bytesToString(currentPusher.getUsed()));
- }
+ logger.info("Memory used {}", Utils.bytesToString(pusher.getUsed()));
long pushStartTime = System.nanoTime();
- if (pipelined) {
- for (SortBasedPusher pusher : pushers) {
- pusher.waitPushFinish();
- pusher.pushData();
- pusher.close();
- }
- } else {
- currentPusher.pushData();
- currentPusher.close();
- }
+ pusher.pushData();
+ pusher.close();
writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
shuffleClient.pushMergedData(shuffleId, mapId,
taskContext.attemptNumber());
@@ -404,9 +339,4 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
shuffleClient.cleanup(shuffleId, mapId, taskContext.attemptNumber());
}
}
-
- public long[] getPartitionLengths() {
- throw new UnsupportedOperationException(
- "Celeborn is not compatible with Spark push mode, please set
spark.shuffle.push.enabled to false");
- }
}
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 5b68db0f8..302e4398a 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -21,8 +21,6 @@ import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
import scala.Int;
import scala.Option;
@@ -43,7 +41,6 @@ import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.client.security.CryptoUtils;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.ShuffleMode;
-import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.reflect.DynMethods;
public class SparkShuffleManager implements ShuffleManager {
@@ -67,9 +64,6 @@ public class SparkShuffleManager implements ShuffleManager {
ConcurrentHashMap.newKeySet();
private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
- private final ExecutorService[] asyncPushers;
- private AtomicInteger pusherIdx = new AtomicInteger(0);
-
private long sendBufferPoolCheckInterval;
private long sendBufferPoolExpireTimeout;
@@ -81,15 +75,6 @@ public class SparkShuffleManager implements ShuffleManager {
this.celebornConf = SparkUtils.fromSparkConf(conf);
this.cores = conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
this.fallbackPolicyRunner = new
CelebornShuffleFallbackPolicyRunner(celebornConf);
- if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
- && celebornConf.clientPushSortPipelineEnabled()) {
- asyncPushers = new ExecutorService[cores];
- for (int i = 0; i < asyncPushers.length; i++) {
- asyncPushers[i] =
ThreadUtils.newDaemonSingleThreadExecutor("async-pusher-" + i);
- }
- } else {
- asyncPushers = null;
- }
this.sendBufferPoolCheckInterval =
celebornConf.clientPushSendBufferPoolExpireCheckInterval();
this.sendBufferPoolExpireTimeout =
celebornConf.clientPushSendBufferPoolExpireTimeout();
}
@@ -235,8 +220,6 @@ public class SparkShuffleManager implements ShuffleManager {
shuffleIdTracker.track(h.shuffleId(), shuffleId);
if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
- ExecutorService pushThread =
- celebornConf.clientPushSortPipelineEnabled() ? getPusherThread()
: null;
return new SortBasedShuffleWriter<>(
shuffleId,
h.dependency(),
@@ -244,7 +227,6 @@ public class SparkShuffleManager implements ShuffleManager {
context,
celebornConf,
client,
- pushThread,
SendBufferPool.get(cores, sendBufferPoolCheckInterval,
sendBufferPoolExpireTimeout));
} else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
return new HashBasedShuffleWriter<>(
@@ -287,12 +269,6 @@ public class SparkShuffleManager implements ShuffleManager
{
return _sortShuffleManager.getReader(handle, startPartition, endPartition,
context);
}
- private ExecutorService getPusherThread() {
- ExecutorService pusherThread = asyncPushers[pusherIdx.get() %
asyncPushers.length];
- pusherIdx.incrementAndGet();
- return pusherThread;
- }
-
private int executorCores(SparkConf conf) {
if (Utils.isLocalMaster(conf)) {
// SparkContext.numDriverCores is package private.
diff --git
a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
index 74ebab2b9..b56bda1f8 100644
---
a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
+++
b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
@@ -40,7 +40,6 @@ public class SortBasedShuffleWriterSuiteJ extends
CelebornShuffleWriterSuiteBase
context,
conf,
client,
- null,
SendBufferPool.get(4, 30, 60));
}
}
diff --git
a/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
b/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
index 36876d6bd..cb6c3e0ab 100644
---
a/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
+++
b/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
@@ -38,7 +38,6 @@ class SparkShuffleManagerSuite extends Logging {
.set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
.set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
.set("spark.shuffle.service.enabled", "false")
- .set("spark.shuffle.useOldFetchProtocol", "true")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.setAppName("test")
val sc = new SparkContext(conf)
@@ -58,7 +57,6 @@ class SparkShuffleManagerSuite extends Logging {
.set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
.set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
.set("spark.shuffle.service.enabled", "false")
- .set("spark.shuffle.useOldFetchProtocol", "true")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
.setAppName("test")
val sc = new SparkContext(conf)
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index a2f3cdfb7..2a755a70c 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -18,7 +18,6 @@
package org.apache.spark.shuffle.celeborn;
import java.io.IOException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.LongAdder;
import scala.Option;
@@ -69,11 +68,8 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private final int numPartitions;
private final long pushBufferMaxSize;
- // this lock is shared between different SortBasedPushers to synchronize
pushData
- private final Object sharedPushLock = new Object();
- private final boolean pipelined;
- private final SortBasedPusher[] pushers = new SortBasedPusher[2];
- private SortBasedPusher currentPusher;
+
+ private final SortBasedPusher pusher;
private long peakMemoryUsedBytes = 0;
private final OpenByteArrayOutputStream serBuffer;
@@ -101,7 +97,6 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
CelebornConf conf,
ShuffleClient client,
ShuffleWriteMetricsReporter metrics,
- ExecutorService executorService,
SendBufferPool sendBufferPool)
throws IOException {
this.mapId = taskContext.partitionId();
@@ -126,50 +121,23 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
tmpRecords = new long[numPartitions];
pushBufferMaxSize = conf.clientPushBufferMaxSize();
- pipelined = conf.clientPushSortPipelineEnabled();
-
- if (pipelined) {
- for (int i = 0; i < pushers.length; i++) {
- pushers[i] =
- new SortBasedPusher(
- taskContext.taskMemoryManager(),
- shuffleClient,
- taskContext,
- shuffleId,
- mapId,
- taskContext.attemptNumber(),
- taskContext.taskAttemptId(),
- numMappers,
- numPartitions,
- conf,
- writeMetrics::incBytesWritten,
- mapStatusLengths,
- conf.clientPushSortMemoryThreshold() / 2,
- sharedPushLock,
- executorService,
- sendBufferPool);
- }
- currentPusher = pushers[0];
- } else {
- currentPusher =
- new SortBasedPusher(
- taskContext.taskMemoryManager(),
- shuffleClient,
- taskContext,
- shuffleId,
- mapId,
- taskContext.attemptNumber(),
- taskContext.taskAttemptId(),
- numMappers,
- numPartitions,
- conf,
- writeMetrics::incBytesWritten,
- mapStatusLengths,
- conf.clientPushSortMemoryThreshold(),
- sharedPushLock,
- null,
- sendBufferPool);
- }
+
+ pusher =
+ new SortBasedPusher(
+ taskContext.taskMemoryManager(),
+ shuffleClient,
+ taskContext,
+ shuffleId,
+ mapId,
+ taskContext.attemptNumber(),
+ taskContext.taskAttemptId(),
+ numMappers,
+ numPartitions,
+ conf,
+ writeMetrics::incBytesWritten,
+ mapStatusLengths,
+ conf.clientPushSortMemoryThreshold(),
+ sendBufferPool);
}
public SortBasedShuffleWriter(
@@ -178,7 +146,6 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
CelebornConf conf,
ShuffleClient client,
ShuffleWriteMetricsReporter metrics,
- ExecutorService executorService,
SendBufferPool sendBufferPool)
throws IOException {
this(
@@ -189,28 +156,13 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
conf,
client,
metrics,
- executorService,
sendBufferPool);
}
private void updatePeakMemoryUsed() {
- // sorter can be null if this writer is closed
- if (pipelined) {
- for (SortBasedPusher pusher : pushers) {
- if (pusher != null) {
- long mem = pusher.getPeakMemoryUsedBytes();
- if (mem > peakMemoryUsedBytes) {
- peakMemoryUsedBytes = mem;
- }
- }
- }
- } else {
- if (currentPusher != null) {
- long mem = currentPusher.getPeakMemoryUsedBytes();
- if (mem > peakMemoryUsedBytes) {
- peakMemoryUsedBytes = mem;
- }
- }
+ long mem = pusher.getPeakMemoryUsedBytes();
+ if (mem > peakMemoryUsedBytes) {
+ peakMemoryUsedBytes = mem;
}
}
@@ -275,12 +227,12 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
} else {
boolean success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
row.getBaseObject(), row.getBaseOffset(), rowSize,
partitionId, true);
if (!success) {
- pushAndSwitch();
+ doPush();
success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
row.getBaseObject(), row.getBaseOffset(), rowSize,
partitionId, true);
if (!success) {
throw new CelebornIOException("Unable to push after switching
pusher!");
@@ -291,15 +243,9 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
}
- private void pushAndSwitch() throws IOException {
+ private void doPush() throws IOException {
long start = System.nanoTime();
- if (pipelined) {
- currentPusher.triggerPush();
- currentPusher = (currentPusher == pushers[0] ? pushers[1] : pushers[0]);
- currentPusher.waitPushFinish();
- } else {
- currentPusher.pushData();
- }
+ pusher.pushData();
writeMetrics.incWriteTime(System.nanoTime() - start);
}
@@ -322,16 +268,16 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
} else {
boolean success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
serBuffer.getBuf(),
Platform.BYTE_ARRAY_OFFSET,
serializedRecordSize,
partitionId,
false);
if (!success) {
- pushAndSwitch();
+ doPush();
success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
serBuffer.getBuf(),
Platform.BYTE_ARRAY_OFFSET,
serializedRecordSize,
@@ -364,23 +310,10 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void close() throws IOException {
- if (pipelined) {
- logger.info(
- "Memory used {}", Utils.bytesToString((pushers[0].getUsed() +
pushers[1].getUsed())));
- } else {
- logger.info("Memory used {}",
Utils.bytesToString(currentPusher.getUsed()));
- }
+ logger.info("Memory used {}", Utils.bytesToString(pusher.getUsed()));
long pushStartTime = System.nanoTime();
- if (pipelined) {
- for (SortBasedPusher pusher : pushers) {
- pusher.waitPushFinish();
- pusher.pushData();
- pusher.close();
- }
- } else {
- currentPusher.pushData();
- currentPusher.close();
- }
+ pusher.pushData();
+ pusher.close();
shuffleClient.pushMergedData(shuffleId, mapId,
taskContext.attemptNumber());
writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index f7c0e5985..85aa4d6c1 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -21,8 +21,6 @@ import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.*;
import org.apache.spark.internal.config.package$;
@@ -41,7 +39,6 @@ import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.client.security.CryptoUtils;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.ShuffleMode;
-import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.reflect.DynMethods;
/**
@@ -90,9 +87,6 @@ public class SparkShuffleManager implements ShuffleManager {
ConcurrentHashMap.newKeySet();
private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
- private final ExecutorService[] asyncPushers;
- private AtomicInteger pusherIdx = new AtomicInteger(0);
-
private long sendBufferPoolCheckInterval;
private long sendBufferPoolExpireTimeout;
@@ -110,16 +104,6 @@ public class SparkShuffleManager implements ShuffleManager
{
this.celebornConf = SparkUtils.fromSparkConf(conf);
this.cores = executorCores(conf);
this.fallbackPolicyRunner = new
CelebornShuffleFallbackPolicyRunner(celebornConf);
- if ((ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
- || celebornConf.dynamicWriteModeEnabled())
- && celebornConf.clientPushSortPipelineEnabled()) {
- asyncPushers = new ExecutorService[cores];
- for (int i = 0; i < asyncPushers.length; i++) {
- asyncPushers[i] =
ThreadUtils.newDaemonSingleThreadExecutor("async-pusher-" + i);
- }
- } else {
- asyncPushers = null;
- }
this.sendBufferPoolCheckInterval =
celebornConf.clientPushSendBufferPoolExpireCheckInterval();
this.sendBufferPoolExpireTimeout =
celebornConf.clientPushSendBufferPoolExpireTimeout();
}
@@ -305,8 +289,6 @@ public class SparkShuffleManager implements ShuffleManager {
}
if (ShuffleMode.SORT.equals(shuffleMode)) {
- ExecutorService pushThread =
- celebornConf.clientPushSortPipelineEnabled() ? getPusherThread()
: null;
return new SortBasedShuffleWriter<>(
shuffleId,
h.dependency(),
@@ -315,7 +297,6 @@ public class SparkShuffleManager implements ShuffleManager {
celebornConf,
shuffleClient,
metrics,
- pushThread,
SendBufferPool.get(cores, sendBufferPoolCheckInterval,
sendBufferPoolExpireTimeout));
} else if (ShuffleMode.HASH.equals(shuffleMode)) {
SendBufferPool pool =
@@ -448,12 +429,6 @@ public class SparkShuffleManager implements ShuffleManager
{
}
}
- private ExecutorService getPusherThread() {
- ExecutorService pusherThread = asyncPushers[pusherIdx.get() %
asyncPushers.length];
- pusherIdx.incrementAndGet();
- return pusherThread;
- }
-
private int executorCores(SparkConf conf) {
if (Utils.isLocalMaster(conf)) {
// SparkContext.numDriverCores is package private.
diff --git
a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
index 2b1ea681b..1764f727b 100644
---
a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
+++
b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
@@ -36,6 +36,6 @@ public class SortBasedShuffleWriterSuiteJ extends
CelebornShuffleWriterSuiteBase
ShuffleWriteMetricsReporter metrics)
throws IOException {
return new SortBasedShuffleWriter<Integer, String, String>(
- handle, context, conf, client, metrics, null, SendBufferPool.get(4,
30, 60));
+ handle, context, conf, client, metrics, SendBufferPool.get(4, 30, 60));
}
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
index 2580ac300..df032a5cb 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
@@ -49,12 +49,6 @@ import org.apache.celeborn.common.network.util.ByteUnit;
public class JavaUtils {
private static final Logger logger =
LoggerFactory.getLogger(JavaUtils.class);
- /**
- * Define a default value for driver memory here since this value is
referenced across the code
- * base and nearly all files already use Utils.scala
- */
- public static final long DEFAULT_DRIVER_MEM_MB = 1024;
-
/** Closes the given object, ignoring IOExceptions. */
public static void closeQuietly(Closeable closeable) {
try {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 3e779c702..589a8ab66 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -828,7 +828,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def clientPushReviveInterval: Long = get(CLIENT_PUSH_REVIVE_INTERVAL)
def clientPushReviveBatchSize: Int = get(CLIENT_PUSH_REVIVE_BATCHSIZE)
def clientPushSortMemoryThreshold: Long =
get(CLIENT_PUSH_SORT_MEMORY_THRESHOLD)
- def clientPushSortPipelineEnabled: Boolean =
get(CLIENT_PUSH_SORT_PIPELINE_ENABLED)
def clientPushSortRandomizePartitionIdEnabled: Boolean =
get(CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED)
def clientPushRetryThreads: Int = get(CLIENT_PUSH_RETRY_THREADS)
@@ -3824,24 +3823,11 @@ object CelebornConf extends Logging {
.longConf
.createWithDefault(Int.MaxValue)
- val CLIENT_PUSH_SORT_PIPELINE_ENABLED: ConfigEntry[Boolean] =
- buildConf("celeborn.client.spark.push.sort.pipeline.enabled")
- .withAlternative("celeborn.push.sort.pipeline.enabled")
- .categories("client")
- .doc("Whether to enable pipelining for sort based shuffle writer. If
true, double buffering" +
- " will be used to pipeline push")
- .version("0.3.0")
- .booleanConf
- .createWithDefault(false)
-
val CLIENT_PUSH_SORT_MEMORY_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.client.spark.push.sort.memory.threshold")
.withAlternative("celeborn.push.sortMemory.threshold")
.categories("client")
- .doc("When SortBasedPusher use memory over the threshold, will trigger
push data. If the" +
- s" pipeline push feature is enabled
(`${CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}=true`)," +
- " the SortBasedPusher will trigger a data push when the memory usage
exceeds half of the" +
- " threshold(by default, 32m).")
+ .doc("When SortBasedPusher use memory over the threshold, will trigger
push data.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64m")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 70f5e54b0..95965b2aa 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -107,8 +107,7 @@ license: |
| celeborn.client.spark.io.encryption.key | | io encryption key | 0.4.0 |
| celeborn.client.spark.push.dynamicWriteMode.enabled | false | Whether to
dynamically switch push write mode based on conditions.If true, shuffle mode
will be only determined by partition count | 0.5.0 |
| celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 |
Threshold of shuffle partition number for dynamically switching push writer
mode. When the shuffle partition number is greater than this value, use the
sort-based shuffle writer for memory efficiency; otherwise use the hash-based
shuffle writer for speed. This configuration only takes effect when
celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 |
-| celeborn.client.spark.push.sort.memory.threshold | 64m | When
SortBasedPusher use memory over the threshold, will trigger push data. If the
pipeline push feature is enabled
(`celeborn.client.spark.push.sort.pipeline.enabled=true`), the SortBasedPusher
will trigger a data push when the memory usage exceeds half of the threshold(by
default, 32m). | 0.3.0 |
-| celeborn.client.spark.push.sort.pipeline.enabled | false | Whether to enable
pipelining for sort based shuffle writer. If true, double buffering will be
used to pipeline push | 0.3.0 |
+| celeborn.client.spark.push.sort.memory.threshold | 64m | When
SortBasedPusher use memory over the threshold, will trigger push data. | 0.3.0
|
| celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | This is
Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you
have changed UnsafeRow's memory layout set this to false. | 0.2.2 |
| celeborn.client.spark.shuffle.forceFallback.enabled | false | Whether force
fallback shuffle to Spark's default. | 0.3.0 |
| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold |
2147483647 | Celeborn will only accept shuffle of partition number lower than
this configuration value. | 0.3.0 |
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
deleted file mode 100644
index 5288ac7ab..000000000
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.tests.spark
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.funsuite.AnyFunSuite
-
-import org.apache.celeborn.client.ShuffleClient
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.protocol.ShuffleMode
-
-class CelebornPipelineSortSuite extends AnyFunSuite
- with SparkTestBase
- with BeforeAndAfterEach {
-
- override def beforeEach(): Unit = {
- ShuffleClient.reset()
- }
-
- override def afterEach(): Unit = {
- System.gc()
- }
-
- test("celeborn spark integration test - pipeline sort") {
- val sparkConf = new
SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
- .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}",
"true")
-
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}",
"true")
- val ss = SparkSession.builder()
- .config(updateSparkConf(sparkConf, ShuffleMode.SORT))
- .getOrCreate()
- val value = Range(1, 10000).mkString(",")
- val tuples = ss.sparkContext.parallelize(1 to 10000, 2)
- .map { i => (i, value) }.groupByKey(16).collect()
-
- // verify result
- assert(tuples.length == 10000)
- for (elem <- tuples) {
- assert(elem._2.mkString(",").equals(value))
- }
-
- ss.stop()
- }
-}
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornNonPipelineSortSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala
similarity index 94%
rename from
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornNonPipelineSortSuite.scala
rename to
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala
index 0da01ee58..e4b2dd574 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornNonPipelineSortSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala
@@ -26,7 +26,7 @@ import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.ShuffleMode
-class CelebornNonPipelineSortSuite extends AnyFunSuite
+class CelebornSortSuite extends AnyFunSuite
with SparkTestBase
with BeforeAndAfterEach {
@@ -40,7 +40,6 @@ class CelebornNonPipelineSortSuite extends AnyFunSuite
test("celeborn spark integration test - non pipeline sort") {
val sparkConf = new
SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
- .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}",
"false")
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}",
"false")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val combineResult = combine(sparkSession)