This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 5dca65143 [CELEBORN-926] Enabled GRACEFUL SHUTDOWN, will meet
IllegalMonitorStateException
5dca65143 is described below
commit 5dca65143abb7f961c07b03f5684ecf83ad343b0
Author: zwangsheng <[email protected]>
AuthorDate: Tue Aug 29 14:20:36 2023 +0800
[CELEBORN-926] Enabled GRACEFUL SHUTDOWN, will meet
IllegalMonitorStateException
### What changes were proposed in this pull request?
Using `awaitTermination` instead of `shutdownNow`.
### Why are the changes needed?
When we call the `wait` method without owning the object's monitor, it throws
`IllegalMonitorStateException`.
And in the `saveAllCommittedFileInfosToDB` function, it is hard to acquire the
monitor of the objects being waited on.
The documentation of the `wait` method mentions:
>IllegalMonitorStateException – if the current thread is not the owner of
the object's monitor.
We can use `awaitTermination` instead of `shutdownNow`.
According to the description of the `shutdownNow` method:
> This method does not wait for actively executing tasks to terminate. Use
awaitTermination to do that.
And the documentation of `awaitTermination` mentions:
> Blocks until all tasks have completed execution after a shutdown request,
or the timeout occurs, or the current thread is interrupted, whichever happens
first.
Overall, `awaitTermination` is applicable to the current scenario.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes #1849 from zwangsheng/CELEBORN-926.
Authored-by: zwangsheng <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 7ab8c58aa755fb2a69906ec31d154ea52e806979)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../deploy/worker/storage/StorageManager.scala | 3 +-
.../worker/storage/StorageManagerSuite.scala | 34 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 2 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index a1bca599f..355c77e15 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -258,9 +258,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
def saveAllCommittedFileInfosToDB(): Unit = {
- val runnables = saveCommittedFileInfosExecutor.shutdownNow()
// save committed fileinfo to DB should be done within the time of
saveCommittedFileInfoInterval
- runnables.asScala.foreach(_.wait(saveCommittedFileInfoInterval))
+
saveCommittedFileInfosExecutor.awaitTermination(saveCommittedFileInfoInterval,
MILLISECONDS)
// graceful shutdown might be timed out, persist all committed fileinfos
to levelDB
// final flush write through
committedFileInfos.asScala.foreach { case (shuffleKey, files) =>
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
new file mode 100644
index 000000000..5b747e9b1
--- /dev/null
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+import
org.apache.celeborn.common.CelebornConf.{WORKER_GRACEFUL_SHUTDOWN_ENABLED,
WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
+import org.apache.celeborn.service.deploy.worker.WorkerSource
+
+class StorageManagerSuite extends CelebornFunSuite {
+
+ test("[CELEBORN-926] saveAllCommittedFileInfosToDB cause
IllegalMonitorStateException") {
+ val conf = new CelebornConf().set(WORKER_GRACEFUL_SHUTDOWN_ENABLED, true)
+ .set(WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH, "/tmp/recover")
+ val storageManager = new StorageManager(conf, new WorkerSource(conf))
+ // should not throw IllegalMonitorStateException exception
+ storageManager.saveAllCommittedFileInfosToDB()
+ }
+}