This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 3a674fef1 [CELEBORN-819] Worker close should pass close status to
support handle graceful shutdown and decommission
3a674fef1 is described below
commit 3a674fef1a64795955987c1e0a830d6e5aa37bb9
Author: Angerszhuuuu <[email protected]>
AuthorDate: Tue Jul 25 14:54:01 2023 +0800
[CELEBORN-819] Worker close should pass close status to support handle
graceful shutdown and decommission
### What changes were proposed in this pull request?
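Pass the exit kind to each component, which then behaves according to it:
- WORKER_GRACEFUL_SHUTDOWN: behaves as the original code did when `graceful == true` (recovery data is written to the LevelDB file before it is closed).
- Others: the LevelDB recovery file is closed and deleted.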
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1748 from AngersZhuuuu/CELEBORN-819.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
(cherry picked from commit 2ab88f773aa08f50a3bde1eec7885f77901c5e32)
Signed-off-by: Angerszhuuuu <[email protected]>
---
.../common/network/server/TransportServer.java | 9 +-
.../celeborn/common/util/CelebornExitKind.scala | 30 +----
.../celeborn/service/deploy/master/Master.scala | 4 +-
.../service/deploy/master/MasterSuite.scala | 3 +-
.../celeborn/server/common/HttpService.scala | 6 +-
.../apache/celeborn/server/common/Service.scala | 2 +-
.../celeborn/server/common/http/HttpServer.scala | 8 +-
.../worker/storage/PartitionFilesSorter.java | 28 +++--
.../celeborn/service/deploy/worker/Worker.scala | 139 ++++++++++++---------
.../deploy/worker/storage/StorageManager.scala | 25 ++--
.../worker/storage/PartitionFilesSorterSuiteJ.java | 5 +-
.../service/deploy/MiniClusterFeature.scala | 8 +-
.../deploy/worker/storage/WorkerSuite.scala | 4 +-
13 files changed, 143 insertions(+), 128 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
index f323786bf..d4214ddb7 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.metrics.source.AbstractSource;
import org.apache.celeborn.common.network.TransportContext;
import org.apache.celeborn.common.network.util.*;
+import org.apache.celeborn.common.util.CelebornExitKind;
import org.apache.celeborn.common.util.JavaUtils;
/** Server for the efficient, low-level streaming service. */
@@ -130,24 +131,24 @@ public class TransportServer implements Closeable {
@Override
public void close() {
- shutdown(true);
+ shutdown(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN());
}
- public void shutdown(boolean graceful) {
+ public void shutdown(int exitKind) {
if (channelFuture != null) {
// close is a local operation and should finish within milliseconds;
timeout just to be safe
channelFuture.channel().close().awaitUninterruptibly(10,
TimeUnit.SECONDS);
channelFuture = null;
}
if (bootstrap != null && bootstrap.config().group() != null) {
- if (graceful) {
+ if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN()) {
bootstrap.config().group().shutdownGracefully();
} else {
bootstrap.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
}
if (bootstrap != null && bootstrap.config().childGroup() != null) {
- if (graceful) {
+ if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN()) {
bootstrap.config().childGroup().shutdownGracefully();
} else {
bootstrap.config().childGroup().shutdownGracefully(0, 0,
TimeUnit.SECONDS);
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornExitKind.scala
similarity index 58%
copy from service/src/main/scala/org/apache/celeborn/server/common/Service.scala
copy to
common/src/main/scala/org/apache/celeborn/common/util/CelebornExitKind.scala
index 10dad1c2d..41287c475 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornExitKind.scala
@@ -15,30 +15,10 @@
* limitations under the License.
*/
-package org.apache.celeborn.server.common
+package org.apache.celeborn.common.util
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.metrics.MetricsSystem
-
-abstract class Service extends Logging {
- def serviceName: String
-
- def conf: CelebornConf
-
- def metricsSystem: MetricsSystem
-
- def initialize(): Unit = {
- if (conf.metricsSystemEnable) {
- logInfo(s"Metrics system enabled.")
- metricsSystem.start()
- }
- }
-
- def stop(graceful: Boolean): Unit = {}
-}
-
-object Service {
- val MASTER = "master"
- val WORKER = "worker"
+private[celeborn] object CelebornExitKind {
+ val EXIT_IMMEDIATELY = 0
+ val WORKER_GRACEFUL_SHUTDOWN = 1
+ val WORKER_DECOMMISSION = 2
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 4eac2d07d..690fd30b8 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -894,11 +894,11 @@ private[celeborn] class Master(
rpcEnv.awaitTermination()
}
- override def stop(graceful: Boolean): Unit = synchronized {
+ override def stop(exitKind: Int): Unit = synchronized {
if (!stopped) {
logInfo("Stopping Master")
rpcEnv.stop(self)
- super.stop(false)
+ super.stop(exitKind)
logInfo("Master stopped.")
stopped = true
}
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
index e3960e5d2..7b86a6cf3 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.CelebornExitKind
class MasterSuite extends AnyFunSuite
with BeforeAndAfterAll
@@ -53,7 +54,7 @@ class MasterSuite extends AnyFunSuite
}
}.start()
Thread.sleep(5000L)
- master.stop(false)
+ master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
master.rpcEnv.shutdown()
}
}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 49978ba2c..322469144 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -105,11 +105,11 @@ abstract class HttpService extends Service with Logging {
startHttpServer()
}
- override def stop(graceful: Boolean): Unit = {
+ override def stop(exitKind: Int): Unit = {
// may be null when running the unit test
if (null != httpServer) {
- httpServer.stop(graceful)
+ httpServer.stop(exitKind)
}
- super.stop(graceful)
+ super.stop(exitKind)
}
}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
index 10dad1c2d..0b1f40feb 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
@@ -35,7 +35,7 @@ abstract class Service extends Logging {
}
}
- def stop(graceful: Boolean): Unit = {}
+ def stop(exitKind: Int): Unit = {}
}
object Service {
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
index 8f4f43891..f84151333 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
@@ -27,7 +27,7 @@ import io.netty.handler.logging.{LoggingHandler, LogLevel}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.network.util.{IOMode, NettyUtils}
-import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
class HttpServer(
role: String,
@@ -56,7 +56,7 @@ class HttpServer(
isStarted = true
}
- def stop(graceful: Boolean): Unit = synchronized {
+ def stop(exitCode: Int): Unit = synchronized {
if (isStarted) {
logInfo(s"$role: Stopping HttpServer")
if (bindFuture != null) {
@@ -66,7 +66,7 @@ class HttpServer(
}
if (bootstrap != null && bootstrap.config.group != null) {
Utils.tryLogNonFatalError {
- if (graceful) {
+ if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS)
} else {
bootstrap.config.group.shutdownGracefully(0, 0, TimeUnit.SECONDS)
@@ -75,7 +75,7 @@ class HttpServer(
}
if (bootstrap != null && bootstrap.config.childGroup != null) {
Utils.tryLogNonFatalError {
- if (graceful) {
+ if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
bootstrap.config.childGroup.shutdownGracefully(3, 5,
TimeUnit.SECONDS)
} else {
bootstrap.config.childGroup.shutdownGracefully(0, 0,
TimeUnit.SECONDS)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 016290513..51f7af3f0 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -239,10 +239,10 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
}
}
- public void close() {
+ public void close(int exitKind) {
logger.info("Closing {}", this.getClass().getSimpleName());
shutdown = true;
- if (gracefulShutdown) {
+ if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN()) {
long start = System.currentTimeMillis();
try {
fileSorterExecutors.shutdown();
@@ -254,21 +254,29 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
} catch (InterruptedException e) {
logger.error("Await partition sorter executor shutdown catch
exception: ", e);
}
+ if (sortedFilesDb != null) {
+ try {
+ updateSortedShuffleFilesInDB();
+ sortedFilesDb.close();
+ } catch (IOException e) {
+ logger.error("Store recover data to LevelDB failed.", e);
+ }
+ }
long end = System.currentTimeMillis();
logger.info("Await partition sorter executor complete cost " + (end -
start) + "ms");
} else {
fileSorterSchedulerThread.interrupt();
fileSorterExecutors.shutdownNow();
- }
- cachedIndexMaps.clear();
- if (sortedFilesDb != null) {
- try {
- updateSortedShuffleFilesInDB();
- sortedFilesDb.close();
- } catch (IOException e) {
- logger.error("Store recover data to LevelDB failed.", e);
+ if (sortedFilesDb != null) {
+ try {
+ sortedFilesDb.close();
+ recoverFile.delete();
+ } catch (IOException e) {
+ logger.error("Clean LevelDB failed.", e);
+ }
}
}
+ cachedIndexMaps.clear();
}
private void reloadAndCleanSortedShuffleFiles(DB db) {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index ff0e76e4f..19706a397 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -42,7 +42,7 @@ import org.apache.celeborn.common.protocol.{PartitionType,
PbRegisterWorkerRespo
import org.apache.celeborn.common.protocol.message.ControlMessages._
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.rpc._
-import org.apache.celeborn.common.util.{JavaUtils, ShutdownHookManager,
ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, JavaUtils,
ShutdownHookManager, ThreadUtils, Utils}
// Can Remove this if celeborn don't support scala211 in future
import org.apache.celeborn.common.util.FunctionConverter._
import org.apache.celeborn.server.common.{HttpService, Service}
@@ -77,12 +77,14 @@ private[celeborn] class Worker(
private val WORKER_SHUTDOWN_PRIORITY = 100
val shutdown = new AtomicBoolean(false)
private val gracefulShutdown = conf.workerGracefulShutdown
+ private val exitKind = CelebornExitKind.EXIT_IMMEDIATELY
assert(
!gracefulShutdown || (gracefulShutdown &&
conf.workerRpcPort > 0 && conf.workerFetchPort > 0 &&
conf.workerPushPort > 0 && conf.workerReplicatePort > 0),
"If enable graceful shutdown, the worker should use stable server port.")
if (gracefulShutdown) {
+ exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN
try {
val recoverRoot = new File(conf.workerGracefulShutdownRecoverPath)
if (!recoverRoot.exists()) {
@@ -392,12 +394,12 @@ private[celeborn] class Worker(
rpcEnv.awaitTermination()
}
- override def stop(graceful: Boolean): Unit = {
+ override def stop(exitKind: Int): Unit = {
if (!stopped) {
logInfo("Stopping Worker.")
if (sendHeartbeatTask != null) {
- if (graceful) {
+ if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
sendHeartbeatTask.cancel(false)
} else {
sendHeartbeatTask.cancel(true)
@@ -405,45 +407,38 @@ private[celeborn] class Worker(
sendHeartbeatTask = null
}
if (checkFastFailTask != null) {
- if (graceful) {
+ if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
checkFastFailTask.cancel(false)
} else {
checkFastFailTask.cancel(true)
}
checkFastFailTask = null
}
- if (graceful) {
+ if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
forwardMessageScheduler.shutdown()
replicateThreadPool.shutdown()
commitThreadPool.shutdown()
asyncReplyPool.shutdown()
- partitionsSorter.close()
} else {
forwardMessageScheduler.shutdownNow()
replicateThreadPool.shutdownNow()
commitThreadPool.shutdownNow()
asyncReplyPool.shutdownNow()
- partitionsSorter.close()
- }
-
- if (null != storageManager) {
- storageManager.close()
}
+ partitionsSorter.close(exitKind)
+ storageManager.close(exitKind)
memoryManager.close()
masterClient.close()
- replicateServer.shutdown(graceful)
- fetchServer.shutdown(graceful)
- pushServer.shutdown(graceful)
+ replicateServer.shutdown(exitKind)
+ fetchServer.shutdown(exitKind)
+ pushServer.shutdown(exitKind)
- super.stop(graceful)
+ super.stop(exitKind)
logInfo("Worker is stopped.")
stopped = true
}
- if (!graceful) {
- shutdown.set(true)
- }
}
private def registerWithMaster(): Unit = {
@@ -566,55 +561,77 @@ private[celeborn] class Worker(
sb.toString()
}
+ def shutdownGracefully(): Unit = {
+ // During shutdown, to avoid allocate slots in this worker,
+ // add this worker to master's excluded list. When restart, register
worker will
+ // make master remove this worker from excluded list.
+ try {
+ masterClient.askSync(
+ ReportWorkerUnavailable(List(workerInfo).asJava),
+ OneWayMessageResponse.getClass)
+ } catch {
+ case e: Throwable =>
+ logError(
+ s"Fail report to master, need wait PartitionLocation auto release:
\n$partitionLocationInfo",
+ e)
+ }
+ shutdown.set(true)
+ val interval = conf.workerGracefulShutdownCheckSlotsFinishedInterval
+ val timeout = conf.workerGracefulShutdownCheckSlotsFinishedTimeoutMs
+ var waitTimes = 0
+
+ def waitTime: Long = waitTimes * interval
+
+ while (!partitionLocationInfo.isEmpty && waitTime < timeout) {
+ Thread.sleep(interval)
+ waitTimes += 1
+ }
+ if (partitionLocationInfo.isEmpty) {
+ logInfo(s"Waiting for all PartitionLocation released cost
${waitTime}ms.")
+ } else {
+ logWarning(s"Waiting for all PartitionLocation release cost
${waitTime}ms, " +
+ s"unreleased PartitionLocation: \n$partitionLocationInfo")
+ }
+ stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
+ }
+
+ def exitImmediately(): Unit = {
+ // During shutdown, to avoid allocate slots in this worker,
+ // add this worker to master's excluded list. When restart, register
worker will
+ // make master remove this worker from excluded list.
+ try {
+ masterClient.askSync[PbWorkerLostResponse](
+ WorkerLost(
+ host,
+ rpcPort,
+ pushPort,
+ fetchPort,
+ replicatePort,
+ MasterClient.genRequestId()),
+ classOf[PbWorkerLostResponse])
+ } catch {
+ case e: Throwable =>
+ logError(
+ s"Fail report to master, need wait PartitionLocation auto release:
\n$partitionLocationInfo",
+ e)
+ }
+ shutdown.set(true)
+ stop(CelebornExitKind.EXIT_IMMEDIATELY)
+ }
+
ShutdownHookManager.get().addShutdownHook(
new Thread(new Runnable {
override def run(): Unit = {
- logInfo("Shutdown hook called.")
- // During shutdown, to avoid allocate slots in this worker,
- // add this worker to master's excluded list. When restart, register
worker will
- // make master remove this worker from excluded list.
- try {
- if (gracefulShutdown) {
- masterClient.askSync(
- ReportWorkerUnavailable(List(workerInfo).asJava),
- OneWayMessageResponse.getClass)
- } else {
- masterClient.askSync[PbWorkerLostResponse](
- WorkerLost(
- host,
- rpcPort,
- pushPort,
- fetchPort,
- replicatePort,
- MasterClient.genRequestId()),
- classOf[PbWorkerLostResponse])
- }
- } catch {
- case e: Throwable =>
- logError(
- s"Fail report to master, need wait PartitionLocation auto
release: \n$partitionLocationInfo",
- e)
- }
- shutdown.set(true)
- if (gracefulShutdown) {
- val interval = conf.workerGracefulShutdownCheckSlotsFinishedInterval
- val timeout = conf.workerGracefulShutdownCheckSlotsFinishedTimeoutMs
- var waitTimes = 0
-
- def waitTime: Long = waitTimes * interval
-
- while (!partitionLocationInfo.isEmpty && waitTime < timeout) {
- Thread.sleep(interval)
- waitTimes += 1
- }
- if (partitionLocationInfo.isEmpty) {
- logInfo(s"Waiting for all PartitionLocation released cost
${waitTime}ms.")
+ if (stopped) {
+ logInfo("Worker already stopped before call ShutdownHook.")
+ } else {
+ logInfo("Shutdown hook called.")
+ if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
+ shutdownGracefully()
} else {
- logWarning(s"Waiting for all PartitionLocation release cost
${waitTime}ms, " +
- s"unreleased PartitionLocation: \n$partitionLocationInfo")
+ exitImmediately()
}
}
- stop(gracefulShutdown)
}
}),
WORKER_SHUTDOWN_PRIORITY)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 60991f37d..e180a785f 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -42,7 +42,7 @@ import
org.apache.celeborn.common.metrics.source.AbstractSource
import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
import org.apache.celeborn.common.protocol.{PartitionLocation,
PartitionSplitMode, PartitionType}
import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.util.{CelebornHadoopUtils, JavaUtils,
PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils,
JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker._
import
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
import
org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
@@ -603,18 +603,25 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
false
}
- def close(): Unit = {
+ def close(exitKind: Int): Unit = {
if (db != null) {
- try {
- updateFileInfosInDB()
- db.close()
- } catch {
- case exception: Exception =>
- logError("Store recover data to LevelDB failed.", exception)
+ if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
+ try {
+ updateFileInfosInDB()
+ db.close()
+ } catch {
+ case exception: Exception =>
+ logError("Store recover data to LevelDB failed.", exception)
+ }
+ } else {
+ if (db != null) {
+ db.close()
+ new File(conf.workerGracefulShutdownRecoverPath,
RECOVERY_FILE_NAME).delete()
+ }
}
}
if (null != diskOperators) {
- if (!conf.workerGracefulShutdown) {
+ if (exitKind != CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
cleanupExpiredShuffleKey(shuffleKeySet())
}
ThreadUtils.parmap(
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
index be73f6c2e..ab480065f 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
@@ -39,6 +39,7 @@ import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.FileInfo;
import org.apache.celeborn.common.unsafe.Platform;
+import org.apache.celeborn.common.util.CelebornExitKind;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.service.deploy.worker.WorkerSource;
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
@@ -184,7 +185,7 @@ public class PartitionFilesSorterSuiteJ {
partitionFilesSorter.initSortedShuffleFiles("application-3-1");
partitionFilesSorter.updateSortedShuffleFiles("application-3-1", "0-0-1",
0);
partitionFilesSorter.deleteSortedShuffleFiles("application-2-1");
- partitionFilesSorter.close();
+ partitionFilesSorter.close(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN());
PartitionFilesSorter partitionFilesSorter2 =
new PartitionFilesSorter(MemoryManager.instance(), conf, new
WorkerSource(conf));
Assert.assertEquals(
@@ -193,7 +194,7 @@ public class PartitionFilesSorterSuiteJ {
Assert.assertEquals(partitionFilesSorter2.getSortedShuffleFiles("application-2-1"),
null);
Assert.assertEquals(
partitionFilesSorter2.getSortedShuffleFiles("application-3-1").toString(),
"[0-0-1]");
- partitionFilesSorter2.close();
+ partitionFilesSorter2.close(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN());
recoverPath.delete();
}
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index 03fc957ec..1514c7647 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
@@ -119,19 +119,19 @@ trait MiniClusterFeature extends Logging {
// shutdown workers
workerInfos.foreach {
case (worker, _) =>
- worker.stop(false)
+ worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
worker.rpcEnv.shutdown()
}
// shutdown masters
- masterInfo._1.stop(false)
+ masterInfo._1.stop(CelebornExitKind.EXIT_IMMEDIATELY)
masterInfo._1.rpcEnv.shutdown()
// interrupt threads
Thread.sleep(5000)
workerInfos.foreach {
case (worker, thread) =>
- worker.stop(false)
+ worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
thread.interrupt()
}
workerInfos.clear()
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
index 5db152548..1fde28f0b 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.protocol.{PartitionLocation,
PartitionSplitMode, PartitionType}
-import org.apache.celeborn.common.util.JavaUtils
+import org.apache.celeborn.common.util.{CelebornExitKind, JavaUtils}
import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
@@ -46,7 +46,7 @@ class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach
{
override def afterEach(): Unit = {
if (null != worker) {
worker.rpcEnv.shutdown()
- worker.stop(false)
+ worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
worker = null
}
}