This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 5471a6afe [CELEBORN-804] ShuffleClient should cleanup shuffle infos
when trigger unregisterShuffle
5471a6afe is described below
commit 5471a6afe5fd443956c44a51dfc63903411ca35e
Author: Angerszhuuuu <[email protected]>
AuthorDate: Wed Jul 19 20:50:18 2023 +0800
[CELEBORN-804] ShuffleClient should cleanup shuffle infos when trigger
unregisterShuffle
### What changes were proposed in this pull request?
After discussion, we confirmed that `shuffleManager.unregisterShuffle()`
is triggered by Spark on both the driver and the executors. In this PR:
1. Add a shuffle client on both the driver and executor sides in ShuffleManager.
2. ShuffleClient calls `cleanupShuffle()` when `unregisterShuffle` is triggered.
This replaces https://github.com/apache/incubator-celeborn/pull/1719
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1726 from AngersZhuuuu/CELEBORN-804.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../celeborn/plugin/flink/RemoteShuffleMaster.java | 2 +-
.../shuffle/celeborn/SparkShuffleManager.java | 23 ++++++----------
.../shuffle/celeborn/SparkShuffleManager.java | 31 ++++++++--------------
.../org/apache/celeborn/client/ShuffleClient.java | 5 +++-
.../apache/celeborn/client/ShuffleClientImpl.java | 14 +---------
.../apache/celeborn/client/LifecycleManager.scala | 8 +-----
.../apache/celeborn/client/DummyShuffleClient.java | 2 +-
7 files changed, 27 insertions(+), 58 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
index ad07aedb0..7bf39d5a2 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
@@ -100,7 +100,7 @@ public class RemoteShuffleMaster implements
ShuffleMaster<RemoteShuffleDescripto
() -> {
try {
for (Integer shuffleId : shuffleIds) {
- lifecycleManager.handleUnregisterShuffle(shuffleId);
+ lifecycleManager.unregisterShuffle(shuffleId);
shuffleTaskInfo.removeExpiredShuffle(shuffleId);
}
shuffleResourceTracker.unRegisterJob(jobID);
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 4383fea88..3827e5891 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -100,13 +100,6 @@ public class SparkShuffleManager implements ShuffleManager
{
synchronized (this) {
if (lifecycleManager == null) {
lifecycleManager = new LifecycleManager(appId, celebornConf);
- shuffleClient =
- ShuffleClient.get(
- appUniqueId,
- lifecycleManager.getHost(),
- lifecycleManager.getPort(),
- celebornConf,
- lifecycleManager.getUserIdentifier());
}
}
}
@@ -143,13 +136,15 @@ public class SparkShuffleManager implements
ShuffleManager {
if (sortShuffleIds.contains(shuffleId)) {
return sortShuffleManager().unregisterShuffle(shuffleId);
}
- if (appUniqueId == null) {
- return true;
+ // For Spark driver side trigger unregister shuffle.
+ if (lifecycleManager != null) {
+ lifecycleManager.unregisterShuffle(shuffleId);
}
- if (shuffleClient == null) {
- return false;
+ // For Spark executor side cleanup shuffle related info.
+ if (shuffleClient != null) {
+ shuffleClient.cleanupShuffle(shuffleId);
}
- return shuffleClient.unregisterShuffle(shuffleId, isDriver);
+ return true;
}
@Override
@@ -159,9 +154,7 @@ public class SparkShuffleManager implements ShuffleManager {
@Override
public void stop() {
- if (shuffleClient != null) {
- shuffleClient.shutdown();
- }
+ ShuffleClient.reset();
if (lifecycleManager != null) {
lifecycleManager.stop();
}
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 79ac75df1..b876d48d6 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -108,13 +108,6 @@ public class SparkShuffleManager implements ShuffleManager
{
synchronized (this) {
if (lifecycleManager == null) {
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
- shuffleClient =
- ShuffleClient.get(
- appUniqueId,
- lifecycleManager.getHost(),
- lifecycleManager.getPort(),
- celebornConf,
- lifecycleManager.getUserIdentifier());
}
}
}
@@ -151,13 +144,15 @@ public class SparkShuffleManager implements
ShuffleManager {
if (sortShuffleIds.contains(shuffleId)) {
return sortShuffleManager().unregisterShuffle(shuffleId);
}
- if (appUniqueId == null) {
- return true;
+ // For Spark driver side trigger unregister shuffle.
+ if (lifecycleManager != null) {
+ lifecycleManager.unregisterShuffle(shuffleId);
}
- if (shuffleClient == null) {
- return false;
+ // For Spark executor side cleanup shuffle related info.
+ if (shuffleClient != null) {
+ shuffleClient.cleanupShuffle(shuffleId);
}
- return shuffleClient.unregisterShuffle(shuffleId, isDriver);
+ return true;
}
@Override
@@ -167,11 +162,7 @@ public class SparkShuffleManager implements ShuffleManager
{
@Override
public void stop() {
- if (shuffleClient != null) {
- shuffleClient.shutdown();
- ShuffleClient.reset();
- shuffleClient = null;
- }
+ ShuffleClient.reset();
if (lifecycleManager != null) {
lifecycleManager.stop();
lifecycleManager = null;
@@ -189,7 +180,7 @@ public class SparkShuffleManager implements ShuffleManager {
if (handle instanceof CelebornShuffleHandle) {
@SuppressWarnings("unchecked")
CelebornShuffleHandle<K, V, ?> h = ((CelebornShuffleHandle<K, V, ?>)
handle);
- ShuffleClient client =
+ shuffleClient =
ShuffleClient.get(
h.appUniqueId(),
h.lifecycleManagerHost(),
@@ -204,13 +195,13 @@ public class SparkShuffleManager implements
ShuffleManager {
h.numMappers(),
context,
celebornConf,
- client,
+ shuffleClient,
metrics,
pushThread,
SendBufferPool.get(cores));
} else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
return new HashBasedShuffleWriter<>(
- h, context, celebornConf, client, metrics,
SendBufferPool.get(cores));
+ h, context, celebornConf, shuffleClient, metrics,
SendBufferPool.get(cores));
} else {
throw new UnsupportedOperationException(
"Unrecognized shuffle write mode!" +
celebornConf.shuffleWriterMode());
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index b77c82780..e1e0c1468 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -44,6 +44,9 @@ public abstract class ShuffleClient {
// for testing
public static void reset() {
+ if (_instance != null) {
+ _instance.shutdown();
+ }
_instance = null;
initialized = false;
hdfsFs = null;
@@ -149,7 +152,7 @@ public abstract class ShuffleClient {
public abstract CelebornInputStream readPartition(
int shuffleId, int partitionId, int attemptNumber) throws IOException;
- public abstract boolean unregisterShuffle(int shuffleId, boolean isDriver);
+ public abstract boolean cleanupShuffle(int shuffleId);
public abstract void shutdown();
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 00e95898e..7544f11b6 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.compress.Compressor;
import org.apache.celeborn.client.read.CelebornInputStream;
import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.network.TransportContext;
@@ -1497,18 +1496,7 @@ public class ShuffleClientImpl extends ShuffleClient {
}
@Override
- public boolean unregisterShuffle(int shuffleId, boolean isDriver) {
- if (isDriver) {
- try {
- lifecycleManagerRef.send(
- UnregisterShuffle$.MODULE$.apply(appUniqueId, shuffleId,
MasterClient.genRequestId()));
- } catch (Exception e) {
- // If some exceptions need to be ignored, they shouldn't be logged as
error-level,
- // otherwise it will mislead users.
- logger.error("Send UnregisterShuffle failed, ignore.", e);
- }
- }
-
+ public boolean cleanupShuffle(int shuffleId) {
// clear status
reducePartitionMap.remove(shuffleId);
reduceFileGroupsMap.remove(shuffleId);
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 3b291c538..bc11519fd 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -197,11 +197,6 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
case StageEnd(shuffleId) =>
logInfo(s"Received StageEnd request, shuffleId $shuffleId.")
handleStageEnd(shuffleId)
- case pb: PbUnregisterShuffle =>
- val shuffleId = pb.getShuffleId
- logDebug(s"Received UnregisterShuffle request," +
- s"shuffleId $shuffleId.")
- handleUnregisterShuffle(shuffleId)
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any,
Unit] = {
@@ -616,8 +611,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
reply(mapperAttemptFinishedSuccess)
}
- def handleUnregisterShuffle(
- shuffleId: Int): Unit = {
+ def unregisterShuffle(shuffleId: Int): Unit = {
if (getPartitionType(shuffleId) == PartitionType.REDUCE) {
// if StageEnd has not been handled, trigger StageEnd
if (!commitManager.isStageEnd(shuffleId)) {
diff --git
a/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
b/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
index 2bd04da95..b593442e5 100644
--- a/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
+++ b/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
@@ -122,7 +122,7 @@ public class DummyShuffleClient extends ShuffleClient {
}
@Override
- public boolean unregisterShuffle(int shuffleId, boolean isDriver) {
+ public boolean cleanupShuffle(int shuffleId) {
return false;
}