This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ab88f773 [CELEBORN-819] Worker close should pass close status to 
support handle graceful shutdown and decommission
2ab88f773 is described below

commit 2ab88f773aa08f50a3bde1eec7885f77901c5e32
Author: Angerszhuuuu <[email protected]>
AuthorDate: Tue Jul 25 14:54:01 2023 +0800

    [CELEBORN-819] Worker close should pass close status to support handle 
graceful shutdown and decommission
    
    ### What changes were proposed in this pull request?
    Pass the exit kind to each component. Depending on the exit kind:
    
    - GRACEFUL_SHUTDOWN: behaves the same as the original code did when graceful == true.
    - Others: the LevelDB recovery files are cleaned up.
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1748 from AngersZhuuuu/CELEBORN-819.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Angerszhuuuu <[email protected]>
---
 .../common/network/server/TransportServer.java     |   9 +-
 .../celeborn/common/util/CelebornExitKind.scala    |  30 +----
 .../celeborn/service/deploy/master/Master.scala    |   4 +-
 .../service/deploy/master/MasterSuite.scala        |   3 +-
 .../celeborn/server/common/HttpService.scala       |   6 +-
 .../apache/celeborn/server/common/Service.scala    |   2 +-
 .../celeborn/server/common/http/HttpServer.scala   |   8 +-
 .../worker/storage/PartitionFilesSorter.java       |  28 +++--
 .../celeborn/service/deploy/worker/Worker.scala    | 139 ++++++++++++---------
 .../deploy/worker/storage/StorageManager.scala     |  25 ++--
 .../worker/storage/PartitionFilesSorterSuiteJ.java |   5 +-
 .../service/deploy/MiniClusterFeature.scala        |   8 +-
 .../deploy/worker/storage/WorkerSuite.scala        |   4 +-
 13 files changed, 143 insertions(+), 128 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
index f323786bf..d4214ddb7 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.celeborn.common.metrics.source.AbstractSource;
 import org.apache.celeborn.common.network.TransportContext;
 import org.apache.celeborn.common.network.util.*;
+import org.apache.celeborn.common.util.CelebornExitKind;
 import org.apache.celeborn.common.util.JavaUtils;
 
 /** Server for the efficient, low-level streaming service. */
@@ -130,24 +131,24 @@ public class TransportServer implements Closeable {
 
   @Override
   public void close() {
-    shutdown(true);
+    shutdown(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN());
   }
 
-  public void shutdown(boolean graceful) {
+  public void shutdown(int exitKind) {
     if (channelFuture != null) {
       // close is a local operation and should finish within milliseconds; 
timeout just to be safe
       channelFuture.channel().close().awaitUninterruptibly(10, 
TimeUnit.SECONDS);
       channelFuture = null;
     }
     if (bootstrap != null && bootstrap.config().group() != null) {
-      if (graceful) {
+      if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN()) {
         bootstrap.config().group().shutdownGracefully();
       } else {
         bootstrap.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS);
       }
     }
     if (bootstrap != null && bootstrap.config().childGroup() != null) {
-      if (graceful) {
+      if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN()) {
         bootstrap.config().childGroup().shutdownGracefully();
       } else {
         bootstrap.config().childGroup().shutdownGracefully(0, 0, 
TimeUnit.SECONDS);
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornExitKind.scala
similarity index 58%
copy from service/src/main/scala/org/apache/celeborn/server/common/Service.scala
copy to 
common/src/main/scala/org/apache/celeborn/common/util/CelebornExitKind.scala
index 10dad1c2d..41287c475 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornExitKind.scala
@@ -15,30 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.server.common
+package org.apache.celeborn.common.util
 
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.metrics.MetricsSystem
-
-abstract class Service extends Logging {
-  def serviceName: String
-
-  def conf: CelebornConf
-
-  def metricsSystem: MetricsSystem
-
-  def initialize(): Unit = {
-    if (conf.metricsSystemEnable) {
-      logInfo(s"Metrics system enabled.")
-      metricsSystem.start()
-    }
-  }
-
-  def stop(graceful: Boolean): Unit = {}
-}
-
-object Service {
-  val MASTER = "master"
-  val WORKER = "worker"
+private[celeborn] object CelebornExitKind {
+  val EXIT_IMMEDIATELY = 0
+  val WORKER_GRACEFUL_SHUTDOWN = 1
+  val WORKER_DECOMMISSION = 2
 }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 4207dfc74..828889820 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -882,11 +882,11 @@ private[celeborn] class Master(
     rpcEnv.awaitTermination()
   }
 
-  override def stop(graceful: Boolean): Unit = synchronized {
+  override def stop(exitKind: Int): Unit = synchronized {
     if (!stopped) {
       logInfo("Stopping Master")
       rpcEnv.stop(self)
-      super.stop(false)
+      super.stop(exitKind)
       logInfo("Master stopped.")
       stopped = true
     }
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
index e3960e5d2..7b86a6cf3 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.CelebornExitKind
 
 class MasterSuite extends AnyFunSuite
   with BeforeAndAfterAll
@@ -53,7 +54,7 @@ class MasterSuite extends AnyFunSuite
       }
     }.start()
     Thread.sleep(5000L)
-    master.stop(false)
+    master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
     master.rpcEnv.shutdown()
   }
 }
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 49978ba2c..322469144 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -105,11 +105,11 @@ abstract class HttpService extends Service with Logging {
     startHttpServer()
   }
 
-  override def stop(graceful: Boolean): Unit = {
+  override def stop(exitKind: Int): Unit = {
     // may be null when running the unit test
     if (null != httpServer) {
-      httpServer.stop(graceful)
+      httpServer.stop(exitKind)
     }
-    super.stop(graceful)
+    super.stop(exitKind)
   }
 }
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
index 10dad1c2d..0b1f40feb 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
@@ -35,7 +35,7 @@ abstract class Service extends Logging {
     }
   }
 
-  def stop(graceful: Boolean): Unit = {}
+  def stop(exitKind: Int): Unit = {}
 }
 
 object Service {
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
index 8f4f43891..f84151333 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
@@ -27,7 +27,7 @@ import io.netty.handler.logging.{LoggingHandler, LogLevel}
 
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.network.util.{IOMode, NettyUtils}
-import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
 
 class HttpServer(
     role: String,
@@ -56,7 +56,7 @@ class HttpServer(
     isStarted = true
   }
 
-  def stop(graceful: Boolean): Unit = synchronized {
+  def stop(exitCode: Int): Unit = synchronized {
     if (isStarted) {
       logInfo(s"$role: Stopping HttpServer")
       if (bindFuture != null) {
@@ -66,7 +66,7 @@ class HttpServer(
       }
       if (bootstrap != null && bootstrap.config.group != null) {
         Utils.tryLogNonFatalError {
-          if (graceful) {
+          if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
             bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS)
           } else {
             bootstrap.config.group.shutdownGracefully(0, 0, TimeUnit.SECONDS)
@@ -75,7 +75,7 @@ class HttpServer(
       }
       if (bootstrap != null && bootstrap.config.childGroup != null) {
         Utils.tryLogNonFatalError {
-          if (graceful) {
+          if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
             bootstrap.config.childGroup.shutdownGracefully(3, 5, 
TimeUnit.SECONDS)
           } else {
             bootstrap.config.childGroup.shutdownGracefully(0, 0, 
TimeUnit.SECONDS)
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 016290513..51f7af3f0 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -239,10 +239,10 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     }
   }
 
-  public void close() {
+  public void close(int exitKind) {
     logger.info("Closing {}", this.getClass().getSimpleName());
     shutdown = true;
-    if (gracefulShutdown) {
+    if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN()) {
       long start = System.currentTimeMillis();
       try {
         fileSorterExecutors.shutdown();
@@ -254,21 +254,29 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
       } catch (InterruptedException e) {
         logger.error("Await partition sorter executor shutdown catch 
exception: ", e);
       }
+      if (sortedFilesDb != null) {
+        try {
+          updateSortedShuffleFilesInDB();
+          sortedFilesDb.close();
+        } catch (IOException e) {
+          logger.error("Store recover data to LevelDB failed.", e);
+        }
+      }
       long end = System.currentTimeMillis();
       logger.info("Await partition sorter executor complete cost " + (end - 
start) + "ms");
     } else {
       fileSorterSchedulerThread.interrupt();
       fileSorterExecutors.shutdownNow();
-    }
-    cachedIndexMaps.clear();
-    if (sortedFilesDb != null) {
-      try {
-        updateSortedShuffleFilesInDB();
-        sortedFilesDb.close();
-      } catch (IOException e) {
-        logger.error("Store recover data to LevelDB failed.", e);
+      if (sortedFilesDb != null) {
+        try {
+          sortedFilesDb.close();
+          recoverFile.delete();
+        } catch (IOException e) {
+          logger.error("Clean LevelDB failed.", e);
+        }
       }
     }
+    cachedIndexMaps.clear();
   }
 
   private void reloadAndCleanSortedShuffleFiles(DB db) {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index ff0e76e4f..19706a397 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -42,7 +42,7 @@ import org.apache.celeborn.common.protocol.{PartitionType, 
PbRegisterWorkerRespo
 import org.apache.celeborn.common.protocol.message.ControlMessages._
 import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.rpc._
-import org.apache.celeborn.common.util.{JavaUtils, ShutdownHookManager, 
ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, JavaUtils, 
ShutdownHookManager, ThreadUtils, Utils}
 // Can Remove this if celeborn don't support scala211 in future
 import org.apache.celeborn.common.util.FunctionConverter._
 import org.apache.celeborn.server.common.{HttpService, Service}
@@ -77,12 +77,14 @@ private[celeborn] class Worker(
   private val WORKER_SHUTDOWN_PRIORITY = 100
   val shutdown = new AtomicBoolean(false)
   private val gracefulShutdown = conf.workerGracefulShutdown
+  private val exitKind = CelebornExitKind.EXIT_IMMEDIATELY
   assert(
     !gracefulShutdown || (gracefulShutdown &&
       conf.workerRpcPort > 0 && conf.workerFetchPort > 0 &&
       conf.workerPushPort > 0 && conf.workerReplicatePort > 0),
     "If enable graceful shutdown, the worker should use stable server port.")
   if (gracefulShutdown) {
+    exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN
     try {
       val recoverRoot = new File(conf.workerGracefulShutdownRecoverPath)
       if (!recoverRoot.exists()) {
@@ -392,12 +394,12 @@ private[celeborn] class Worker(
     rpcEnv.awaitTermination()
   }
 
-  override def stop(graceful: Boolean): Unit = {
+  override def stop(exitKind: Int): Unit = {
     if (!stopped) {
       logInfo("Stopping Worker.")
 
       if (sendHeartbeatTask != null) {
-        if (graceful) {
+        if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
           sendHeartbeatTask.cancel(false)
         } else {
           sendHeartbeatTask.cancel(true)
@@ -405,45 +407,38 @@ private[celeborn] class Worker(
         sendHeartbeatTask = null
       }
       if (checkFastFailTask != null) {
-        if (graceful) {
+        if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
           checkFastFailTask.cancel(false)
         } else {
           checkFastFailTask.cancel(true)
         }
         checkFastFailTask = null
       }
-      if (graceful) {
+      if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
         forwardMessageScheduler.shutdown()
         replicateThreadPool.shutdown()
         commitThreadPool.shutdown()
         asyncReplyPool.shutdown()
-        partitionsSorter.close()
       } else {
         forwardMessageScheduler.shutdownNow()
         replicateThreadPool.shutdownNow()
         commitThreadPool.shutdownNow()
         asyncReplyPool.shutdownNow()
-        partitionsSorter.close()
-      }
-
-      if (null != storageManager) {
-        storageManager.close()
       }
+      partitionsSorter.close(exitKind)
+      storageManager.close(exitKind)
       memoryManager.close()
 
       masterClient.close()
-      replicateServer.shutdown(graceful)
-      fetchServer.shutdown(graceful)
-      pushServer.shutdown(graceful)
+      replicateServer.shutdown(exitKind)
+      fetchServer.shutdown(exitKind)
+      pushServer.shutdown(exitKind)
 
-      super.stop(graceful)
+      super.stop(exitKind)
 
       logInfo("Worker is stopped.")
       stopped = true
     }
-    if (!graceful) {
-      shutdown.set(true)
-    }
   }
 
   private def registerWithMaster(): Unit = {
@@ -566,55 +561,77 @@ private[celeborn] class Worker(
     sb.toString()
   }
 
+  def shutdownGracefully(): Unit = {
+    // During shutdown, to avoid allocate slots in this worker,
+    // add this worker to master's excluded list. When restart, register 
worker will
+    // make master remove this worker from excluded list.
+    try {
+      masterClient.askSync(
+        ReportWorkerUnavailable(List(workerInfo).asJava),
+        OneWayMessageResponse.getClass)
+    } catch {
+      case e: Throwable =>
+        logError(
+          s"Fail report to master, need wait PartitionLocation auto release: 
\n$partitionLocationInfo",
+          e)
+    }
+    shutdown.set(true)
+    val interval = conf.workerGracefulShutdownCheckSlotsFinishedInterval
+    val timeout = conf.workerGracefulShutdownCheckSlotsFinishedTimeoutMs
+    var waitTimes = 0
+
+    def waitTime: Long = waitTimes * interval
+
+    while (!partitionLocationInfo.isEmpty && waitTime < timeout) {
+      Thread.sleep(interval)
+      waitTimes += 1
+    }
+    if (partitionLocationInfo.isEmpty) {
+      logInfo(s"Waiting for all PartitionLocation released cost 
${waitTime}ms.")
+    } else {
+      logWarning(s"Waiting for all PartitionLocation release cost 
${waitTime}ms, " +
+        s"unreleased PartitionLocation: \n$partitionLocationInfo")
+    }
+    stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
+  }
+
+  def exitImmediately(): Unit = {
+    // During shutdown, to avoid allocate slots in this worker,
+    // add this worker to master's excluded list. When restart, register 
worker will
+    // make master remove this worker from excluded list.
+    try {
+      masterClient.askSync[PbWorkerLostResponse](
+        WorkerLost(
+          host,
+          rpcPort,
+          pushPort,
+          fetchPort,
+          replicatePort,
+          MasterClient.genRequestId()),
+        classOf[PbWorkerLostResponse])
+    } catch {
+      case e: Throwable =>
+        logError(
+          s"Fail report to master, need wait PartitionLocation auto release: 
\n$partitionLocationInfo",
+          e)
+    }
+    shutdown.set(true)
+    stop(CelebornExitKind.EXIT_IMMEDIATELY)
+  }
+
   ShutdownHookManager.get().addShutdownHook(
     new Thread(new Runnable {
       override def run(): Unit = {
-        logInfo("Shutdown hook called.")
-        // During shutdown, to avoid allocate slots in this worker,
-        // add this worker to master's excluded list. When restart, register 
worker will
-        // make master remove this worker from excluded list.
-        try {
-          if (gracefulShutdown) {
-            masterClient.askSync(
-              ReportWorkerUnavailable(List(workerInfo).asJava),
-              OneWayMessageResponse.getClass)
-          } else {
-            masterClient.askSync[PbWorkerLostResponse](
-              WorkerLost(
-                host,
-                rpcPort,
-                pushPort,
-                fetchPort,
-                replicatePort,
-                MasterClient.genRequestId()),
-              classOf[PbWorkerLostResponse])
-          }
-        } catch {
-          case e: Throwable =>
-            logError(
-              s"Fail report to master, need wait PartitionLocation auto 
release: \n$partitionLocationInfo",
-              e)
-        }
-        shutdown.set(true)
-        if (gracefulShutdown) {
-          val interval = conf.workerGracefulShutdownCheckSlotsFinishedInterval
-          val timeout = conf.workerGracefulShutdownCheckSlotsFinishedTimeoutMs
-          var waitTimes = 0
-
-          def waitTime: Long = waitTimes * interval
-
-          while (!partitionLocationInfo.isEmpty && waitTime < timeout) {
-            Thread.sleep(interval)
-            waitTimes += 1
-          }
-          if (partitionLocationInfo.isEmpty) {
-            logInfo(s"Waiting for all PartitionLocation released cost 
${waitTime}ms.")
+        if (stopped) {
+          logInfo("Worker already stopped before call ShutdownHook.")
+        } else {
+          logInfo("Shutdown hook called.")
+          if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
+            shutdownGracefully()
           } else {
-            logWarning(s"Waiting for all PartitionLocation release cost 
${waitTime}ms, " +
-              s"unreleased PartitionLocation: \n$partitionLocationInfo")
+            exitImmediately()
           }
         }
-        stop(gracefulShutdown)
       }
     }),
     WORKER_SHUTDOWN_PRIORITY)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 60991f37d..e180a785f 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -42,7 +42,7 @@ import 
org.apache.celeborn.common.metrics.source.AbstractSource
 import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType}
 import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.util.{CelebornHadoopUtils, JavaUtils, 
PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, 
JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker._
 import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
 import 
org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
@@ -603,18 +603,25 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     false
   }
 
-  def close(): Unit = {
+  def close(exitKind: Int): Unit = {
     if (db != null) {
-      try {
-        updateFileInfosInDB()
-        db.close()
-      } catch {
-        case exception: Exception =>
-          logError("Store recover data to LevelDB failed.", exception)
+      if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
+        try {
+          updateFileInfosInDB()
+          db.close()
+        } catch {
+          case exception: Exception =>
+            logError("Store recover data to LevelDB failed.", exception)
+        }
+      } else {
+        if (db != null) {
+          db.close()
+          new File(conf.workerGracefulShutdownRecoverPath, 
RECOVERY_FILE_NAME).delete()
+        }
       }
     }
     if (null != diskOperators) {
-      if (!conf.workerGracefulShutdown) {
+      if (exitKind != CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
         cleanupExpiredShuffleKey(shuffleKeySet())
       }
       ThreadUtils.parmap(
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
index be73f6c2e..ab480065f 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
@@ -39,6 +39,7 @@ import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.meta.FileInfo;
 import org.apache.celeborn.common.unsafe.Platform;
+import org.apache.celeborn.common.util.CelebornExitKind;
 import org.apache.celeborn.common.util.Utils;
 import org.apache.celeborn.service.deploy.worker.WorkerSource;
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
@@ -184,7 +185,7 @@ public class PartitionFilesSorterSuiteJ {
     partitionFilesSorter.initSortedShuffleFiles("application-3-1");
     partitionFilesSorter.updateSortedShuffleFiles("application-3-1", "0-0-1", 
0);
     partitionFilesSorter.deleteSortedShuffleFiles("application-2-1");
-    partitionFilesSorter.close();
+    partitionFilesSorter.close(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN());
     PartitionFilesSorter partitionFilesSorter2 =
         new PartitionFilesSorter(MemoryManager.instance(), conf, new 
WorkerSource(conf));
     Assert.assertEquals(
@@ -193,7 +194,7 @@ public class PartitionFilesSorterSuiteJ {
     
Assert.assertEquals(partitionFilesSorter2.getSortedShuffleFiles("application-2-1"),
 null);
     Assert.assertEquals(
         
partitionFilesSorter2.getSortedShuffleFiles("application-3-1").toString(), 
"[0-0-1]");
-    partitionFilesSorter2.close();
+    partitionFilesSorter2.close(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN());
     recoverPath.delete();
   }
 }
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index 03fc957ec..1514c7647 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
 import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
 import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
@@ -119,19 +119,19 @@ trait MiniClusterFeature extends Logging {
     // shutdown workers
     workerInfos.foreach {
       case (worker, _) =>
-        worker.stop(false)
+        worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
         worker.rpcEnv.shutdown()
     }
 
     // shutdown masters
-    masterInfo._1.stop(false)
+    masterInfo._1.stop(CelebornExitKind.EXIT_IMMEDIATELY)
     masterInfo._1.rpcEnv.shutdown()
 
     // interrupt threads
     Thread.sleep(5000)
     workerInfos.foreach {
       case (worker, thread) =>
-        worker.stop(false)
+        worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
         thread.interrupt()
     }
     workerInfos.clear()
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
index 5db152548..1fde28f0b 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType}
-import org.apache.celeborn.common.util.JavaUtils
+import org.apache.celeborn.common.util.{CelebornExitKind, JavaUtils}
 import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
 
 class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
@@ -46,7 +46,7 @@ class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach 
{
   override def afterEach(): Unit = {
     if (null != worker) {
       worker.rpcEnv.shutdown()
-      worker.stop(false)
+      worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
       worker = null
     }
   }

Reply via email to