This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 35ec75dae [CELEBORN-754][IMPORTANT] Provide a new SparkShuffleManager
to replace RssShuffleManager in the future
35ec75dae is described below
commit 35ec75dae45c7d2bf15ab6d561a7202e803c23a3
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jun 30 17:27:33 2023 +0800
[CELEBORN-754][IMPORTANT] Provide a new SparkShuffleManager to replace
RssShuffleManager in the future
### What changes were proposed in this pull request?
Provide a new SparkShuffleManager to replace RssShuffleManager in the future
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1667 from AngersZhuuuu/CELEBORN-754.
Lead-authored-by: Angerszhuuuu <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 5c7ecb83029e0eae0c1637496fa6fcb40b148243)
Signed-off-by: Cheng Pan <[email protected]>
---
README.md | 5 +-
.../Celeborn_Dynamic_Allocation_spark2.patch | 2 +-
.../spark/shuffle/celeborn/RssShuffleManager.java | 215 +--------------
...huffleManager.java => SparkShuffleManager.java} | 6 +-
.../celeborn/CelebornShuffleManagerSuite.scala} | 12 +-
.../spark/shuffle/celeborn/RssShuffleManager.java | 297 +--------------------
...huffleManager.java => SparkShuffleManager.java} | 6 +-
.../celeborn/CelebornShuffleManagerSuite.scala} | 12 +-
.../celeborn/common/protocol/RpcNameConstants.java | 2 +-
docs/deploy.md | 2 +-
docs/migration.md | 3 +
.../celeborn/tests/spark/PushDataTimeoutTest.scala | 4 +-
.../tests/spark/ShuffleFallbackSuite.scala | 2 +-
.../celeborn/tests/spark/SkewJoinSuite.scala | 2 +-
.../celeborn/tests/spark/SparkTestBase.scala | 4 +-
toolkit/scripts/genConfs.py | 2 +-
16 files changed, 46 insertions(+), 530 deletions(-)
diff --git a/README.md b/README.md
index 764c1b33a..9f368e2d0 100644
--- a/README.md
+++ b/README.md
@@ -233,7 +233,10 @@ Copy $CELEBORN_HOME/spark/*.jar to $SPARK_HOME/jars/
#### Spark Configuration
To use Celeborn, the following spark configurations should be added.
```properties
-spark.shuffle.manager org.apache.spark.shuffle.celeborn.RssShuffleManager
+# Shuffle manager class name changed in 0.3.0:
+# before 0.3.0: org.apache.spark.shuffle.celeborn.RssShuffleManager
+# since 0.3.0: org.apache.spark.shuffle.celeborn.SparkShuffleManager
+spark.shuffle.manager org.apache.spark.shuffle.celeborn.SparkShuffleManager
# must use kryo serializer because java serializer does not support relocation
spark.serializer org.apache.spark.serializer.KryoSerializer
diff --git a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
index 325f1211f..dc5ce50e3 100644
--- a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
@@ -43,7 +43,7 @@ diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scal
+ !Utils.isCelebornEnabled(conf) && !testing) {
+ throw new SparkException("Dynamic allocation of executors requires the
external or remote " +
+ "shuffle service. You may enable this through
spark.shuffle.service.enabled or " +
-+ "set
spark.shuffle.manager=org.apache.spark.shuffle.celeborn.RssShuffleManager.")
++ "set
spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager.")
}
if (tasksPerExecutorForFullParallelism == 0) {
throw new SparkException("spark.executor.cores must not be <
spark.task.cpus.")
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index e4515edcc..2df1faaf7 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -17,220 +17,11 @@
package org.apache.spark.shuffle.celeborn;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.spark.SparkConf;
-import scala.Int;
-
-import org.apache.spark.*;
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.shuffle.*;
-import org.apache.spark.shuffle.sort.SortShuffleManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.celeborn.client.LifecycleManager;
-import org.apache.celeborn.client.ShuffleClient;
-import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.protocol.ShuffleMode;
-import org.apache.celeborn.common.util.ThreadUtils;
-
-public class RssShuffleManager implements ShuffleManager {
-
- private static final Logger logger =
LoggerFactory.getLogger(RssShuffleManager.class);
-
- private static final String sortShuffleManagerName =
- "org.apache.spark.shuffle.sort.SortShuffleManager";
-
- private final SparkConf conf;
- private final CelebornConf celebornConf;
- private final int cores;
- // either be "{appId}_{appAttemptId}" or "{appId}"
- private String appUniqueId;
-
- private LifecycleManager lifecycleManager;
- private ShuffleClient rssShuffleClient;
- private volatile SortShuffleManager _sortShuffleManager;
- private final ConcurrentHashMap.KeySetView<Integer, Boolean> sortShuffleIds =
- ConcurrentHashMap.newKeySet();
- private final RssShuffleFallbackPolicyRunner fallbackPolicyRunner;
-
- private final ExecutorService[] asyncPushers;
- private AtomicInteger pusherIdx = new AtomicInteger(0);
+public class RssShuffleManager extends SparkShuffleManager {
public RssShuffleManager(SparkConf conf) {
- this.conf = conf;
- this.celebornConf = SparkUtils.fromSparkConf(conf);
- this.cores = conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
- this.fallbackPolicyRunner = new
RssShuffleFallbackPolicyRunner(celebornConf);
- if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
- && celebornConf.clientPushSortPipelineEnabled()) {
- asyncPushers = new ExecutorService[cores];
- for (int i = 0; i < asyncPushers.length; i++) {
- asyncPushers[i] =
ThreadUtils.newDaemonSingleThreadExecutor("async-pusher-" + i);
- }
- } else {
- asyncPushers = null;
- }
- }
-
- private boolean isDriver() {
- return "driver".equals(SparkEnv.get().executorId());
- }
-
- private SortShuffleManager sortShuffleManager() {
- if (_sortShuffleManager == null) {
- synchronized (this) {
- if (_sortShuffleManager == null) {
- _sortShuffleManager =
- SparkUtils.instantiateClass(sortShuffleManagerName, conf,
isDriver());
- }
- }
- }
- return _sortShuffleManager;
- }
-
- private void initializeLifecycleManager(String appId) {
- // Only create LifecycleManager singleton in Driver. When register shuffle
multiple times, we
- // need to ensure that LifecycleManager will only be created once.
Parallelism needs to be
- // considered in this place, because if there is one RDD that depends on
multiple RDDs
- // at the same time, it may bring parallel `register shuffle`, such as
Join in Sql.
- if (isDriver() && lifecycleManager == null) {
- synchronized (this) {
- if (lifecycleManager == null) {
- lifecycleManager = new LifecycleManager(appId, celebornConf);
- rssShuffleClient =
- ShuffleClient.get(
- appUniqueId,
- lifecycleManager.getRssMetaServiceHost(),
- lifecycleManager.getRssMetaServicePort(),
- celebornConf,
- lifecycleManager.getUserIdentifier());
- }
- }
- }
- }
-
- @Override
- public <K, V, C> ShuffleHandle registerShuffle(
- int shuffleId, int numMaps, ShuffleDependency<K, V, C> dependency) {
- // Note: generate app unique id at driver side, make sure
dependency.rdd.context
- // is the same SparkContext among different shuffleIds.
- // This method may be called many times.
- appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
- initializeLifecycleManager(appUniqueId);
-
- if (fallbackPolicyRunner.applyAllFallbackPolicy(
- lifecycleManager, dependency.partitioner().numPartitions())) {
- logger.warn("Fallback to SortShuffleManager!");
- sortShuffleIds.add(shuffleId);
- return sortShuffleManager().registerShuffle(shuffleId, numMaps,
dependency);
- } else {
- return new RssShuffleHandle<>(
- appUniqueId,
- lifecycleManager.getRssMetaServiceHost(),
- lifecycleManager.getRssMetaServicePort(),
- lifecycleManager.getUserIdentifier(),
- shuffleId,
- numMaps,
- dependency);
- }
- }
-
- @Override
- public boolean unregisterShuffle(int shuffleId) {
- if (sortShuffleIds.contains(shuffleId)) {
- return sortShuffleManager().unregisterShuffle(shuffleId);
- }
- if (appUniqueId == null) {
- return true;
- }
- if (rssShuffleClient == null) {
- return false;
- }
- return rssShuffleClient.unregisterShuffle(shuffleId, isDriver());
- }
-
- @Override
- public ShuffleBlockResolver shuffleBlockResolver() {
- return sortShuffleManager().shuffleBlockResolver();
- }
-
- @Override
- public void stop() {
- if (rssShuffleClient != null) {
- rssShuffleClient.shutdown();
- }
- if (lifecycleManager != null) {
- lifecycleManager.stop();
- }
- if (sortShuffleManager() != null) {
- sortShuffleManager().stop();
- }
- }
-
- @Override
- public <K, V> ShuffleWriter<K, V> getWriter(
- ShuffleHandle handle, int mapId, TaskContext context) {
- try {
- if (handle instanceof RssShuffleHandle) {
- @SuppressWarnings("unchecked")
- RssShuffleHandle<K, V, ?> h = ((RssShuffleHandle<K, V, ?>) handle);
- ShuffleClient client =
- ShuffleClient.get(
- appUniqueId,
- h.rssMetaServiceHost(),
- h.rssMetaServicePort(),
- celebornConf,
- h.userIdentifier());
- if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
- ExecutorService pushThread =
- celebornConf.clientPushSortPipelineEnabled() ? getPusherThread()
: null;
- return new SortBasedShuffleWriter<>(
- h.dependency(),
- h.appUniqueId(),
- h.numMaps(),
- context,
- celebornConf,
- client,
- pushThread);
- } else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
- return new HashBasedShuffleWriter<>(
- h, mapId, context, celebornConf, client,
SendBufferPool.get(cores));
- } else {
- throw new UnsupportedOperationException(
- "Unrecognized shuffle write mode!" +
celebornConf.shuffleWriterMode());
- }
- } else {
- sortShuffleIds.add(handle.shuffleId());
- return sortShuffleManager().getWriter(handle, mapId, context);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public <K, C> ShuffleReader<K, C> getReader(
- ShuffleHandle handle, int startPartition, int endPartition, TaskContext
context) {
- if (handle instanceof RssShuffleHandle) {
- @SuppressWarnings("unchecked")
- RssShuffleHandle<K, ?, C> h = (RssShuffleHandle<K, ?, C>) handle;
- return new RssShuffleReader<>(
- h, startPartition, endPartition, 0, Int.MaxValue(), context,
celebornConf);
- }
- return _sortShuffleManager.getReader(handle, startPartition, endPartition,
context);
- }
-
- private ExecutorService getPusherThread() {
- ExecutorService pusherThread = asyncPushers[pusherIdx.get() %
asyncPushers.length];
- pusherIdx.incrementAndGet();
- return pusherThread;
- }
-
- // for testing
- public LifecycleManager getLifecycleManager() {
- return this.lifecycleManager;
+ super(conf);
}
}
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
similarity index 97%
copy from
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
copy to
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index e4515edcc..2e3776f7b 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -37,9 +37,9 @@ import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.ShuffleMode;
import org.apache.celeborn.common.util.ThreadUtils;
-public class RssShuffleManager implements ShuffleManager {
+public class SparkShuffleManager implements ShuffleManager {
- private static final Logger logger =
LoggerFactory.getLogger(RssShuffleManager.class);
+ private static final Logger logger =
LoggerFactory.getLogger(SparkShuffleManager.class);
private static final String sortShuffleManagerName =
"org.apache.spark.shuffle.sort.SortShuffleManager";
@@ -60,7 +60,7 @@ public class RssShuffleManager implements ShuffleManager {
private final ExecutorService[] asyncPushers;
private AtomicInteger pusherIdx = new AtomicInteger(0);
- public RssShuffleManager(SparkConf conf) {
+ public SparkShuffleManager(SparkConf conf) {
this.conf = conf;
this.celebornConf = SparkUtils.fromSparkConf(conf);
this.cores = conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
diff --git
a/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/RssShuffleManagerSuite.scala
b/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
similarity index 89%
rename from
client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/RssShuffleManagerSuite.scala
rename to
client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
index 54725730b..c1cbdd0aa 100644
---
a/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/RssShuffleManagerSuite.scala
+++
b/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
@@ -26,11 +26,14 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
@RunWith(classOf[JUnit4])
-class RssShuffleManagerSuite extends Logging {
+class SparkShuffleManagerSuite extends Logging {
+
@junit.Test
def testFallBack(): Unit = {
val conf = new SparkConf().setIfMissing("spark.master", "local")
- .setIfMissing("spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+ .setIfMissing(
+ "spark.shuffle.manager",
+ "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
.set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
.set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
.set("spark.shuffle.service.enabled", "false")
@@ -48,7 +51,9 @@ class RssShuffleManagerSuite extends Logging {
@junit.Test
def testClusterNotAvailable(): Unit = {
val conf = new SparkConf().setIfMissing("spark.master", "local")
- .setIfMissing("spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+ .setIfMissing(
+ "spark.shuffle.manager",
+ "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
.set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
.set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
.set("spark.shuffle.service.enabled", "false")
@@ -62,4 +67,5 @@ class RssShuffleManagerSuite extends Logging {
// scalastyle:on println
sc.stop()
}
+
}
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index dd597cc1a..2df1faaf7 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -17,302 +17,11 @@
package org.apache.spark.shuffle.celeborn;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.spark.SparkConf;
-import org.apache.spark.*;
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.shuffle.*;
-import org.apache.spark.shuffle.sort.SortShuffleManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.celeborn.client.LifecycleManager;
-import org.apache.celeborn.client.ShuffleClient;
-import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.protocol.ShuffleMode;
-import org.apache.celeborn.common.util.ThreadUtils;
-
-public class RssShuffleManager implements ShuffleManager {
-
- private static final Logger logger =
LoggerFactory.getLogger(RssShuffleManager.class);
-
- private static final String sortShuffleManagerName =
- "org.apache.spark.shuffle.sort.SortShuffleManager";
-
- private final SparkConf conf;
- private final CelebornConf celebornConf;
- private final int cores;
- // either be "{appId}_{appAttemptId}" or "{appId}"
- private String appUniqueId;
-
- private LifecycleManager lifecycleManager;
- private ShuffleClient rssShuffleClient;
- private volatile SortShuffleManager _sortShuffleManager;
- private final ConcurrentHashMap.KeySetView<Integer, Boolean> sortShuffleIds =
- ConcurrentHashMap.newKeySet();
- private final RssShuffleFallbackPolicyRunner fallbackPolicyRunner;
-
- private final ExecutorService[] asyncPushers;
- private AtomicInteger pusherIdx = new AtomicInteger(0);
+public class RssShuffleManager extends SparkShuffleManager {
public RssShuffleManager(SparkConf conf) {
- this.conf = conf;
- this.celebornConf = SparkUtils.fromSparkConf(conf);
- this.cores = conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
- this.fallbackPolicyRunner = new
RssShuffleFallbackPolicyRunner(celebornConf);
- if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
- && celebornConf.clientPushSortPipelineEnabled()) {
- asyncPushers = new ExecutorService[cores];
- for (int i = 0; i < asyncPushers.length; i++) {
- asyncPushers[i] =
ThreadUtils.newDaemonSingleThreadExecutor("async-pusher-" + i);
- }
- } else {
- asyncPushers = null;
- }
- }
-
- private boolean isDriver() {
- return "driver".equals(SparkEnv.get().executorId());
- }
-
- private SortShuffleManager sortShuffleManager() {
- if (_sortShuffleManager == null) {
- synchronized (this) {
- if (_sortShuffleManager == null) {
- _sortShuffleManager =
- SparkUtils.instantiateClass(sortShuffleManagerName, conf,
isDriver());
- }
- }
- }
- return _sortShuffleManager;
- }
-
- private void initializeLifecycleManager() {
- // Only create LifecycleManager singleton in Driver. When register shuffle
multiple times, we
- // need to ensure that LifecycleManager will only be created once.
Parallelism needs to be
- // considered in this place, because if there is one RDD that depends on
multiple RDDs
- // at the same time, it may bring parallel `register shuffle`, such as
Join in Sql.
- if (isDriver() && lifecycleManager == null) {
- synchronized (this) {
- if (lifecycleManager == null) {
- lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
- rssShuffleClient =
- ShuffleClient.get(
- appUniqueId,
- lifecycleManager.getRssMetaServiceHost(),
- lifecycleManager.getRssMetaServicePort(),
- celebornConf,
- lifecycleManager.getUserIdentifier());
- }
- }
- }
- }
-
- @Override
- public <K, V, C> ShuffleHandle registerShuffle(
- int shuffleId, ShuffleDependency<K, V, C> dependency) {
- // Note: generate app unique id at driver side, make sure
dependency.rdd.context
- // is the same SparkContext among different shuffleIds.
- // This method may be called many times.
- appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
- initializeLifecycleManager();
-
- if (fallbackPolicyRunner.applyAllFallbackPolicy(
- lifecycleManager, dependency.partitioner().numPartitions())) {
- logger.warn("Fallback to SortShuffleManager!");
- sortShuffleIds.add(shuffleId);
- return sortShuffleManager().registerShuffle(shuffleId, dependency);
- } else {
- return new RssShuffleHandle<>(
- appUniqueId,
- lifecycleManager.getRssMetaServiceHost(),
- lifecycleManager.getRssMetaServicePort(),
- lifecycleManager.getUserIdentifier(),
- shuffleId,
- dependency.rdd().getNumPartitions(),
- dependency);
- }
- }
-
- @Override
- public boolean unregisterShuffle(int shuffleId) {
- if (sortShuffleIds.contains(shuffleId)) {
- return sortShuffleManager().unregisterShuffle(shuffleId);
- }
- if (appUniqueId == null) {
- return true;
- }
- if (rssShuffleClient == null) {
- return false;
- }
- return rssShuffleClient.unregisterShuffle(shuffleId, isDriver());
- }
-
- @Override
- public ShuffleBlockResolver shuffleBlockResolver() {
- return sortShuffleManager().shuffleBlockResolver();
- }
-
- @Override
- public void stop() {
- if (rssShuffleClient != null) {
- rssShuffleClient.shutdown();
- ShuffleClient.reset();
- rssShuffleClient = null;
- }
- if (lifecycleManager != null) {
- lifecycleManager.stop();
- lifecycleManager = null;
- }
- if (sortShuffleManager() != null) {
- sortShuffleManager().stop();
- _sortShuffleManager = null;
- }
- }
-
- @Override
- public <K, V> ShuffleWriter<K, V> getWriter(
- ShuffleHandle handle, long mapId, TaskContext context,
ShuffleWriteMetricsReporter metrics) {
- try {
- if (handle instanceof RssShuffleHandle) {
- @SuppressWarnings("unchecked")
- RssShuffleHandle<K, V, ?> h = ((RssShuffleHandle<K, V, ?>) handle);
- ShuffleClient client =
- ShuffleClient.get(
- h.appUniqueId(),
- h.rssMetaServiceHost(),
- h.rssMetaServicePort(),
- celebornConf,
- h.userIdentifier());
- if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
- ExecutorService pushThread =
- celebornConf.clientPushSortPipelineEnabled() ? getPusherThread()
: null;
- return new SortBasedShuffleWriter<>(
- h.dependency(),
- h.appUniqueId(),
- h.numMappers(),
- context,
- celebornConf,
- client,
- metrics,
- pushThread);
- } else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
- return new HashBasedShuffleWriter<>(
- h, context, celebornConf, client, metrics,
SendBufferPool.get(cores));
- } else {
- throw new UnsupportedOperationException(
- "Unrecognized shuffle write mode!" +
celebornConf.shuffleWriterMode());
- }
- } else {
- sortShuffleIds.add(handle.shuffleId());
- return sortShuffleManager().getWriter(handle, mapId, context, metrics);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- // Added in SPARK-32055, for Spark 3.1 and above
- public <K, C> ShuffleReader<K, C> getReader(
- ShuffleHandle handle,
- int startMapIndex,
- int endMapIndex,
- int startPartition,
- int endPartition,
- TaskContext context,
- ShuffleReadMetricsReporter metrics) {
- if (handle instanceof RssShuffleHandle) {
- @SuppressWarnings("unchecked")
- RssShuffleHandle<K, ?, C> h = (RssShuffleHandle<K, ?, C>) handle;
- return new RssShuffleReader<>(
- h,
- startPartition,
- endPartition,
- startMapIndex,
- endMapIndex,
- context,
- celebornConf,
- metrics);
- }
- return SparkUtils.getReader(
- sortShuffleManager(),
- handle,
- startMapIndex,
- endMapIndex,
- startPartition,
- endPartition,
- context,
- metrics);
- }
-
- // Marked as final in SPARK-32055, reserved for Spark 3.0
- public <K, C> ShuffleReader<K, C> getReader(
- ShuffleHandle handle,
- int startPartition,
- int endPartition,
- TaskContext context,
- ShuffleReadMetricsReporter metrics) {
- if (handle instanceof RssShuffleHandle) {
- @SuppressWarnings("unchecked")
- RssShuffleHandle<K, ?, C> h = (RssShuffleHandle<K, ?, C>) handle;
- return new RssShuffleReader<>(
- h, startPartition, endPartition, 0, Integer.MAX_VALUE, context,
celebornConf, metrics);
- }
- return SparkUtils.getReader(
- sortShuffleManager(),
- handle,
- 0,
- Integer.MAX_VALUE,
- startPartition,
- endPartition,
- context,
- metrics);
- }
-
- // Renamed to getReader in SPARK-32055, reserved for Spark 3.0
- public <K, C> ShuffleReader<K, C> getReaderForRange(
- ShuffleHandle handle,
- int startMapIndex,
- int endMapIndex,
- int startPartition,
- int endPartition,
- TaskContext context,
- ShuffleReadMetricsReporter metrics) {
- if (handle instanceof RssShuffleHandle) {
- @SuppressWarnings("unchecked")
- RssShuffleHandle<K, ?, C> h = (RssShuffleHandle<K, ?, C>) handle;
- return new RssShuffleReader<>(
- h,
- startPartition,
- endPartition,
- startMapIndex,
- endMapIndex,
- context,
- celebornConf,
- metrics);
- }
- return SparkUtils.getReader(
- sortShuffleManager(),
- handle,
- startMapIndex,
- endMapIndex,
- startPartition,
- endPartition,
- context,
- metrics);
- }
-
- private ExecutorService getPusherThread() {
- ExecutorService pusherThread = asyncPushers[pusherIdx.get() %
asyncPushers.length];
- pusherIdx.incrementAndGet();
- return pusherThread;
- }
-
- // for testing
- public LifecycleManager getLifecycleManager() {
- return this.lifecycleManager;
+ super(conf);
}
}
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
similarity index 98%
copy from
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
copy to
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index dd597cc1a..71122c230 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -35,9 +35,9 @@ import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.ShuffleMode;
import org.apache.celeborn.common.util.ThreadUtils;
-public class RssShuffleManager implements ShuffleManager {
+public class SparkShuffleManager implements ShuffleManager {
- private static final Logger logger =
LoggerFactory.getLogger(RssShuffleManager.class);
+ private static final Logger logger =
LoggerFactory.getLogger(SparkShuffleManager.class);
private static final String sortShuffleManagerName =
"org.apache.spark.shuffle.sort.SortShuffleManager";
@@ -58,7 +58,7 @@ public class RssShuffleManager implements ShuffleManager {
private final ExecutorService[] asyncPushers;
private AtomicInteger pusherIdx = new AtomicInteger(0);
- public RssShuffleManager(SparkConf conf) {
+ public SparkShuffleManager(SparkConf conf) {
this.conf = conf;
this.celebornConf = SparkUtils.fromSparkConf(conf);
this.cores = conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
diff --git
a/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/RssShuffleManagerSuite.scala
b/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
similarity index 89%
rename from
client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/RssShuffleManagerSuite.scala
rename to
client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
index 62047f1e1..5bc034e9f 100644
---
a/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/RssShuffleManagerSuite.scala
+++
b/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
@@ -26,12 +26,13 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
@RunWith(classOf[JUnit4])
-class RssShuffleManagerSuite extends Logging {
-
+class SparkShuffleManagerSuite extends Logging {
@junit.Test
def testFallBack(): Unit = {
val conf = new SparkConf().setIfMissing("spark.master", "local")
- .setIfMissing("spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+ .setIfMissing(
+ "spark.shuffle.manager",
+ "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
.set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
.set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
.set("spark.shuffle.service.enabled", "false")
@@ -49,7 +50,9 @@ class RssShuffleManagerSuite extends Logging {
@junit.Test
def testClusterNotAvailable(): Unit = {
val conf = new SparkConf().setIfMissing("spark.master", "local")
- .setIfMissing("spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+ .setIfMissing(
+ "spark.shuffle.manager",
+ "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
.set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
.set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
.set("spark.shuffle.service.enabled", "false")
@@ -63,5 +66,4 @@ class RssShuffleManagerSuite extends Logging {
// scalastyle:on println
sc.stop()
}
-
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
b/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
index 37978a98e..d241d7ed1 100644
---
a/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
+++
b/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
@@ -30,7 +30,7 @@ public class RpcNameConstants {
// Worker Endpoint Name
public static String WORKER_EP = "WorkerEndpoint";
- // For Driver(RssShuffleManager)
+ // For Driver(SparkShuffleManager)
public static String RSS_METASERVICE_EP = "RssMetaServiceEndpoint";
public static String RSS_METASERVICE_SYS = "RssMetaSys";
}
diff --git a/docs/deploy.md b/docs/deploy.md
index bf5573ac5..4eed1abf8 100644
--- a/docs/deploy.md
+++ b/docs/deploy.md
@@ -154,7 +154,7 @@ Copy $CELEBORN_HOME/spark/*.jar to $SPARK_HOME/jars/
### Spark Configuration
To use Celeborn, the following spark configurations should be added.
```properties
-spark.shuffle.manager org.apache.spark.shuffle.celeborn.RssShuffleManager
+spark.shuffle.manager org.apache.spark.shuffle.celeborn.SparkShuffleManager
# must use kryo serializer because java serializer does not support relocation
spark.serializer org.apache.spark.serializer.KryoSerializer
diff --git a/docs/migration.md b/docs/migration.md
index 81170aa53..13dc3762a 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -61,3 +61,6 @@ license: |
| `SlavePushDataHandshakeTime` | `ReplicaPushDataHandshakeTime` |
| `SlaveRegionStartTime` | `ReplicaRegionStartTime` |
| `SlaveRegionFinishTime` | `ReplicaRegionFinishTime` |
+
+ - Since 0.3.0, Celeborn's Spark shuffle manager changed from
`org.apache.spark.shuffle.celeborn.RssShuffleManager` to
`org.apache.spark.shuffle.celeborn.SparkShuffleManager`. Users can set the Spark
property `spark.shuffle.manager` to
`org.apache.spark.shuffle.celeborn.SparkShuffleManager` to use the Celeborn
remote shuffle service.
+ In 0.3.0, Celeborn still supports
`org.apache.spark.shuffle.celeborn.RssShuffleManager`; it will be removed in
0.4.0.
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
index bf80cc292..ed3dfb2b9 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
@@ -20,7 +20,7 @@ package org.apache.celeborn.tests.spark
import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkContextHelper}
-import org.apache.spark.shuffle.celeborn.RssShuffleManager
+import org.apache.spark.shuffle.celeborn.SparkShuffleManager
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite
@@ -140,7 +140,7 @@ class PushDataTimeoutTest extends AnyFunSuite
assert(PushDataHandler.pushReplicaMergeDataTimeoutTested.get())
val excludedWorkers = SparkContextHelper.env
.shuffleManager
- .asInstanceOf[RssShuffleManager]
+ .asInstanceOf[SparkShuffleManager]
.getLifecycleManager
.workerStatusTracker
.excludedWorkers
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala
index 6bbe5482e..2354fac55 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala
@@ -41,7 +41,7 @@ class ShuffleFallbackSuite extends AnyFunSuite
}
private def enableRss(conf: SparkConf) = {
- conf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+ conf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.SparkShuffleManager")
.set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}",
masterInfo._1.rpcEnv.address.toString)
}
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala
index 223ef8cea..c31d6f408 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala
@@ -41,7 +41,7 @@ class SkewJoinSuite extends AnyFunSuite
}
private def enableRss(conf: SparkConf) = {
- conf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+ conf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.SparkShuffleManager")
.set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}",
masterInfo._1.rpcEnv.address.toString)
.set(s"spark.${CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD.key}",
"10MB")
}
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
index 5ee1c6ae6..c194a087b 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
@@ -48,7 +48,9 @@ trait SparkTestBase extends AnyFunSuite
def updateSparkConf(sparkConf: SparkConf, mode: ShuffleMode): SparkConf = {
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
- sparkConf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+ sparkConf.set(
+ "spark.shuffle.manager",
+ "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
sparkConf.set("spark.shuffle.useOldFetchProtocol", "true")
sparkConf.set("spark.sql.adaptive.enabled", "false")
sparkConf.set("spark.shuffle.service.enabled", "false")
diff --git a/toolkit/scripts/genConfs.py b/toolkit/scripts/genConfs.py
index b1d693033..004834892 100644
--- a/toolkit/scripts/genConfs.py
+++ b/toolkit/scripts/genConfs.py
@@ -94,7 +94,7 @@ def set_skew_join_confs(conf):
def set_rss_confs(conf, replicate=False):
- conf["spark.shuffle.manager"] =
"org.apache.spark.shuffle.celeborn.RssShuffleManager"
+ conf["spark.shuffle.manager"] =
"org.apache.spark.shuffle.celeborn.SparkShuffleManager"
conf["spark.serializer"] = "org.apache.spark.serializer.KryoSerializer"
conf["spark.celeborn.master.endpoints"] = "master-1-1:9097"
conf["spark.shuffle.service.enabled"] = "false"