This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 76201c92f [CELEBORN-820] Merge service shutdown and close method
76201c92f is described below

commit 76201c92f89671a4e21d1b82d2b4d0b25c16a24e
Author: Angerszhuuuu <[email protected]>
AuthorDate: Sat Jul 22 21:04:29 2023 +0800

    [CELEBORN-820] Merge service shutdown and close method
    
    ### What changes were proposed in this pull request?
    As title
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1742 from AngersZhuuuu/CELEBORN-820.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Angerszhuuuu <[email protected]>
---
 .../org/apache/celeborn/service/deploy/master/Master.scala   |  6 +++---
 .../apache/celeborn/service/deploy/master/MasterSuite.scala  |  2 +-
 .../org/apache/celeborn/server/common/HttpService.scala      | 12 ++----------
 .../scala/org/apache/celeborn/server/common/Service.scala    |  4 +---
 .../org/apache/celeborn/service/deploy/worker/Worker.scala   | 10 +++-------
 .../apache/celeborn/service/deploy/MiniClusterFeature.scala  |  6 +++---
 .../celeborn/service/deploy/worker/storage/WorkerSuite.scala |  2 +-
 7 files changed, 14 insertions(+), 28 deletions(-)

diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 100e64477..4207dfc74 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -882,11 +882,11 @@ private[celeborn] class Master(
     rpcEnv.awaitTermination()
   }
 
-  override def close(): Unit = synchronized {
+  override def stop(graceful: Boolean): Unit = synchronized {
     if (!stopped) {
       logInfo("Stopping Master")
-      stop()
-      super.close()
+      rpcEnv.stop(self)
+      super.stop(false)
       logInfo("Master stopped.")
       stopped = true
     }
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
index 5fceab10b..e3960e5d2 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
@@ -53,7 +53,7 @@ class MasterSuite extends AnyFunSuite
       }
     }.start()
     Thread.sleep(5000L)
-    master.close()
+    master.stop(false)
     master.rpcEnv.shutdown()
   }
 }
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 3f2d19b90..49978ba2c 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -105,19 +105,11 @@ abstract class HttpService extends Service with Logging {
     startHttpServer()
   }
 
-  override def close(): Unit = {
-    // may be null when running the unit test
-    if (null != httpServer) {
-      httpServer.stop(true)
-    }
-    super.close()
-  }
-
-  override def shutdown(graceful: Boolean): Unit = {
+  override def stop(graceful: Boolean): Unit = {
     // may be null when running the unit test
     if (null != httpServer) {
       httpServer.stop(graceful)
     }
-    super.shutdown(graceful)
+    super.stop(graceful)
   }
 }
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
index 129f28f4d..10dad1c2d 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
@@ -35,9 +35,7 @@ abstract class Service extends Logging {
     }
   }
 
-  def close(): Unit = {}
-
-  def shutdown(graceful: Boolean): Unit = {}
+  def stop(graceful: Boolean): Unit = {}
 }
 
 object Service {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 37174b2c0..ff0e76e4f 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -392,11 +392,7 @@ private[celeborn] class Worker(
     rpcEnv.awaitTermination()
   }
 
-  override def close(): Unit = synchronized {
-    shutdown(gracefulShutdown)
-  }
-
-  override def shutdown(graceful: Boolean): Unit = {
+  override def stop(graceful: Boolean): Unit = {
     if (!stopped) {
       logInfo("Stopping Worker.")
 
@@ -440,7 +436,7 @@ private[celeborn] class Worker(
       fetchServer.shutdown(graceful)
       pushServer.shutdown(graceful)
 
-      super.shutdown(graceful)
+      super.stop(graceful)
 
       logInfo("Worker is stopped.")
       stopped = true
@@ -618,7 +614,7 @@ private[celeborn] class Worker(
               s"unreleased PartitionLocation: \n$partitionLocationInfo")
           }
         }
-        close()
+        stop(gracefulShutdown)
       }
     }),
     WORKER_SHUTDOWN_PRIORITY)
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index a61854057..03fc957ec 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -119,19 +119,19 @@ trait MiniClusterFeature extends Logging {
     // shutdown workers
     workerInfos.foreach {
       case (worker, _) =>
-        worker.close()
+        worker.stop(false)
         worker.rpcEnv.shutdown()
     }
 
     // shutdown masters
-    masterInfo._1.close()
+    masterInfo._1.stop(false)
     masterInfo._1.rpcEnv.shutdown()
 
     // interrupt threads
     Thread.sleep(5000)
     workerInfos.foreach {
       case (worker, thread) =>
-        worker.shutdown(graceful = false)
+        worker.stop(false)
         thread.interrupt()
     }
     workerInfos.clear()
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
index 01c823c0c..5db152548 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
@@ -46,7 +46,7 @@ class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach 
{
   override def afterEach(): Unit = {
     if (null != worker) {
       worker.rpcEnv.shutdown()
-      worker.shutdown(false)
+      worker.stop(false)
       worker = null
     }
   }

Reply via email to