This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ab8c58aa [CELEBORN-926] Enabled GRACEFUL SHUTDOWN, will meet 
IllegalMonitorStateException
7ab8c58aa is described below

commit 7ab8c58aa755fb2a69906ec31d154ea52e806979
Author: zwangsheng <[email protected]>
AuthorDate: Tue Aug 29 14:20:36 2023 +0800

    [CELEBORN-926] Enabled GRACEFUL SHUTDOWN, will meet 
IllegalMonitorStateException
    
    ### What changes were proposed in this pull request?
    Using `awaitTermination` instead of `shutdownNow`.
    
    ### Why are the changes needed?
    When we call `wait` on an object without owning that object's monitor, an 
`IllegalMonitorStateException` is thrown.
    And in the `saveAllCommittedFileInfosToDB` function, it is hard for us to 
acquire that monitor.
    
    The `wait` method documentation mentions:
    >IllegalMonitorStateException – if the current thread is not the owner of 
the object's monitor.
    
    We can use `awaitTermination` instead of `shutdownNow`.
    
    According to the description of the `shutdownNow` method:
    > This method does not wait for actively executing tasks to terminate. Use 
awaitTermination to do that.
    
    And the `awaitTermination` documentation mentions:
    > Blocks until all tasks have completed execution after a shutdown request, 
or the timeout occurs, or the current thread is interrupted, whichever happens 
first.
    
    Overall, `awaitTermination` is applicable to the current scenario.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT
    
    Closes #1849 from zwangsheng/CELEBORN-926.
    
    Authored-by: zwangsheng <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../deploy/worker/storage/StorageManager.scala     |  3 +-
 .../worker/storage/StorageManagerSuite.scala       | 34 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index a1bca599f..355c77e15 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -258,9 +258,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
   }
 
   def saveAllCommittedFileInfosToDB(): Unit = {
-    val runnables = saveCommittedFileInfosExecutor.shutdownNow()
     // save committed fileinfo to DB should be done within the time of 
saveCommittedFileInfoInterval
-    runnables.asScala.foreach(_.wait(saveCommittedFileInfoInterval))
+    
saveCommittedFileInfosExecutor.awaitTermination(saveCommittedFileInfoInterval, 
MILLISECONDS)
     // graceful shutdown might be timed out, persist all committed fileinfos 
to levelDB
     // final flush write through
     committedFileInfos.asScala.foreach { case (shuffleKey, files) =>
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
new file mode 100644
index 000000000..5b747e9b1
--- /dev/null
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+import 
org.apache.celeborn.common.CelebornConf.{WORKER_GRACEFUL_SHUTDOWN_ENABLED, 
WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
+import org.apache.celeborn.service.deploy.worker.WorkerSource
+
+class StorageManagerSuite extends CelebornFunSuite {
+
+  test("[CELEBORN-926] saveAllCommittedFileInfosToDB cause 
IllegalMonitorStateException") {
+    val conf = new CelebornConf().set(WORKER_GRACEFUL_SHUTDOWN_ENABLED, true)
+      .set(WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH, "/tmp/recover")
+    val storageManager = new StorageManager(conf, new WorkerSource(conf))
+    // should not throw IllegalMonitorStateException exception
+    storageManager.saveAllCommittedFileInfosToDB()
+  }
+}

Reply via email to