This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 35ec75dae [CELEBORN-754][IMPORTANT] Provide a new SparkShuffleManager 
to replace RssShuffleManager in the future
35ec75dae is described below

commit 35ec75dae45c7d2bf15ab6d561a7202e803c23a3
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jun 30 17:27:33 2023 +0800

    [CELEBORN-754][IMPORTANT] Provide a new SparkShuffleManager to replace 
RssShuffleManager in the future
    
    ### What changes were proposed in this pull request?
    Provide a new SparkShuffleManager to replace RssShuffleManager in the future
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1667 from AngersZhuuuu/CELEBORN-754.
    
    Lead-authored-by: Angerszhuuuu <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 5c7ecb83029e0eae0c1637496fa6fcb40b148243)
    Signed-off-by: Cheng Pan <[email protected]>
---
 README.md                                          |   5 +-
 .../Celeborn_Dynamic_Allocation_spark2.patch       |   2 +-
 .../spark/shuffle/celeborn/RssShuffleManager.java  | 215 +--------------
 ...huffleManager.java => SparkShuffleManager.java} |   6 +-
 .../celeborn/CelebornShuffleManagerSuite.scala}    |  12 +-
 .../spark/shuffle/celeborn/RssShuffleManager.java  | 297 +--------------------
 ...huffleManager.java => SparkShuffleManager.java} |   6 +-
 .../celeborn/CelebornShuffleManagerSuite.scala}    |  12 +-
 .../celeborn/common/protocol/RpcNameConstants.java |   2 +-
 docs/deploy.md                                     |   2 +-
 docs/migration.md                                  |   3 +
 .../celeborn/tests/spark/PushDataTimeoutTest.scala |   4 +-
 .../tests/spark/ShuffleFallbackSuite.scala         |   2 +-
 .../celeborn/tests/spark/SkewJoinSuite.scala       |   2 +-
 .../celeborn/tests/spark/SparkTestBase.scala       |   4 +-
 toolkit/scripts/genConfs.py                        |   2 +-
 16 files changed, 46 insertions(+), 530 deletions(-)

diff --git a/README.md b/README.md
index 764c1b33a..9f368e2d0 100644
--- a/README.md
+++ b/README.md
@@ -233,7 +233,10 @@ Copy $CELEBORN_HOME/spark/*.jar to $SPARK_HOME/jars/
 #### Spark Configuration
 To use Celeborn,the following spark configurations should be added.
 ```properties
-spark.shuffle.manager org.apache.spark.shuffle.celeborn.RssShuffleManager
+# Shuffle manager class name changed in 0.3.0:
+#   before 0.3.0: org.apache.spark.shuffle.celeborn.RssShuffleManager
+#    since 0.3.0: org.apache.spark.shuffle.celeborn.SparkShuffleManager
+spark.shuffle.manager org.apache.spark.shuffle.celeborn.SparkShuffleManager
 # must use kryo serializer because java serializer do not support relocation
 spark.serializer org.apache.spark.serializer.KryoSerializer
 
diff --git a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch 
b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
index 325f1211f..dc5ce50e3 100644
--- a/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
+++ b/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
@@ -43,7 +43,7 @@ diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scal
 +      !Utils.isCelebornEnabled(conf) && !testing) {
 +      throw new SparkException("Dynamic allocation of executors requires the 
external or remote " +
 +        "shuffle service. You may enable this through 
spark.shuffle.service.enabled or " +
-+        "set 
spark.shuffle.manager=org.apache.spark.shuffle.celeborn.RssShuffleManager.")
++        "set 
spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager.")
      }
      if (tasksPerExecutorForFullParallelism == 0) {
        throw new SparkException("spark.executor.cores must not be < 
spark.task.cpus.")
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index e4515edcc..2df1faaf7 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -17,220 +17,11 @@
 
 package org.apache.spark.shuffle.celeborn;
 
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.spark.SparkConf;
 
-import scala.Int;
-
-import org.apache.spark.*;
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.shuffle.*;
-import org.apache.spark.shuffle.sort.SortShuffleManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.celeborn.client.LifecycleManager;
-import org.apache.celeborn.client.ShuffleClient;
-import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.protocol.ShuffleMode;
-import org.apache.celeborn.common.util.ThreadUtils;
-
-public class RssShuffleManager implements ShuffleManager {
-
-  private static final Logger logger = 
LoggerFactory.getLogger(RssShuffleManager.class);
-
-  private static final String sortShuffleManagerName =
-      "org.apache.spark.shuffle.sort.SortShuffleManager";
-
-  private final SparkConf conf;
-  private final CelebornConf celebornConf;
-  private final int cores;
-  // either be "{appId}_{appAttemptId}" or "{appId}"
-  private String appUniqueId;
-
-  private LifecycleManager lifecycleManager;
-  private ShuffleClient rssShuffleClient;
-  private volatile SortShuffleManager _sortShuffleManager;
-  private final ConcurrentHashMap.KeySetView<Integer, Boolean> sortShuffleIds =
-      ConcurrentHashMap.newKeySet();
-  private final RssShuffleFallbackPolicyRunner fallbackPolicyRunner;
-
-  private final ExecutorService[] asyncPushers;
-  private AtomicInteger pusherIdx = new AtomicInteger(0);
+public class RssShuffleManager extends SparkShuffleManager {
 
   public RssShuffleManager(SparkConf conf) {
-    this.conf = conf;
-    this.celebornConf = SparkUtils.fromSparkConf(conf);
-    this.cores = conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
-    this.fallbackPolicyRunner = new 
RssShuffleFallbackPolicyRunner(celebornConf);
-    if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
-        && celebornConf.clientPushSortPipelineEnabled()) {
-      asyncPushers = new ExecutorService[cores];
-      for (int i = 0; i < asyncPushers.length; i++) {
-        asyncPushers[i] = 
ThreadUtils.newDaemonSingleThreadExecutor("async-pusher-" + i);
-      }
-    } else {
-      asyncPushers = null;
-    }
-  }
-
-  private boolean isDriver() {
-    return "driver".equals(SparkEnv.get().executorId());
-  }
-
-  private SortShuffleManager sortShuffleManager() {
-    if (_sortShuffleManager == null) {
-      synchronized (this) {
-        if (_sortShuffleManager == null) {
-          _sortShuffleManager =
-              SparkUtils.instantiateClass(sortShuffleManagerName, conf, 
isDriver());
-        }
-      }
-    }
-    return _sortShuffleManager;
-  }
-
-  private void initializeLifecycleManager(String appId) {
-    // Only create LifecycleManager singleton in Driver. When register shuffle 
multiple times, we
-    // need to ensure that LifecycleManager will only be created once. 
Parallelism needs to be
-    // considered in this place, because if there is one RDD that depends on 
multiple RDDs
-    // at the same time, it may bring parallel `register shuffle`, such as 
Join in Sql.
-    if (isDriver() && lifecycleManager == null) {
-      synchronized (this) {
-        if (lifecycleManager == null) {
-          lifecycleManager = new LifecycleManager(appId, celebornConf);
-          rssShuffleClient =
-              ShuffleClient.get(
-                  appUniqueId,
-                  lifecycleManager.getRssMetaServiceHost(),
-                  lifecycleManager.getRssMetaServicePort(),
-                  celebornConf,
-                  lifecycleManager.getUserIdentifier());
-        }
-      }
-    }
-  }
-
-  @Override
-  public <K, V, C> ShuffleHandle registerShuffle(
-      int shuffleId, int numMaps, ShuffleDependency<K, V, C> dependency) {
-    // Note: generate app unique id at driver side, make sure 
dependency.rdd.context
-    // is the same SparkContext among different shuffleIds.
-    // This method may be called many times.
-    appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
-    initializeLifecycleManager(appUniqueId);
-
-    if (fallbackPolicyRunner.applyAllFallbackPolicy(
-        lifecycleManager, dependency.partitioner().numPartitions())) {
-      logger.warn("Fallback to SortShuffleManager!");
-      sortShuffleIds.add(shuffleId);
-      return sortShuffleManager().registerShuffle(shuffleId, numMaps, 
dependency);
-    } else {
-      return new RssShuffleHandle<>(
-          appUniqueId,
-          lifecycleManager.getRssMetaServiceHost(),
-          lifecycleManager.getRssMetaServicePort(),
-          lifecycleManager.getUserIdentifier(),
-          shuffleId,
-          numMaps,
-          dependency);
-    }
-  }
-
-  @Override
-  public boolean unregisterShuffle(int shuffleId) {
-    if (sortShuffleIds.contains(shuffleId)) {
-      return sortShuffleManager().unregisterShuffle(shuffleId);
-    }
-    if (appUniqueId == null) {
-      return true;
-    }
-    if (rssShuffleClient == null) {
-      return false;
-    }
-    return rssShuffleClient.unregisterShuffle(shuffleId, isDriver());
-  }
-
-  @Override
-  public ShuffleBlockResolver shuffleBlockResolver() {
-    return sortShuffleManager().shuffleBlockResolver();
-  }
-
-  @Override
-  public void stop() {
-    if (rssShuffleClient != null) {
-      rssShuffleClient.shutdown();
-    }
-    if (lifecycleManager != null) {
-      lifecycleManager.stop();
-    }
-    if (sortShuffleManager() != null) {
-      sortShuffleManager().stop();
-    }
-  }
-
-  @Override
-  public <K, V> ShuffleWriter<K, V> getWriter(
-      ShuffleHandle handle, int mapId, TaskContext context) {
-    try {
-      if (handle instanceof RssShuffleHandle) {
-        @SuppressWarnings("unchecked")
-        RssShuffleHandle<K, V, ?> h = ((RssShuffleHandle<K, V, ?>) handle);
-        ShuffleClient client =
-            ShuffleClient.get(
-                appUniqueId,
-                h.rssMetaServiceHost(),
-                h.rssMetaServicePort(),
-                celebornConf,
-                h.userIdentifier());
-        if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
-          ExecutorService pushThread =
-              celebornConf.clientPushSortPipelineEnabled() ? getPusherThread() 
: null;
-          return new SortBasedShuffleWriter<>(
-              h.dependency(),
-              h.appUniqueId(),
-              h.numMaps(),
-              context,
-              celebornConf,
-              client,
-              pushThread);
-        } else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
-          return new HashBasedShuffleWriter<>(
-              h, mapId, context, celebornConf, client, 
SendBufferPool.get(cores));
-        } else {
-          throw new UnsupportedOperationException(
-              "Unrecognized shuffle write mode!" + 
celebornConf.shuffleWriterMode());
-        }
-      } else {
-        sortShuffleIds.add(handle.shuffleId());
-        return sortShuffleManager().getWriter(handle, mapId, context);
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public <K, C> ShuffleReader<K, C> getReader(
-      ShuffleHandle handle, int startPartition, int endPartition, TaskContext 
context) {
-    if (handle instanceof RssShuffleHandle) {
-      @SuppressWarnings("unchecked")
-      RssShuffleHandle<K, ?, C> h = (RssShuffleHandle<K, ?, C>) handle;
-      return new RssShuffleReader<>(
-          h, startPartition, endPartition, 0, Int.MaxValue(), context, 
celebornConf);
-    }
-    return _sortShuffleManager.getReader(handle, startPartition, endPartition, 
context);
-  }
-
-  private ExecutorService getPusherThread() {
-    ExecutorService pusherThread = asyncPushers[pusherIdx.get() % 
asyncPushers.length];
-    pusherIdx.incrementAndGet();
-    return pusherThread;
-  }
-
-  // for testing
-  public LifecycleManager getLifecycleManager() {
-    return this.lifecycleManager;
+    super(conf);
   }
 }
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
similarity index 97%
copy from 
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
copy to 
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index e4515edcc..2e3776f7b 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -37,9 +37,9 @@ import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.ShuffleMode;
 import org.apache.celeborn.common.util.ThreadUtils;
 
-public class RssShuffleManager implements ShuffleManager {
+public class SparkShuffleManager implements ShuffleManager {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(RssShuffleManager.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(SparkShuffleManager.class);
 
   private static final String sortShuffleManagerName =
       "org.apache.spark.shuffle.sort.SortShuffleManager";
@@ -60,7 +60,7 @@ public class RssShuffleManager implements ShuffleManager {
   private final ExecutorService[] asyncPushers;
   private AtomicInteger pusherIdx = new AtomicInteger(0);
 
-  public RssShuffleManager(SparkConf conf) {
+  public SparkShuffleManager(SparkConf conf) {
     this.conf = conf;
     this.celebornConf = SparkUtils.fromSparkConf(conf);
     this.cores = conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
diff --git 
a/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/RssShuffleManagerSuite.scala
 
b/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
similarity index 89%
rename from 
client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/RssShuffleManagerSuite.scala
rename to 
client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
index 54725730b..c1cbdd0aa 100644
--- 
a/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/RssShuffleManagerSuite.scala
+++ 
b/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
@@ -26,11 +26,14 @@ import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 
 @RunWith(classOf[JUnit4])
-class RssShuffleManagerSuite extends Logging {
+class SparkShuffleManagerSuite extends Logging {
+
   @junit.Test
   def testFallBack(): Unit = {
     val conf = new SparkConf().setIfMissing("spark.master", "local")
-      .setIfMissing("spark.shuffle.manager", 
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+      .setIfMissing(
+        "spark.shuffle.manager",
+        "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
       .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
       .set("spark.shuffle.service.enabled", "false")
@@ -48,7 +51,9 @@ class RssShuffleManagerSuite extends Logging {
   @junit.Test
   def testClusterNotAvailable(): Unit = {
     val conf = new SparkConf().setIfMissing("spark.master", "local")
-      .setIfMissing("spark.shuffle.manager", 
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+      .setIfMissing(
+        "spark.shuffle.manager",
+        "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
       .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
       .set("spark.shuffle.service.enabled", "false")
@@ -62,4 +67,5 @@ class RssShuffleManagerSuite extends Logging {
     // scalastyle:on println
     sc.stop()
   }
+
 }
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index dd597cc1a..2df1faaf7 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -17,302 +17,11 @@
 
 package org.apache.spark.shuffle.celeborn;
 
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.spark.SparkConf;
 
-import org.apache.spark.*;
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.shuffle.*;
-import org.apache.spark.shuffle.sort.SortShuffleManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.celeborn.client.LifecycleManager;
-import org.apache.celeborn.client.ShuffleClient;
-import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.protocol.ShuffleMode;
-import org.apache.celeborn.common.util.ThreadUtils;
-
-public class RssShuffleManager implements ShuffleManager {
-
-  private static final Logger logger = 
LoggerFactory.getLogger(RssShuffleManager.class);
-
-  private static final String sortShuffleManagerName =
-      "org.apache.spark.shuffle.sort.SortShuffleManager";
-
-  private final SparkConf conf;
-  private final CelebornConf celebornConf;
-  private final int cores;
-  // either be "{appId}_{appAttemptId}" or "{appId}"
-  private String appUniqueId;
-
-  private LifecycleManager lifecycleManager;
-  private ShuffleClient rssShuffleClient;
-  private volatile SortShuffleManager _sortShuffleManager;
-  private final ConcurrentHashMap.KeySetView<Integer, Boolean> sortShuffleIds =
-      ConcurrentHashMap.newKeySet();
-  private final RssShuffleFallbackPolicyRunner fallbackPolicyRunner;
-
-  private final ExecutorService[] asyncPushers;
-  private AtomicInteger pusherIdx = new AtomicInteger(0);
+public class RssShuffleManager extends SparkShuffleManager {
 
   public RssShuffleManager(SparkConf conf) {
-    this.conf = conf;
-    this.celebornConf = SparkUtils.fromSparkConf(conf);
-    this.cores = conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
-    this.fallbackPolicyRunner = new 
RssShuffleFallbackPolicyRunner(celebornConf);
-    if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
-        && celebornConf.clientPushSortPipelineEnabled()) {
-      asyncPushers = new ExecutorService[cores];
-      for (int i = 0; i < asyncPushers.length; i++) {
-        asyncPushers[i] = 
ThreadUtils.newDaemonSingleThreadExecutor("async-pusher-" + i);
-      }
-    } else {
-      asyncPushers = null;
-    }
-  }
-
-  private boolean isDriver() {
-    return "driver".equals(SparkEnv.get().executorId());
-  }
-
-  private SortShuffleManager sortShuffleManager() {
-    if (_sortShuffleManager == null) {
-      synchronized (this) {
-        if (_sortShuffleManager == null) {
-          _sortShuffleManager =
-              SparkUtils.instantiateClass(sortShuffleManagerName, conf, 
isDriver());
-        }
-      }
-    }
-    return _sortShuffleManager;
-  }
-
-  private void initializeLifecycleManager() {
-    // Only create LifecycleManager singleton in Driver. When register shuffle 
multiple times, we
-    // need to ensure that LifecycleManager will only be created once. 
Parallelism needs to be
-    // considered in this place, because if there is one RDD that depends on 
multiple RDDs
-    // at the same time, it may bring parallel `register shuffle`, such as 
Join in Sql.
-    if (isDriver() && lifecycleManager == null) {
-      synchronized (this) {
-        if (lifecycleManager == null) {
-          lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
-          rssShuffleClient =
-              ShuffleClient.get(
-                  appUniqueId,
-                  lifecycleManager.getRssMetaServiceHost(),
-                  lifecycleManager.getRssMetaServicePort(),
-                  celebornConf,
-                  lifecycleManager.getUserIdentifier());
-        }
-      }
-    }
-  }
-
-  @Override
-  public <K, V, C> ShuffleHandle registerShuffle(
-      int shuffleId, ShuffleDependency<K, V, C> dependency) {
-    // Note: generate app unique id at driver side, make sure 
dependency.rdd.context
-    // is the same SparkContext among different shuffleIds.
-    // This method may be called many times.
-    appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
-    initializeLifecycleManager();
-
-    if (fallbackPolicyRunner.applyAllFallbackPolicy(
-        lifecycleManager, dependency.partitioner().numPartitions())) {
-      logger.warn("Fallback to SortShuffleManager!");
-      sortShuffleIds.add(shuffleId);
-      return sortShuffleManager().registerShuffle(shuffleId, dependency);
-    } else {
-      return new RssShuffleHandle<>(
-          appUniqueId,
-          lifecycleManager.getRssMetaServiceHost(),
-          lifecycleManager.getRssMetaServicePort(),
-          lifecycleManager.getUserIdentifier(),
-          shuffleId,
-          dependency.rdd().getNumPartitions(),
-          dependency);
-    }
-  }
-
-  @Override
-  public boolean unregisterShuffle(int shuffleId) {
-    if (sortShuffleIds.contains(shuffleId)) {
-      return sortShuffleManager().unregisterShuffle(shuffleId);
-    }
-    if (appUniqueId == null) {
-      return true;
-    }
-    if (rssShuffleClient == null) {
-      return false;
-    }
-    return rssShuffleClient.unregisterShuffle(shuffleId, isDriver());
-  }
-
-  @Override
-  public ShuffleBlockResolver shuffleBlockResolver() {
-    return sortShuffleManager().shuffleBlockResolver();
-  }
-
-  @Override
-  public void stop() {
-    if (rssShuffleClient != null) {
-      rssShuffleClient.shutdown();
-      ShuffleClient.reset();
-      rssShuffleClient = null;
-    }
-    if (lifecycleManager != null) {
-      lifecycleManager.stop();
-      lifecycleManager = null;
-    }
-    if (sortShuffleManager() != null) {
-      sortShuffleManager().stop();
-      _sortShuffleManager = null;
-    }
-  }
-
-  @Override
-  public <K, V> ShuffleWriter<K, V> getWriter(
-      ShuffleHandle handle, long mapId, TaskContext context, 
ShuffleWriteMetricsReporter metrics) {
-    try {
-      if (handle instanceof RssShuffleHandle) {
-        @SuppressWarnings("unchecked")
-        RssShuffleHandle<K, V, ?> h = ((RssShuffleHandle<K, V, ?>) handle);
-        ShuffleClient client =
-            ShuffleClient.get(
-                h.appUniqueId(),
-                h.rssMetaServiceHost(),
-                h.rssMetaServicePort(),
-                celebornConf,
-                h.userIdentifier());
-        if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
-          ExecutorService pushThread =
-              celebornConf.clientPushSortPipelineEnabled() ? getPusherThread() 
: null;
-          return new SortBasedShuffleWriter<>(
-              h.dependency(),
-              h.appUniqueId(),
-              h.numMappers(),
-              context,
-              celebornConf,
-              client,
-              metrics,
-              pushThread);
-        } else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
-          return new HashBasedShuffleWriter<>(
-              h, context, celebornConf, client, metrics, 
SendBufferPool.get(cores));
-        } else {
-          throw new UnsupportedOperationException(
-              "Unrecognized shuffle write mode!" + 
celebornConf.shuffleWriterMode());
-        }
-      } else {
-        sortShuffleIds.add(handle.shuffleId());
-        return sortShuffleManager().getWriter(handle, mapId, context, metrics);
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  // Added in SPARK-32055, for Spark 3.1 and above
-  public <K, C> ShuffleReader<K, C> getReader(
-      ShuffleHandle handle,
-      int startMapIndex,
-      int endMapIndex,
-      int startPartition,
-      int endPartition,
-      TaskContext context,
-      ShuffleReadMetricsReporter metrics) {
-    if (handle instanceof RssShuffleHandle) {
-      @SuppressWarnings("unchecked")
-      RssShuffleHandle<K, ?, C> h = (RssShuffleHandle<K, ?, C>) handle;
-      return new RssShuffleReader<>(
-          h,
-          startPartition,
-          endPartition,
-          startMapIndex,
-          endMapIndex,
-          context,
-          celebornConf,
-          metrics);
-    }
-    return SparkUtils.getReader(
-        sortShuffleManager(),
-        handle,
-        startMapIndex,
-        endMapIndex,
-        startPartition,
-        endPartition,
-        context,
-        metrics);
-  }
-
-  // Marked as final in SPARK-32055, reserved for Spark 3.0
-  public <K, C> ShuffleReader<K, C> getReader(
-      ShuffleHandle handle,
-      int startPartition,
-      int endPartition,
-      TaskContext context,
-      ShuffleReadMetricsReporter metrics) {
-    if (handle instanceof RssShuffleHandle) {
-      @SuppressWarnings("unchecked")
-      RssShuffleHandle<K, ?, C> h = (RssShuffleHandle<K, ?, C>) handle;
-      return new RssShuffleReader<>(
-          h, startPartition, endPartition, 0, Integer.MAX_VALUE, context, 
celebornConf, metrics);
-    }
-    return SparkUtils.getReader(
-        sortShuffleManager(),
-        handle,
-        0,
-        Integer.MAX_VALUE,
-        startPartition,
-        endPartition,
-        context,
-        metrics);
-  }
-
-  // Renamed to getReader in SPARK-32055, reserved for Spark 3.0
-  public <K, C> ShuffleReader<K, C> getReaderForRange(
-      ShuffleHandle handle,
-      int startMapIndex,
-      int endMapIndex,
-      int startPartition,
-      int endPartition,
-      TaskContext context,
-      ShuffleReadMetricsReporter metrics) {
-    if (handle instanceof RssShuffleHandle) {
-      @SuppressWarnings("unchecked")
-      RssShuffleHandle<K, ?, C> h = (RssShuffleHandle<K, ?, C>) handle;
-      return new RssShuffleReader<>(
-          h,
-          startPartition,
-          endPartition,
-          startMapIndex,
-          endMapIndex,
-          context,
-          celebornConf,
-          metrics);
-    }
-    return SparkUtils.getReader(
-        sortShuffleManager(),
-        handle,
-        startMapIndex,
-        endMapIndex,
-        startPartition,
-        endPartition,
-        context,
-        metrics);
-  }
-
-  private ExecutorService getPusherThread() {
-    ExecutorService pusherThread = asyncPushers[pusherIdx.get() % 
asyncPushers.length];
-    pusherIdx.incrementAndGet();
-    return pusherThread;
-  }
-
-  // for testing
-  public LifecycleManager getLifecycleManager() {
-    return this.lifecycleManager;
+    super(conf);
   }
 }
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
similarity index 98%
copy from 
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
copy to 
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index dd597cc1a..71122c230 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -35,9 +35,9 @@ import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.ShuffleMode;
 import org.apache.celeborn.common.util.ThreadUtils;
 
-public class RssShuffleManager implements ShuffleManager {
+public class SparkShuffleManager implements ShuffleManager {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(RssShuffleManager.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(SparkShuffleManager.class);
 
   private static final String sortShuffleManagerName =
       "org.apache.spark.shuffle.sort.SortShuffleManager";
@@ -58,7 +58,7 @@ public class RssShuffleManager implements ShuffleManager {
   private final ExecutorService[] asyncPushers;
   private AtomicInteger pusherIdx = new AtomicInteger(0);
 
-  public RssShuffleManager(SparkConf conf) {
+  public SparkShuffleManager(SparkConf conf) {
     this.conf = conf;
     this.celebornConf = SparkUtils.fromSparkConf(conf);
     this.cores = conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
diff --git 
a/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/RssShuffleManagerSuite.scala
 
b/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
similarity index 89%
rename from 
client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/RssShuffleManagerSuite.scala
rename to 
client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
index 62047f1e1..5bc034e9f 100644
--- 
a/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/RssShuffleManagerSuite.scala
+++ 
b/client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
@@ -26,12 +26,13 @@ import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 
 @RunWith(classOf[JUnit4])
-class RssShuffleManagerSuite extends Logging {
-
+class SparkShuffleManagerSuite extends Logging {
   @junit.Test
   def testFallBack(): Unit = {
     val conf = new SparkConf().setIfMissing("spark.master", "local")
-      .setIfMissing("spark.shuffle.manager", 
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+      .setIfMissing(
+        "spark.shuffle.manager",
+        "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
       .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
       .set("spark.shuffle.service.enabled", "false")
@@ -49,7 +50,9 @@ class RssShuffleManagerSuite extends Logging {
   @junit.Test
   def testClusterNotAvailable(): Unit = {
     val conf = new SparkConf().setIfMissing("spark.master", "local")
-      .setIfMissing("spark.shuffle.manager", 
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+      .setIfMissing(
+        "spark.shuffle.manager",
+        "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
       .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
       .set("spark.shuffle.service.enabled", "false")
@@ -63,5 +66,4 @@ class RssShuffleManagerSuite extends Logging {
     // scalastyle:on println
     sc.stop()
   }
-
 }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
 
b/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
index 37978a98e..d241d7ed1 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
@@ -30,7 +30,7 @@ public class RpcNameConstants {
   // Worker Endpoint Name
   public static String WORKER_EP = "WorkerEndpoint";
 
-  // For Driver(RssShuffleManager)
+  // For Driver(SparkShuffleManager)
   public static String RSS_METASERVICE_EP = "RssMetaServiceEndpoint";
   public static String RSS_METASERVICE_SYS = "RssMetaSys";
 }
diff --git a/docs/deploy.md b/docs/deploy.md
index bf5573ac5..4eed1abf8 100644
--- a/docs/deploy.md
+++ b/docs/deploy.md
@@ -154,7 +154,7 @@ Copy $CELEBORN_HOME/spark/*.jar to $SPARK_HOME/jars/
 ### Spark Configuration
To use Celeborn, the following Spark configurations should be added.
 ```properties
-spark.shuffle.manager org.apache.spark.shuffle.celeborn.RssShuffleManager
+spark.shuffle.manager org.apache.spark.shuffle.celeborn.SparkShuffleManager
 # must use the Kryo serializer because the Java serializer does not support relocation
 spark.serializer org.apache.spark.serializer.KryoSerializer
 
diff --git a/docs/migration.md b/docs/migration.md
index 81170aa53..13dc3762a 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -61,3 +61,6 @@ license: |
      | `SlavePushDataHandshakeTime`  | `ReplicaPushDataHandshakeTime` |
      | `SlaveRegionStartTime`        | `ReplicaRegionStartTime`       |
      | `SlaveRegionFinishTime`       | `ReplicaRegionFinishTime`      |
+
+ - Since 0.3.0, Celeborn's Spark shuffle manager changed from 
`org.apache.spark.shuffle.celeborn.RssShuffleManager` to 
`org.apache.spark.shuffle.celeborn.SparkShuffleManager`. Users can set the Spark 
property `spark.shuffle.manager` to 
`org.apache.spark.shuffle.celeborn.SparkShuffleManager` to use the Celeborn remote 
shuffle service.
+   In 0.3.0, Celeborn still supports 
`org.apache.spark.shuffle.celeborn.RssShuffleManager`, which will be removed in 
0.4.0.
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
index bf80cc292..ed3dfb2b9 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
@@ -20,7 +20,7 @@ package org.apache.celeborn.tests.spark
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{SparkConf, SparkContextHelper}
-import org.apache.spark.shuffle.celeborn.RssShuffleManager
+import org.apache.spark.shuffle.celeborn.SparkShuffleManager
 import org.apache.spark.sql.SparkSession
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.funsuite.AnyFunSuite
@@ -140,7 +140,7 @@ class PushDataTimeoutTest extends AnyFunSuite
     assert(PushDataHandler.pushReplicaMergeDataTimeoutTested.get())
     val excludedWorkers = SparkContextHelper.env
       .shuffleManager
-      .asInstanceOf[RssShuffleManager]
+      .asInstanceOf[SparkShuffleManager]
       .getLifecycleManager
       .workerStatusTracker
       .excludedWorkers
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala
index 6bbe5482e..2354fac55 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala
@@ -41,7 +41,7 @@ class ShuffleFallbackSuite extends AnyFunSuite
   }
 
   private def enableRss(conf: SparkConf) = {
-    conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+    conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.celeborn.SparkShuffleManager")
       .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", 
masterInfo._1.rpcEnv.address.toString)
   }
 
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala
index 223ef8cea..c31d6f408 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SkewJoinSuite.scala
@@ -41,7 +41,7 @@ class SkewJoinSuite extends AnyFunSuite
   }
 
   private def enableRss(conf: SparkConf) = {
-    conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+    conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.celeborn.SparkShuffleManager")
       .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", 
masterInfo._1.rpcEnv.address.toString)
       .set(s"spark.${CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD.key}", 
"10MB")
   }
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
index 5ee1c6ae6..c194a087b 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala
@@ -48,7 +48,9 @@ trait SparkTestBase extends AnyFunSuite
 
   def updateSparkConf(sparkConf: SparkConf, mode: ShuffleMode): SparkConf = {
     sparkConf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
-    sparkConf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.celeborn.RssShuffleManager")
+    sparkConf.set(
+      "spark.shuffle.manager",
+      "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
     sparkConf.set("spark.shuffle.useOldFetchProtocol", "true")
     sparkConf.set("spark.sql.adaptive.enabled", "false")
     sparkConf.set("spark.shuffle.service.enabled", "false")
diff --git a/toolkit/scripts/genConfs.py b/toolkit/scripts/genConfs.py
index b1d693033..004834892 100644
--- a/toolkit/scripts/genConfs.py
+++ b/toolkit/scripts/genConfs.py
@@ -94,7 +94,7 @@ def set_skew_join_confs(conf):
 
 
 def set_rss_confs(conf, replicate=False):
-    conf["spark.shuffle.manager"] = 
"org.apache.spark.shuffle.celeborn.RssShuffleManager"
+    conf["spark.shuffle.manager"] = 
"org.apache.spark.shuffle.celeborn.SparkShuffleManager"
     conf["spark.serializer"] = "org.apache.spark.serializer.KryoSerializer"
     conf["spark.celeborn.master.endpoints"] = "master-1-1:9097"
     conf["spark.shuffle.service.enabled"] = "false"


Reply via email to