This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 76201c92f [CELEBORN-820] Merge service shutdown and close method
76201c92f is described below
commit 76201c92f89671a4e21d1b82d2b4d0b25c16a24e
Author: Angerszhuuuu <[email protected]>
AuthorDate: Sat Jul 22 21:04:29 2023 +0800
[CELEBORN-820] Merge service shutdown and close method
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1742 from AngersZhuuuu/CELEBORN-820.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
---
.../org/apache/celeborn/service/deploy/master/Master.scala | 6 +++---
.../apache/celeborn/service/deploy/master/MasterSuite.scala | 2 +-
.../org/apache/celeborn/server/common/HttpService.scala | 12 ++----------
.../scala/org/apache/celeborn/server/common/Service.scala | 4 +---
.../org/apache/celeborn/service/deploy/worker/Worker.scala | 10 +++-------
.../apache/celeborn/service/deploy/MiniClusterFeature.scala | 6 +++---
.../celeborn/service/deploy/worker/storage/WorkerSuite.scala | 2 +-
7 files changed, 14 insertions(+), 28 deletions(-)
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 100e64477..4207dfc74 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -882,11 +882,11 @@ private[celeborn] class Master(
rpcEnv.awaitTermination()
}
- override def close(): Unit = synchronized {
+ override def stop(graceful: Boolean): Unit = synchronized {
if (!stopped) {
logInfo("Stopping Master")
- stop()
- super.close()
+ rpcEnv.stop(self)
+ super.stop(false)
logInfo("Master stopped.")
stopped = true
}
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
index 5fceab10b..e3960e5d2 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
@@ -53,7 +53,7 @@ class MasterSuite extends AnyFunSuite
}
}.start()
Thread.sleep(5000L)
- master.close()
+ master.stop(false)
master.rpcEnv.shutdown()
}
}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 3f2d19b90..49978ba2c 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -105,19 +105,11 @@ abstract class HttpService extends Service with Logging {
startHttpServer()
}
- override def close(): Unit = {
- // may be null when running the unit test
- if (null != httpServer) {
- httpServer.stop(true)
- }
- super.close()
- }
-
- override def shutdown(graceful: Boolean): Unit = {
+ override def stop(graceful: Boolean): Unit = {
// may be null when running the unit test
if (null != httpServer) {
httpServer.stop(graceful)
}
- super.shutdown(graceful)
+ super.stop(graceful)
}
}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
index 129f28f4d..10dad1c2d 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
@@ -35,9 +35,7 @@ abstract class Service extends Logging {
}
}
- def close(): Unit = {}
-
- def shutdown(graceful: Boolean): Unit = {}
+ def stop(graceful: Boolean): Unit = {}
}
object Service {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 37174b2c0..ff0e76e4f 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -392,11 +392,7 @@ private[celeborn] class Worker(
rpcEnv.awaitTermination()
}
- override def close(): Unit = synchronized {
- shutdown(gracefulShutdown)
- }
-
- override def shutdown(graceful: Boolean): Unit = {
+ override def stop(graceful: Boolean): Unit = {
if (!stopped) {
logInfo("Stopping Worker.")
@@ -440,7 +436,7 @@ private[celeborn] class Worker(
fetchServer.shutdown(graceful)
pushServer.shutdown(graceful)
- super.shutdown(graceful)
+ super.stop(graceful)
logInfo("Worker is stopped.")
stopped = true
@@ -618,7 +614,7 @@ private[celeborn] class Worker(
s"unreleased PartitionLocation: \n$partitionLocationInfo")
}
}
- close()
+ stop(gracefulShutdown)
}
}),
WORKER_SHUTDOWN_PRIORITY)
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index a61854057..03fc957ec 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -119,19 +119,19 @@ trait MiniClusterFeature extends Logging {
// shutdown workers
workerInfos.foreach {
case (worker, _) =>
- worker.close()
+ worker.stop(false)
worker.rpcEnv.shutdown()
}
// shutdown masters
- masterInfo._1.close()
+ masterInfo._1.stop(false)
masterInfo._1.rpcEnv.shutdown()
// interrupt threads
Thread.sleep(5000)
workerInfos.foreach {
case (worker, thread) =>
- worker.shutdown(graceful = false)
+ worker.stop(false)
thread.interrupt()
}
workerInfos.clear()
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
index 01c823c0c..5db152548 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
@@ -46,7 +46,7 @@ class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach
{
override def afterEach(): Unit = {
if (null != worker) {
worker.rpcEnv.shutdown()
- worker.shutdown(false)
+ worker.stop(false)
worker = null
}
}