This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 5471a6afe [CELEBORN-804] ShuffleClient should cleanup shuffle infos 
when trigger unregisterShuffle
5471a6afe is described below

commit 5471a6afe5fd443956c44a51dfc63903411ca35e
Author: Angerszhuuuu <[email protected]>
AuthorDate: Wed Jul 19 20:50:18 2023 +0800

    [CELEBORN-804] ShuffleClient should cleanup shuffle infos when trigger 
unregisterShuffle
    
    ### What changes were proposed in this pull request?
    
    After discussion, we confirmed that `shuffleManager.unregisterShuffle()` 
is triggered by Spark on both the driver and the executors. In this PR:
    
      1. Add a shuffle client on both the driver and executor sides in ShuffleManager.
      2. ShuffleClient calls cleanupShuffle() when `unregisterShuffle` is triggered.
    
    This replaces https://github.com/apache/incubator-celeborn/pull/1719
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1726 from AngersZhuuuu/CELEBORN-804.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../celeborn/plugin/flink/RemoteShuffleMaster.java |  2 +-
 .../shuffle/celeborn/SparkShuffleManager.java      | 23 ++++++----------
 .../shuffle/celeborn/SparkShuffleManager.java      | 31 ++++++++--------------
 .../org/apache/celeborn/client/ShuffleClient.java  |  5 +++-
 .../apache/celeborn/client/ShuffleClientImpl.java  | 14 +---------
 .../apache/celeborn/client/LifecycleManager.scala  |  8 +-----
 .../apache/celeborn/client/DummyShuffleClient.java |  2 +-
 7 files changed, 27 insertions(+), 58 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
index ad07aedb0..7bf39d5a2 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
@@ -100,7 +100,7 @@ public class RemoteShuffleMaster implements 
ShuffleMaster<RemoteShuffleDescripto
           () -> {
             try {
               for (Integer shuffleId : shuffleIds) {
-                lifecycleManager.handleUnregisterShuffle(shuffleId);
+                lifecycleManager.unregisterShuffle(shuffleId);
                 shuffleTaskInfo.removeExpiredShuffle(shuffleId);
               }
               shuffleResourceTracker.unRegisterJob(jobID);
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 4383fea88..3827e5891 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -100,13 +100,6 @@ public class SparkShuffleManager implements ShuffleManager 
{
       synchronized (this) {
         if (lifecycleManager == null) {
           lifecycleManager = new LifecycleManager(appId, celebornConf);
-          shuffleClient =
-              ShuffleClient.get(
-                  appUniqueId,
-                  lifecycleManager.getHost(),
-                  lifecycleManager.getPort(),
-                  celebornConf,
-                  lifecycleManager.getUserIdentifier());
         }
       }
     }
@@ -143,13 +136,15 @@ public class SparkShuffleManager implements 
ShuffleManager {
     if (sortShuffleIds.contains(shuffleId)) {
       return sortShuffleManager().unregisterShuffle(shuffleId);
     }
-    if (appUniqueId == null) {
-      return true;
+    // For Spark driver side trigger unregister shuffle.
+    if (lifecycleManager != null) {
+      lifecycleManager.unregisterShuffle(shuffleId);
     }
-    if (shuffleClient == null) {
-      return false;
+    // For Spark executor side cleanup shuffle related info.
+    if (shuffleClient != null) {
+      shuffleClient.cleanupShuffle(shuffleId);
     }
-    return shuffleClient.unregisterShuffle(shuffleId, isDriver);
+    return true;
   }
 
   @Override
@@ -159,9 +154,7 @@ public class SparkShuffleManager implements ShuffleManager {
 
   @Override
   public void stop() {
-    if (shuffleClient != null) {
-      shuffleClient.shutdown();
-    }
+    ShuffleClient.reset();
     if (lifecycleManager != null) {
       lifecycleManager.stop();
     }
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 79ac75df1..b876d48d6 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -108,13 +108,6 @@ public class SparkShuffleManager implements ShuffleManager 
{
       synchronized (this) {
         if (lifecycleManager == null) {
           lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
-          shuffleClient =
-              ShuffleClient.get(
-                  appUniqueId,
-                  lifecycleManager.getHost(),
-                  lifecycleManager.getPort(),
-                  celebornConf,
-                  lifecycleManager.getUserIdentifier());
         }
       }
     }
@@ -151,13 +144,15 @@ public class SparkShuffleManager implements 
ShuffleManager {
     if (sortShuffleIds.contains(shuffleId)) {
       return sortShuffleManager().unregisterShuffle(shuffleId);
     }
-    if (appUniqueId == null) {
-      return true;
+    // For Spark driver side trigger unregister shuffle.
+    if (lifecycleManager != null) {
+      lifecycleManager.unregisterShuffle(shuffleId);
     }
-    if (shuffleClient == null) {
-      return false;
+    // For Spark executor side cleanup shuffle related info.
+    if (shuffleClient != null) {
+      shuffleClient.cleanupShuffle(shuffleId);
     }
-    return shuffleClient.unregisterShuffle(shuffleId, isDriver);
+    return true;
   }
 
   @Override
@@ -167,11 +162,7 @@ public class SparkShuffleManager implements ShuffleManager 
{
 
   @Override
   public void stop() {
-    if (shuffleClient != null) {
-      shuffleClient.shutdown();
-      ShuffleClient.reset();
-      shuffleClient = null;
-    }
+    ShuffleClient.reset();
     if (lifecycleManager != null) {
       lifecycleManager.stop();
       lifecycleManager = null;
@@ -189,7 +180,7 @@ public class SparkShuffleManager implements ShuffleManager {
       if (handle instanceof CelebornShuffleHandle) {
         @SuppressWarnings("unchecked")
         CelebornShuffleHandle<K, V, ?> h = ((CelebornShuffleHandle<K, V, ?>) 
handle);
-        ShuffleClient client =
+        shuffleClient =
             ShuffleClient.get(
                 h.appUniqueId(),
                 h.lifecycleManagerHost(),
@@ -204,13 +195,13 @@ public class SparkShuffleManager implements 
ShuffleManager {
               h.numMappers(),
               context,
               celebornConf,
-              client,
+              shuffleClient,
               metrics,
               pushThread,
               SendBufferPool.get(cores));
         } else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
           return new HashBasedShuffleWriter<>(
-              h, context, celebornConf, client, metrics, 
SendBufferPool.get(cores));
+              h, context, celebornConf, shuffleClient, metrics, 
SendBufferPool.get(cores));
         } else {
           throw new UnsupportedOperationException(
               "Unrecognized shuffle write mode!" + 
celebornConf.shuffleWriterMode());
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index b77c82780..e1e0c1468 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -44,6 +44,9 @@ public abstract class ShuffleClient {
 
   // for testing
   public static void reset() {
+    if (_instance != null) {
+      _instance.shutdown();
+    }
     _instance = null;
     initialized = false;
     hdfsFs = null;
@@ -149,7 +152,7 @@ public abstract class ShuffleClient {
   public abstract CelebornInputStream readPartition(
       int shuffleId, int partitionId, int attemptNumber) throws IOException;
 
-  public abstract boolean unregisterShuffle(int shuffleId, boolean isDriver);
+  public abstract boolean cleanupShuffle(int shuffleId);
 
   public abstract void shutdown();
 
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 00e95898e..7544f11b6 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.celeborn.client.compress.Compressor;
 import org.apache.celeborn.client.read.CelebornInputStream;
 import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.client.MasterClient;
 import org.apache.celeborn.common.exception.CelebornIOException;
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.network.TransportContext;
@@ -1497,18 +1496,7 @@ public class ShuffleClientImpl extends ShuffleClient {
   }
 
   @Override
-  public boolean unregisterShuffle(int shuffleId, boolean isDriver) {
-    if (isDriver) {
-      try {
-        lifecycleManagerRef.send(
-            UnregisterShuffle$.MODULE$.apply(appUniqueId, shuffleId, 
MasterClient.genRequestId()));
-      } catch (Exception e) {
-        // If some exceptions need to be ignored, they shouldn't be logged as 
error-level,
-        // otherwise it will mislead users.
-        logger.error("Send UnregisterShuffle failed, ignore.", e);
-      }
-    }
-
+  public boolean cleanupShuffle(int shuffleId) {
     // clear status
     reducePartitionMap.remove(shuffleId);
     reduceFileGroupsMap.remove(shuffleId);
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 3b291c538..bc11519fd 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -197,11 +197,6 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     case StageEnd(shuffleId) =>
       logInfo(s"Received StageEnd request, shuffleId $shuffleId.")
       handleStageEnd(shuffleId)
-    case pb: PbUnregisterShuffle =>
-      val shuffleId = pb.getShuffleId
-      logDebug(s"Received UnregisterShuffle request," +
-        s"shuffleId $shuffleId.")
-      handleUnregisterShuffle(shuffleId)
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
@@ -616,8 +611,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     reply(mapperAttemptFinishedSuccess)
   }
 
-  def handleUnregisterShuffle(
-      shuffleId: Int): Unit = {
+  def unregisterShuffle(shuffleId: Int): Unit = {
     if (getPartitionType(shuffleId) == PartitionType.REDUCE) {
       // if StageEnd has not been handled, trigger StageEnd
       if (!commitManager.isStageEnd(shuffleId)) {
diff --git 
a/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java 
b/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
index 2bd04da95..b593442e5 100644
--- a/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
+++ b/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
@@ -122,7 +122,7 @@ public class DummyShuffleClient extends ShuffleClient {
   }
 
   @Override
-  public boolean unregisterShuffle(int shuffleId, boolean isDriver) {
+  public boolean cleanupShuffle(int shuffleId) {
     return false;
   }
 

Reply via email to