This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new 0ece2f5b5 [CELEBORN-891] Remove pipeline feature for sort based writer
0ece2f5b5 is described below
commit 0ece2f5b59eb25c916de717a6cebbff188c0153c
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jan 1 10:42:17 2024 +0800
[CELEBORN-891] Remove pipeline feature for sort based writer
Remove pipeline feature for sort based writer
The pipeline feature was added as part of CELEBORN-295 for performance reasons.
Eventually, an unresolvable issue that would crash the JVM was identified in
https://github.com/apache/incubator-celeborn/pull/1807, and after discussion,
we decided to delete this feature.
There are no user-facing changes: the pipeline feature is disabled by default,
so users who rely on the default settings are unaffected.
Tested by passing GA (GitHub Actions CI).
Closes #2196 from pan3793/CELEBORN-891.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../spark/shuffle/celeborn/SortBasedPusher.java | 158 +++++++--------------
.../shuffle/celeborn/SortBasedPusherSuiteJ.java | 2 -
.../shuffle/celeborn/SortBasedShuffleWriter.java | 136 +++++-------------
.../shuffle/celeborn/SparkShuffleManager.java | 24 ----
.../celeborn/SortBasedShuffleWriterSuiteJ.java | 1 -
.../celeborn/CelebornShuffleManagerSuite.scala | 2 -
.../shuffle/celeborn/SortBasedShuffleWriter.java | 133 +++++------------
.../shuffle/celeborn/SparkShuffleManager.java | 24 ----
.../celeborn/SortBasedShuffleWriterSuiteJ.java | 2 +-
.../org/apache/celeborn/common/util/JavaUtils.java | 6 -
.../org/apache/celeborn/common/CelebornConf.scala | 16 +--
docs/configuration/client.md | 3 +-
.../tests/spark/CelebornPipelineSortSuite.scala | 60 --------
...lineSortSuite.scala => CelebornSortSuite.scala} | 3 +-
14 files changed, 124 insertions(+), 446 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
index aff8e7ce3..3b051c3e7 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
@@ -19,7 +19,6 @@ package org.apache.spark.shuffle.celeborn;
import java.io.IOException;
import java.util.LinkedList;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
@@ -69,12 +68,8 @@ public class SortBasedPusher extends MemoryConsumer {
private final int numPartitions;
private final Consumer<Integer> afterPush;
private final LongAdder[] mapStatusLengths;
- // this lock is shared between different SortBasedPushers to synchronize
pushData
- private final Object sharedPushLock;
- private volatile boolean asyncPushing = false;
private int[] shuffledPartitions = null;
private int[] inversedShuffledPartitions = null;
- private final ExecutorService executorService;
private final SendBufferPool sendBufferPool;
public SortBasedPusher(
@@ -91,8 +86,6 @@ public class SortBasedPusher extends MemoryConsumer {
Consumer<Integer> afterPush,
LongAdder[] mapStatusLengths,
long pushSortMemoryThreshold,
- Object sharedPushLock,
- ExecutorService executorService,
SendBufferPool sendBufferPool) {
super(
memoryManager,
@@ -144,83 +137,74 @@ public class SortBasedPusher extends MemoryConsumer {
int initialSize = Math.min((int) pushSortMemoryThreshold / 8, 1024 * 1024);
this.inMemSorter = new ShuffleInMemorySorter(this, initialSize);
this.peakMemoryUsedBytes = getMemoryUsage();
- this.sharedPushLock = sharedPushLock;
- this.executorService = executorService;
}
public long pushData() throws IOException {
- // pushData should be synchronized between pushers
- synchronized (sharedPushLock) {
- final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
- inMemSorter.getSortedIterator();
-
- byte[] dataBuf = new byte[pushBufferMaxSize];
- int offSet = 0;
- int currentPartition = -1;
- while (sortedRecords.hasNext()) {
- sortedRecords.loadNext();
- final int partition =
- shuffledPartitions != null
- ?
inversedShuffledPartitions[sortedRecords.packedRecordPointer.getPartitionId()]
- : sortedRecords.packedRecordPointer.getPartitionId();
- if (partition != currentPartition) {
- if (currentPartition == -1) {
- currentPartition = partition;
- } else {
- int bytesWritten =
- shuffleClient.mergeData(
- shuffleId,
- mapId,
- attemptNumber,
- currentPartition,
- dataBuf,
- 0,
- offSet,
- numMappers,
- numPartitions);
- mapStatusLengths[currentPartition].add(bytesWritten);
- afterPush.accept(bytesWritten);
- currentPartition = partition;
- offSet = 0;
- }
- }
- final long recordPointer =
sortedRecords.packedRecordPointer.getRecordPointer();
- final Object recordPage = taskMemoryManager.getPage(recordPointer);
- final long recordOffsetInPage =
taskMemoryManager.getOffsetInPage(recordPointer);
- int recordSize = UnsafeAlignedOffset.getSize(recordPage,
recordOffsetInPage);
-
- if (offSet + recordSize > dataBuf.length) {
- try {
- dataPusher.addTask(partition, dataBuf, offSet);
- } catch (InterruptedException e) {
- TaskInterruptedHelper.throwTaskKillException();
- }
+ final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
+ inMemSorter.getSortedIterator();
+
+ byte[] dataBuf = new byte[pushBufferMaxSize];
+ int offSet = 0;
+ int currentPartition = -1;
+ while (sortedRecords.hasNext()) {
+ sortedRecords.loadNext();
+ final int partition =
+ shuffledPartitions != null
+ ?
inversedShuffledPartitions[sortedRecords.packedRecordPointer.getPartitionId()]
+ : sortedRecords.packedRecordPointer.getPartitionId();
+ if (partition != currentPartition) {
+ if (currentPartition == -1) {
+ currentPartition = partition;
+ } else {
+ int bytesWritten =
+ shuffleClient.mergeData(
+ shuffleId,
+ mapId,
+ attemptNumber,
+ currentPartition,
+ dataBuf,
+ 0,
+ offSet,
+ numMappers,
+ numPartitions);
+ mapStatusLengths[currentPartition].add(bytesWritten);
+ afterPush.accept(bytesWritten);
+ currentPartition = partition;
offSet = 0;
}
-
- long recordReadPosition = recordOffsetInPage + UAO_SIZE;
- Platform.copyMemory(
- recordPage,
- recordReadPosition,
- dataBuf,
- Platform.BYTE_ARRAY_OFFSET + offSet,
- recordSize);
- offSet += recordSize;
}
- if (offSet > 0) {
+ final long recordPointer =
sortedRecords.packedRecordPointer.getRecordPointer();
+ final Object recordPage = taskMemoryManager.getPage(recordPointer);
+ final long recordOffsetInPage =
taskMemoryManager.getOffsetInPage(recordPointer);
+ int recordSize = UnsafeAlignedOffset.getSize(recordPage,
recordOffsetInPage);
+
+ if (offSet + recordSize > dataBuf.length) {
try {
- dataPusher.addTask(currentPartition, dataBuf, offSet);
+ dataPusher.addTask(partition, dataBuf, offSet);
} catch (InterruptedException e) {
TaskInterruptedHelper.throwTaskKillException();
}
+ offSet = 0;
}
- long freedBytes = freeMemory();
- inMemSorter.freeMemory();
- taskContext.taskMetrics().incMemoryBytesSpilled(freedBytes);
-
- return freedBytes;
+ long recordReadPosition = recordOffsetInPage + UAO_SIZE;
+ Platform.copyMemory(
+ recordPage, recordReadPosition, dataBuf, Platform.BYTE_ARRAY_OFFSET
+ offSet, recordSize);
+ offSet += recordSize;
}
+ if (offSet > 0) {
+ try {
+ dataPusher.addTask(currentPartition, dataBuf, offSet);
+ } catch (InterruptedException e) {
+ TaskInterruptedHelper.throwTaskKillException();
+ }
+ }
+
+ long freedBytes = freeMemory();
+ inMemSorter.freeMemory();
+ taskContext.taskMetrics().incMemoryBytesSpilled(freedBytes);
+
+ return freedBytes;
}
public boolean insertRecord(
@@ -271,38 +255,6 @@ public class SortBasedPusher extends MemoryConsumer {
return true;
}
- public void triggerPush() throws IOException {
- asyncPushing = true;
- dataPusher.checkException();
- executorService.submit(
- () -> {
- try {
- pushData();
- asyncPushing = false;
- } catch (IOException ie) {
- dataPusher.setException(ie);
- }
- });
- }
-
- /**
- * Since this method and pushData() are synchronized When this method
returns, it means pushData
- * has released lock
- *
- * @throws IOException
- */
- public void waitPushFinish() throws IOException {
- dataPusher.checkException();
- while (asyncPushing) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- logger.error("SortBasedPusher thread interrupted while waiting push
finished.");
- TaskInterruptedHelper.throwTaskKillException();
- }
- }
- }
-
private void growPointerArrayIfNecessary() throws IOException {
assert (inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
index bc86f9db7..0962c98c4 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
@@ -97,8 +97,6 @@ public class SortBasedPusherSuiteJ {
/*afterPush=*/ null,
/*mapStatusLengths=*/ null,
/*pushSortMemoryThreshold=*/ Utils.byteStringAsBytes("1m"),
- /*sharedPushLock=*/ null,
- /*executorService=*/ null,
SendBufferPool.get(4, 30, 60));
// default page size == 2 MiB
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 988cdc090..0985c9b69 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -18,7 +18,6 @@
package org.apache.spark.shuffle.celeborn;
import java.io.IOException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.LongAdder;
import scala.Option;
@@ -70,9 +69,7 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private final long pushBufferMaxSize;
private final Object globalPushLock = new Object();
- private final boolean pipelined;
- private SortBasedPusher[] pushers = new SortBasedPusher[2];
- private SortBasedPusher currentPusher;
+ private SortBasedPusher pusher;
private long peakMemoryUsedBytes = 0;
private final OpenByteArrayOutputStream serBuffer;
@@ -100,7 +97,6 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
TaskContext taskContext,
CelebornConf conf,
ShuffleClient client,
- ExecutorService executorService,
SendBufferPool sendBufferPool)
throws IOException {
this.mapId = taskContext.partitionId();
@@ -126,50 +122,23 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
tmpRecords = new long[numPartitions];
pushBufferMaxSize = conf.clientPushBufferMaxSize();
- pipelined = conf.clientPushSortPipelineEnabled();
-
- if (pipelined) {
- for (int i = 0; i < pushers.length; i++) {
- pushers[i] =
- new SortBasedPusher(
- taskContext.taskMemoryManager(),
- shuffleClient,
- taskContext,
- shuffleId,
- mapId,
- taskContext.attemptNumber(),
- taskContext.taskAttemptId(),
- numMappers,
- numPartitions,
- conf,
- writeMetrics::incBytesWritten,
- mapStatusLengths,
- conf.clientPushSortMemoryThreshold() / 2,
- globalPushLock,
- executorService,
- sendBufferPool);
- }
- currentPusher = pushers[0];
- } else {
- currentPusher =
- new SortBasedPusher(
- taskContext.taskMemoryManager(),
- shuffleClient,
- taskContext,
- shuffleId,
- mapId,
- taskContext.attemptNumber(),
- taskContext.taskAttemptId(),
- numMappers,
- numPartitions,
- conf,
- writeMetrics::incBytesWritten,
- mapStatusLengths,
- conf.clientPushSortMemoryThreshold(),
- globalPushLock,
- null,
- sendBufferPool);
- }
+
+ pusher =
+ new SortBasedPusher(
+ taskContext.taskMemoryManager(),
+ shuffleClient,
+ taskContext,
+ shuffleId,
+ mapId,
+ taskContext.attemptNumber(),
+ taskContext.taskAttemptId(),
+ numMappers,
+ numPartitions,
+ conf,
+ writeMetrics::incBytesWritten,
+ mapStatusLengths,
+ conf.clientPushSortMemoryThreshold(),
+ sendBufferPool);
}
@Override
@@ -224,12 +193,12 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
} else {
boolean success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
row.getBaseObject(), row.getBaseOffset(), rowSize,
partitionId, true);
if (!success) {
- pushAndSwitch();
+ doPush();
success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
row.getBaseObject(), row.getBaseOffset(), rowSize,
partitionId, true);
if (!success) {
throw new IOException("Unable to push after switching pusher!");
@@ -240,37 +209,16 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
}
- private void pushAndSwitch() throws IOException {
+ private void doPush() throws IOException {
long start = System.nanoTime();
- if (pipelined) {
- currentPusher.triggerPush();
- currentPusher = (currentPusher == pushers[0] ? pushers[1] : pushers[0]);
- currentPusher.waitPushFinish();
- } else {
- currentPusher.pushData();
- }
+ pusher.pushData();
writeMetrics.incWriteTime(System.nanoTime() - start);
}
private void updatePeakMemoryUsed() {
- // sorter can be null if this writer is closed
- if (pipelined) {
- for (int i = 0; i < pushers.length; i++) {
-
- if (pushers[i] != null) {
- long mem = pushers[i].getPeakMemoryUsedBytes();
- if (mem > peakMemoryUsedBytes) {
- peakMemoryUsedBytes = mem;
- }
- }
- }
- } else {
- if (currentPusher != null) {
- long mem = currentPusher.getPeakMemoryUsedBytes();
- if (mem > peakMemoryUsedBytes) {
- peakMemoryUsedBytes = mem;
- }
- }
+ long mem = pusher.getPeakMemoryUsedBytes();
+ if (mem > peakMemoryUsedBytes) {
+ peakMemoryUsedBytes = mem;
}
}
@@ -299,16 +247,16 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
} else {
boolean success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
serBuffer.getBuf(),
Platform.BYTE_ARRAY_OFFSET,
serializedRecordSize,
partitionId,
false);
if (!success) {
- pushAndSwitch();
+ doPush();
success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
serBuffer.getBuf(),
Platform.BYTE_ARRAY_OFFSET,
serializedRecordSize,
@@ -341,23 +289,10 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void close() throws IOException {
- if (pipelined) {
- logger.info(
- "Memory used {}", Utils.bytesToString((pushers[0].getUsed() +
pushers[1].getUsed())));
- } else {
- logger.info("Memory used {}",
Utils.bytesToString(currentPusher.getUsed()));
- }
+ logger.info("Memory used {}", Utils.bytesToString(pusher.getUsed()));
long pushStartTime = System.nanoTime();
- if (pipelined) {
- for (SortBasedPusher pusher : pushers) {
- pusher.waitPushFinish();
- pusher.pushData();
- pusher.close();
- }
- } else {
- currentPusher.pushData();
- currentPusher.close();
- }
+ pusher.pushData();
+ pusher.close();
writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
shuffleClient.pushMergedData(shuffleId, mapId,
taskContext.attemptNumber());
@@ -404,11 +339,4 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
shuffleClient.cleanup(shuffleId, mapId, taskContext.attemptNumber());
}
}
-
- // Added in SPARK-32917, for Spark 3.2 and above
- @SuppressWarnings("MissingOverride")
- public long[] getPartitionLengths() {
- throw new UnsupportedOperationException(
- "Celeborn is not compatible with Spark push mode, please set
spark.shuffle.push.enabled to false");
- }
}
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 05f4bf733..cda992b29 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -19,8 +19,6 @@ package org.apache.spark.shuffle.celeborn;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
import scala.Int;
@@ -37,7 +35,6 @@ import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.ShuffleMode;
-import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.reflect.DynMethods;
public class SparkShuffleManager implements ShuffleManager {
@@ -61,9 +58,6 @@ public class SparkShuffleManager implements ShuffleManager {
ConcurrentHashMap.newKeySet();
private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
- private final ExecutorService[] asyncPushers;
- private AtomicInteger pusherIdx = new AtomicInteger(0);
-
private long sendBufferPoolCheckInterval;
private long sendBufferPoolExpireTimeout;
@@ -75,15 +69,6 @@ public class SparkShuffleManager implements ShuffleManager {
this.celebornConf = SparkUtils.fromSparkConf(conf);
this.cores = conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
this.fallbackPolicyRunner = new
CelebornShuffleFallbackPolicyRunner(celebornConf);
- if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
- && celebornConf.clientPushSortPipelineEnabled()) {
- asyncPushers = new ExecutorService[cores];
- for (int i = 0; i < asyncPushers.length; i++) {
- asyncPushers[i] =
ThreadUtils.newDaemonSingleThreadExecutor("async-pusher-" + i);
- }
- } else {
- asyncPushers = null;
- }
this.sendBufferPoolCheckInterval =
celebornConf.clientPushSendBufferPoolExpireCheckInterval();
this.sendBufferPoolExpireTimeout =
celebornConf.clientPushSendBufferPoolExpireTimeout();
}
@@ -206,8 +191,6 @@ public class SparkShuffleManager implements ShuffleManager {
shuffleIdTracker.track(h.shuffleId(), shuffleId);
if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
- ExecutorService pushThread =
- celebornConf.clientPushSortPipelineEnabled() ? getPusherThread()
: null;
return new SortBasedShuffleWriter<>(
shuffleId,
h.dependency(),
@@ -215,7 +198,6 @@ public class SparkShuffleManager implements ShuffleManager {
context,
celebornConf,
client,
- pushThread,
SendBufferPool.get(cores, sendBufferPoolCheckInterval,
sendBufferPoolExpireTimeout));
} else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
return new HashBasedShuffleWriter<>(
@@ -258,12 +240,6 @@ public class SparkShuffleManager implements ShuffleManager
{
return _sortShuffleManager.getReader(handle, startPartition, endPartition,
context);
}
- private ExecutorService getPusherThread() {
- ExecutorService pusherThread = asyncPushers[pusherIdx.get() %
asyncPushers.length];
- pusherIdx.incrementAndGet();
- return pusherThread;
- }
-
private int executorCores(SparkConf conf) {
if (Utils.isLocalMaster(conf)) {
// SparkContext.numDriverCores is package private.
diff --git
a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
index 74ebab2b9..b56bda1f8 100644
---
a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
+++
b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
@@ -40,7 +40,6 @@ public class SortBasedShuffleWriterSuiteJ extends
CelebornShuffleWriterSuiteBase
context,
conf,
client,
- null,
SendBufferPool.get(4, 30, 60));
}
}
diff --git
a/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
b/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
index 36876d6bd..cb6c3e0ab 100644
---
a/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
+++
b/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
@@ -38,7 +38,6 @@ class SparkShuffleManagerSuite extends Logging {
.set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
.set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
.set("spark.shuffle.service.enabled", "false")
- .set("spark.shuffle.useOldFetchProtocol", "true")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.setAppName("test")
val sc = new SparkContext(conf)
@@ -58,7 +57,6 @@ class SparkShuffleManagerSuite extends Logging {
.set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
.set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
.set("spark.shuffle.service.enabled", "false")
- .set("spark.shuffle.useOldFetchProtocol", "true")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
.setAppName("test")
val sc = new SparkContext(conf)
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 2de03a569..cbb8ec725 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -18,7 +18,6 @@
package org.apache.spark.shuffle.celeborn;
import java.io.IOException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.LongAdder;
import scala.Option;
@@ -69,11 +68,8 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private final int numPartitions;
private final long pushBufferMaxSize;
- // this lock is shared between different SortBasedPushers to synchronize
pushData
- private final Object sharedPushLock = new Object();
- private final boolean pipelined;
- private final SortBasedPusher[] pushers = new SortBasedPusher[2];
- private SortBasedPusher currentPusher;
+
+ private final SortBasedPusher pusher;
private long peakMemoryUsedBytes = 0;
private final OpenByteArrayOutputStream serBuffer;
@@ -101,7 +97,6 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
CelebornConf conf,
ShuffleClient client,
ShuffleWriteMetricsReporter metrics,
- ExecutorService executorService,
SendBufferPool sendBufferPool)
throws IOException {
this.mapId = taskContext.partitionId();
@@ -126,50 +121,23 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
tmpRecords = new long[numPartitions];
pushBufferMaxSize = conf.clientPushBufferMaxSize();
- pipelined = conf.clientPushSortPipelineEnabled();
-
- if (pipelined) {
- for (int i = 0; i < pushers.length; i++) {
- pushers[i] =
- new SortBasedPusher(
- taskContext.taskMemoryManager(),
- shuffleClient,
- taskContext,
- shuffleId,
- mapId,
- taskContext.attemptNumber(),
- taskContext.taskAttemptId(),
- numMappers,
- numPartitions,
- conf,
- writeMetrics::incBytesWritten,
- mapStatusLengths,
- conf.clientPushSortMemoryThreshold() / 2,
- sharedPushLock,
- executorService,
- sendBufferPool);
- }
- currentPusher = pushers[0];
- } else {
- currentPusher =
- new SortBasedPusher(
- taskContext.taskMemoryManager(),
- shuffleClient,
- taskContext,
- shuffleId,
- mapId,
- taskContext.attemptNumber(),
- taskContext.taskAttemptId(),
- numMappers,
- numPartitions,
- conf,
- writeMetrics::incBytesWritten,
- mapStatusLengths,
- conf.clientPushSortMemoryThreshold(),
- sharedPushLock,
- null,
- sendBufferPool);
- }
+
+ pusher =
+ new SortBasedPusher(
+ taskContext.taskMemoryManager(),
+ shuffleClient,
+ taskContext,
+ shuffleId,
+ mapId,
+ taskContext.attemptNumber(),
+ taskContext.taskAttemptId(),
+ numMappers,
+ numPartitions,
+ conf,
+ writeMetrics::incBytesWritten,
+ mapStatusLengths,
+ conf.clientPushSortMemoryThreshold(),
+ sendBufferPool);
}
public SortBasedShuffleWriter(
@@ -178,7 +146,6 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
CelebornConf conf,
ShuffleClient client,
ShuffleWriteMetricsReporter metrics,
- ExecutorService executorService,
SendBufferPool sendBufferPool)
throws IOException {
this(
@@ -189,28 +156,13 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
conf,
client,
metrics,
- executorService,
sendBufferPool);
}
private void updatePeakMemoryUsed() {
- // sorter can be null if this writer is closed
- if (pipelined) {
- for (SortBasedPusher pusher : pushers) {
- if (pusher != null) {
- long mem = pusher.getPeakMemoryUsedBytes();
- if (mem > peakMemoryUsedBytes) {
- peakMemoryUsedBytes = mem;
- }
- }
- }
- } else {
- if (currentPusher != null) {
- long mem = currentPusher.getPeakMemoryUsedBytes();
- if (mem > peakMemoryUsedBytes) {
- peakMemoryUsedBytes = mem;
- }
- }
+ long mem = pusher.getPeakMemoryUsedBytes();
+ if (mem > peakMemoryUsedBytes) {
+ peakMemoryUsedBytes = mem;
}
}
@@ -275,12 +227,12 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
} else {
boolean success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
row.getBaseObject(), row.getBaseOffset(), rowSize,
partitionId, true);
if (!success) {
- pushAndSwitch();
+ doPush();
success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
row.getBaseObject(), row.getBaseOffset(), rowSize,
partitionId, true);
if (!success) {
throw new CelebornIOException("Unable to push after switching
pusher!");
@@ -291,15 +243,9 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
}
- private void pushAndSwitch() throws IOException {
+ private void doPush() throws IOException {
long start = System.nanoTime();
- if (pipelined) {
- currentPusher.triggerPush();
- currentPusher = (currentPusher == pushers[0] ? pushers[1] : pushers[0]);
- currentPusher.waitPushFinish();
- } else {
- currentPusher.pushData();
- }
+ pusher.pushData();
writeMetrics.incWriteTime(System.nanoTime() - start);
}
@@ -322,16 +268,16 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
} else {
boolean success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
serBuffer.getBuf(),
Platform.BYTE_ARRAY_OFFSET,
serializedRecordSize,
partitionId,
false);
if (!success) {
- pushAndSwitch();
+ doPush();
success =
- currentPusher.insertRecord(
+ pusher.insertRecord(
serBuffer.getBuf(),
Platform.BYTE_ARRAY_OFFSET,
serializedRecordSize,
@@ -364,23 +310,10 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void close() throws IOException {
- if (pipelined) {
- logger.info(
- "Memory used {}", Utils.bytesToString((pushers[0].getUsed() +
pushers[1].getUsed())));
- } else {
- logger.info("Memory used {}",
Utils.bytesToString(currentPusher.getUsed()));
- }
+ logger.info("Memory used {}", Utils.bytesToString(pusher.getUsed()));
long pushStartTime = System.nanoTime();
- if (pipelined) {
- for (SortBasedPusher pusher : pushers) {
- pusher.waitPushFinish();
- pusher.pushData();
- pusher.close();
- }
- } else {
- currentPusher.pushData();
- currentPusher.close();
- }
+ pusher.pushData();
+ pusher.close();
shuffleClient.pushMergedData(shuffleId, mapId,
taskContext.attemptNumber());
writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 450abe7d2..a9b714fb6 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -19,8 +19,6 @@ package org.apache.spark.shuffle.celeborn;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.*;
import org.apache.spark.launcher.SparkLauncher;
@@ -36,7 +34,6 @@ import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.ShuffleMode;
-import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.reflect.DynMethods;
/**
@@ -85,9 +82,6 @@ public class SparkShuffleManager implements ShuffleManager {
ConcurrentHashMap.newKeySet();
private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
- private final ExecutorService[] asyncPushers;
- private AtomicInteger pusherIdx = new AtomicInteger(0);
-
private long sendBufferPoolCheckInterval;
private long sendBufferPoolExpireTimeout;
@@ -105,15 +99,6 @@ public class SparkShuffleManager implements ShuffleManager {
this.celebornConf = SparkUtils.fromSparkConf(conf);
this.cores = executorCores(conf);
this.fallbackPolicyRunner = new
CelebornShuffleFallbackPolicyRunner(celebornConf);
- if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
- && celebornConf.clientPushSortPipelineEnabled()) {
- asyncPushers = new ExecutorService[cores];
- for (int i = 0; i < asyncPushers.length; i++) {
- asyncPushers[i] =
ThreadUtils.newDaemonSingleThreadExecutor("async-pusher-" + i);
- }
- } else {
- asyncPushers = null;
- }
this.sendBufferPoolCheckInterval =
celebornConf.clientPushSendBufferPoolExpireCheckInterval();
this.sendBufferPoolExpireTimeout =
celebornConf.clientPushSendBufferPoolExpireTimeout();
}
@@ -248,8 +233,6 @@ public class SparkShuffleManager implements ShuffleManager {
shuffleIdTracker.track(h.shuffleId(), shuffleId);
if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
- ExecutorService pushThread =
- celebornConf.clientPushSortPipelineEnabled() ? getPusherThread()
: null;
return new SortBasedShuffleWriter<>(
shuffleId,
h.dependency(),
@@ -258,7 +241,6 @@ public class SparkShuffleManager implements ShuffleManager {
celebornConf,
shuffleClient,
metrics,
- pushThread,
SendBufferPool.get(cores, sendBufferPoolCheckInterval,
sendBufferPoolExpireTimeout));
} else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
SendBufferPool pool =
@@ -389,12 +371,6 @@ public class SparkShuffleManager implements ShuffleManager
{
}
}
- private ExecutorService getPusherThread() {
- ExecutorService pusherThread = asyncPushers[pusherIdx.get() %
asyncPushers.length];
- pusherIdx.incrementAndGet();
- return pusherThread;
- }
-
private int executorCores(SparkConf conf) {
if (Utils.isLocalMaster(conf)) {
// SparkContext.numDriverCores is package private.
diff --git
a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
index 2b1ea681b..1764f727b 100644
---
a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
+++
b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
@@ -36,6 +36,6 @@ public class SortBasedShuffleWriterSuiteJ extends
CelebornShuffleWriterSuiteBase
ShuffleWriteMetricsReporter metrics)
throws IOException {
return new SortBasedShuffleWriter<Integer, String, String>(
- handle, context, conf, client, metrics, null, SendBufferPool.get(4,
30, 60));
+ handle, context, conf, client, metrics, SendBufferPool.get(4, 30, 60));
}
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
index 2580ac300..df032a5cb 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
@@ -49,12 +49,6 @@ import org.apache.celeborn.common.network.util.ByteUnit;
public class JavaUtils {
private static final Logger logger =
LoggerFactory.getLogger(JavaUtils.class);
- /**
- * Define a default value for driver memory here since this value is
referenced across the code
- * base and nearly all files already use Utils.scala
- */
- public static final long DEFAULT_DRIVER_MEM_MB = 1024;
-
/** Closes the given object, ignoring IOExceptions. */
public static void closeQuietly(Closeable closeable) {
try {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index de79714c7..e2d34c18d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -824,7 +824,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def clientPushReviveInterval: Long = get(CLIENT_PUSH_REVIVE_INTERVAL)
def clientPushReviveBatchSize: Int = get(CLIENT_PUSH_REVIVE_BATCHSIZE)
def clientPushSortMemoryThreshold: Long =
get(CLIENT_PUSH_SORT_MEMORY_THRESHOLD)
- def clientPushSortPipelineEnabled: Boolean =
get(CLIENT_PUSH_SORT_PIPELINE_ENABLED)
def clientPushSortRandomizePartitionIdEnabled: Boolean =
get(CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED)
def clientPushRetryThreads: Int = get(CLIENT_PUSH_RETRY_THREADS)
@@ -3810,24 +3809,11 @@ object CelebornConf extends Logging {
.longConf
.createWithDefault(Int.MaxValue)
- val CLIENT_PUSH_SORT_PIPELINE_ENABLED: ConfigEntry[Boolean] =
- buildConf("celeborn.client.spark.push.sort.pipeline.enabled")
- .withAlternative("celeborn.push.sort.pipeline.enabled")
- .categories("client")
- .doc("Whether to enable pipelining for sort based shuffle writer. If
true, double buffering" +
- " will be used to pipeline push")
- .version("0.3.0")
- .booleanConf
- .createWithDefault(false)
-
val CLIENT_PUSH_SORT_MEMORY_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.client.spark.push.sort.memory.threshold")
.withAlternative("celeborn.push.sortMemory.threshold")
.categories("client")
- .doc("When SortBasedPusher use memory over the threshold, will trigger
push data. If the" +
- s" pipeline push feature is enabled
(`${CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}=true`)," +
- " the SortBasedPusher will trigger a data push when the memory usage
exceeds half of the" +
- " threshold(by default, 32m).")
+ .doc("When SortBasedPusher use memory over the threshold, will trigger
push data.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64m")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 7c171ed3f..76a3aefbb 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -102,8 +102,7 @@ license: |
| celeborn.client.shuffle.register.filterExcludedWorker.enabled | false |
Whether to filter excluded worker when register shuffle. | 0.4.0 |
| celeborn.client.slot.assign.maxWorkers | 10000 | Max workers that slots of
one shuffle can be allocated on. Will choose the smaller positive one from
Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. |
0.3.1 |
| celeborn.client.spark.fetch.throwsFetchFailure | false | client throws
FetchFailedException instead of CelebornIOException | 0.4.0 |
-| celeborn.client.spark.push.sort.memory.threshold | 64m | When
SortBasedPusher use memory over the threshold, will trigger push data. If the
pipeline push feature is enabled
(`celeborn.client.spark.push.sort.pipeline.enabled=true`), the SortBasedPusher
will trigger a data push when the memory usage exceeds half of the threshold(by
default, 32m). | 0.3.0 |
-| celeborn.client.spark.push.sort.pipeline.enabled | false | Whether to enable
pipelining for sort based shuffle writer. If true, double buffering will be
used to pipeline push | 0.3.0 |
+| celeborn.client.spark.push.sort.memory.threshold | 64m | When
SortBasedPusher use memory over the threshold, will trigger push data. | 0.3.0
|
| celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | This is
Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you
have changed UnsafeRow's memory layout set this to false. | 0.2.2 |
| celeborn.client.spark.shuffle.forceFallback.enabled | false | Whether force
fallback shuffle to Spark's default. | 0.3.0 |
| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold |
2147483647 | Celeborn will only accept shuffle of partition number lower than
this configuration value. | 0.3.0 |
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
deleted file mode 100644
index 5288ac7ab..000000000
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.tests.spark
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.funsuite.AnyFunSuite
-
-import org.apache.celeborn.client.ShuffleClient
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.protocol.ShuffleMode
-
-class CelebornPipelineSortSuite extends AnyFunSuite
- with SparkTestBase
- with BeforeAndAfterEach {
-
- override def beforeEach(): Unit = {
- ShuffleClient.reset()
- }
-
- override def afterEach(): Unit = {
- System.gc()
- }
-
- test("celeborn spark integration test - pipeline sort") {
- val sparkConf = new
SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
- .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}",
"true")
-
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}",
"true")
- val ss = SparkSession.builder()
- .config(updateSparkConf(sparkConf, ShuffleMode.SORT))
- .getOrCreate()
- val value = Range(1, 10000).mkString(",")
- val tuples = ss.sparkContext.parallelize(1 to 10000, 2)
- .map { i => (i, value) }.groupByKey(16).collect()
-
- // verify result
- assert(tuples.length == 10000)
- for (elem <- tuples) {
- assert(elem._2.mkString(",").equals(value))
- }
-
- ss.stop()
- }
-}
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornNonPipelineSortSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala
similarity index 94%
rename from
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornNonPipelineSortSuite.scala
rename to
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala
index 0da01ee58..e4b2dd574 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornNonPipelineSortSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala
@@ -26,7 +26,7 @@ import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.ShuffleMode
-class CelebornNonPipelineSortSuite extends AnyFunSuite
+class CelebornSortSuite extends AnyFunSuite
with SparkTestBase
with BeforeAndAfterEach {
@@ -40,7 +40,6 @@ class CelebornNonPipelineSortSuite extends AnyFunSuite
test("celeborn spark integration test - non pipeline sort") {
val sparkConf = new
SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
- .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}",
"false")
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}",
"false")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val combineResult = combine(sparkSession)