This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 31218281b [CELEBORN-1386] LevelDBProvider/RocksDBProvider should 
create non-existent multi-level directory for LevelDB/RocksDB initialization
31218281b is described below

commit 31218281b4f329fb4f338060a972a244d50cad3d
Author: SteNicholas <[email protected]>
AuthorDate: Tue Apr 16 11:37:35 2024 +0800

    [CELEBORN-1386] LevelDBProvider/RocksDBProvider should create non-existent 
multi-level directory for LevelDB/RocksDB initialization
    
    ### What changes were proposed in this pull request?
    
    `LevelDBProvider`/`RocksDBProvider` now create any missing (multi-level)
    parent directories of the database path before initializing LevelDB/RocksDB.
    
    ### Why are the changes needed?
    
    At present, `RocksDBProvider` relies on `Options#setCreateIfMissing` to create
    the database when initializing RocksDB. This fails with the following exception
    when `dbFile` is a multi-level path whose parent directories do not exist yet.
    
    ```
    2024-04-09T03:19:35.6807077Z 24/04/09 03:19:35,679 ERROR 
[pool-1-thread-1-ScalaTest-running-StorageManagerSuite] RocksDBProvider: error 
opening rocksdb file /tmp/recover/recovery.rdb. Creating new file, will not be 
able to recover state for existing applications
    2024-04-09T03:19:35.6810066Z org.rocksdb.RocksDBException: While mkdir if 
missing: /tmp/recover/recovery.rdb: No such file or directory
    2024-04-09T03:19:35.6811303Z    at org.rocksdb.RocksDB.open(Native Method)
    2024-04-09T03:19:35.6812052Z    at 
org.rocksdb.RocksDB.open(RocksDB.java:259)
    2024-04-09T03:19:35.6813431Z    at 
org.apache.celeborn.service.deploy.worker.shuffledb.RocksDBProvider.initRockDB(RocksDBProvider.java:66)
    2024-04-09T03:19:35.6815230Z    at 
org.apache.celeborn.service.deploy.worker.shuffledb.DBProvider.initDB(DBProvider.java:39)
    2024-04-09T03:19:35.6816975Z    at 
org.apache.celeborn.service.deploy.worker.storage.StorageManager.<init>(StorageManager.scala:216)
    2024-04-09T03:19:35.6818904Z    at 
org.apache.celeborn.service.deploy.worker.storage.StorageManagerSuite.$anonfun$new$1(StorageManagerSuite.scala:30)
    2024-04-09T03:19:35.6820538Z    at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    2024-04-09T03:19:35.6821620Z    at 
org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    2024-04-09T03:19:35.6822585Z    at 
org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    2024-04-09T03:19:35.6823948Z    at 
org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    2024-04-09T03:19:35.6824908Z    at 
org.scalatest.Transformer.apply(Transformer.scala:22)
    2024-04-09T03:19:35.6825862Z    at 
org.scalatest.Transformer.apply(Transformer.scala:20)
    2024-04-09T03:19:35.6827073Z    at 
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
    2024-04-09T03:19:35.6828439Z    at 
org.apache.celeborn.CelebornFunSuite.withFixture(CelebornFunSuite.scala:157)
    2024-04-09T03:19:35.6829909Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
    2024-04-09T03:19:35.6831386Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
    2024-04-09T03:19:35.6832590Z    at 
org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    2024-04-09T03:19:35.6833727Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
    2024-04-09T03:19:35.6835034Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
    2024-04-09T03:19:35.6836660Z    at 
org.apache.celeborn.CelebornFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(CelebornFunSuite.scala:35)
    2024-04-09T03:19:35.6838253Z    at 
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
    2024-04-09T03:19:35.6839512Z    at 
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
    2024-04-09T03:19:35.6840766Z    at 
org.apache.celeborn.CelebornFunSuite.runTest(CelebornFunSuite.scala:35)
    2024-04-09T03:19:35.6842131Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
    2024-04-09T03:19:35.6843459Z    at 
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
    2024-04-09T03:19:35.6844543Z    at 
scala.collection.immutable.List.foreach(List.scala:431)
    2024-04-09T03:19:35.6845566Z    at 
org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    2024-04-09T03:19:35.6846677Z    at 
org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
    2024-04-09T03:19:35.6847722Z    at 
org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
    2024-04-09T03:19:35.6849045Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
    2024-04-09T03:19:35.6850358Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
    2024-04-09T03:19:35.6851608Z    at 
org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
    2024-04-09T03:19:35.6852566Z    at org.scalatest.Suite.run(Suite.scala:1114)
    2024-04-09T03:19:35.6853295Z    at 
org.scalatest.Suite.run$(Suite.scala:1096)
    2024-04-09T03:19:35.6854857Z    at 
org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
    2024-04-09T03:19:35.6856472Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
    2024-04-09T03:19:35.6857654Z    at 
org.scalatest.SuperEngine.runImpl(Engine.scala:535)
    2024-04-09T03:19:35.6858737Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
    2024-04-09T03:19:35.6859974Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
    2024-04-09T03:19:35.6861519Z    at 
org.apache.celeborn.CelebornFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(CelebornFunSuite.scala:35)
    2024-04-09T03:19:35.6863041Z    at 
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
    2024-04-09T03:19:35.6864233Z    at 
org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
    2024-04-09T03:19:35.6865355Z    at 
org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
    2024-04-09T03:19:35.6866487Z    at 
org.apache.celeborn.CelebornFunSuite.run(CelebornFunSuite.scala:35)
    2024-04-09T03:19:35.6867764Z    at 
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
    2024-04-09T03:19:35.6869119Z    at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
    2024-04-09T03:19:35.6870146Z    at 
sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
    2024-04-09T03:19:35.6871069Z    at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
    2024-04-09T03:19:35.6872490Z    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    2024-04-09T03:19:35.6873824Z    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    2024-04-09T03:19:35.6874805Z    at java.lang.Thread.run(Thread.java:750)
    2024-04-09T03:19:35.6887377Z 24/04/09 03:19:35,687 ERROR 
[pool-1-thread-1-ScalaTest-running-StorageManagerSuite] StorageManager: Init 
level DB failed:
    2024-04-09T03:19:35.6889076Z java.io.IOException: Unable to create state 
store
    2024-04-09T03:19:35.6890473Z    at 
org.apache.celeborn.service.deploy.worker.shuffledb.RocksDBProvider.initRockDB(RocksDBProvider.java:98)
    2024-04-09T03:19:35.6894605Z    at 
org.apache.celeborn.service.deploy.worker.shuffledb.DBProvider.initDB(DBProvider.java:39)
    2024-04-09T03:19:35.6904452Z    at 
org.apache.celeborn.service.deploy.worker.storage.StorageManager.<init>(StorageManager.scala:216)
    2024-04-09T03:19:35.6936013Z    at 
org.apache.celeborn.service.deploy.worker.storage.StorageManagerSuite.$anonfun$new$1(StorageManagerSuite.scala:30)
    2024-04-09T03:19:35.6937634Z    at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    2024-04-09T03:19:35.6938639Z    at 
org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    2024-04-09T03:19:35.6939493Z    at 
org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    2024-04-09T03:19:35.6940348Z    at 
org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    2024-04-09T03:19:35.6941200Z    at 
org.scalatest.Transformer.apply(Transformer.scala:22)
    2024-04-09T03:19:35.6942029Z    at 
org.scalatest.Transformer.apply(Transformer.scala:20)
    2024-04-09T03:19:35.6943079Z    at 
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
    2024-04-09T03:19:35.6944350Z    at 
org.apache.celeborn.CelebornFunSuite.withFixture(CelebornFunSuite.scala:157)
    2024-04-09T03:19:35.6945683Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
    2024-04-09T03:19:35.6947057Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
    2024-04-09T03:19:35.6948181Z    at 
org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    2024-04-09T03:19:35.6949222Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
    2024-04-09T03:19:35.6950415Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
    2024-04-09T03:19:35.6951915Z    at 
org.apache.celeborn.CelebornFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(CelebornFunSuite.scala:35)
    2024-04-09T03:19:35.6953391Z    at 
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
    2024-04-09T03:19:35.6954811Z    at 
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
    2024-04-09T03:19:35.6955990Z    at 
org.apache.celeborn.CelebornFunSuite.runTest(CelebornFunSuite.scala:35)
    2024-04-09T03:19:35.6957249Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
    2024-04-09T03:19:35.6958473Z    at 
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
    2024-04-09T03:19:35.6959465Z    at 
scala.collection.immutable.List.foreach(List.scala:431)
    2024-04-09T03:19:35.6960422Z    at 
org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    2024-04-09T03:19:35.6961411Z    at 
org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
    2024-04-09T03:19:35.6962347Z    at 
org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
    2024-04-09T03:19:35.6963404Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
    2024-04-09T03:19:35.6964635Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
    2024-04-09T03:19:35.6965797Z    at 
org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
    2024-04-09T03:19:35.6966679Z    at org.scalatest.Suite.run(Suite.scala:1114)
    2024-04-09T03:19:35.6967341Z    at 
org.scalatest.Suite.run$(Suite.scala:1096)
    2024-04-09T03:19:35.6968835Z    at 
org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
    2024-04-09T03:19:35.6970329Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
    2024-04-09T03:19:35.6971604Z    at 
org.scalatest.SuperEngine.runImpl(Engine.scala:535)
    2024-04-09T03:19:35.6972573Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
    2024-04-09T03:19:35.6973695Z    at 
org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
    2024-04-09T03:19:35.6975279Z    at 
org.apache.celeborn.CelebornFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(CelebornFunSuite.scala:35)
    2024-04-09T03:19:35.6976986Z    at 
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
    2024-04-09T03:19:35.6978212Z    at 
org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
    2024-04-09T03:19:35.6979969Z    at 
org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
    2024-04-09T03:19:35.6982049Z    at 
org.apache.celeborn.CelebornFunSuite.run(CelebornFunSuite.scala:35)
    2024-04-09T03:19:35.6982851Z    at 
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
    2024-04-09T03:19:35.6983608Z    at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
    2024-04-09T03:19:35.6984185Z    at 
sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
    2024-04-09T03:19:35.6984688Z    at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
    2024-04-09T03:19:35.6985336Z    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    2024-04-09T03:19:35.6986127Z    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    2024-04-09T03:19:35.6986690Z    at java.lang.Thread.run(Thread.java:750)
    2024-04-09T03:19:35.6987383Z Caused by: org.rocksdb.RocksDBException: While 
mkdir if missing: /tmp/recover/recovery.rdb: No such file or directory
    2024-04-09T03:19:35.6988110Z    at org.rocksdb.RocksDB.open(Native Method)
    2024-04-09T03:19:35.6988520Z    at 
org.rocksdb.RocksDB.open(RocksDB.java:259)
    2024-04-09T03:19:35.6989257Z    at 
org.apache.celeborn.service.deploy.worker.shuffledb.RocksDBProvider.initRockDB(RocksDBProvider.java:96)
    2024-04-09T03:19:35.6989929Z    ... 48 more
    ```
    
    Because `mkdir` cannot create missing intermediate directories, RocksDB's
    [CreateDirIfMissing](https://github.com/facebook/rocksdb/blob/main/env/fs_posix.cc#L637)
    cannot create a non-existent multi-level directory. LevelDB's
    [CreateDir](https://github.com/google/leveldb/blob/main/util/env_posix.cc#L625)
    has the same limitation, since it also calls `mkdir` directly (both are quoted
    below). Therefore `LevelDBProvider`/`RocksDBProvider` should create non-existent
    multi-level direct [...]
    
    ```
    IOStatus CreateDirIfMissing(const std::string& name,
                                  const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) override {
        if (mkdir(name.c_str(), 0755) != 0) {
          if (errno != EEXIST) {
            return IOError("While mkdir if missing", name, errno);
          } else if (!DirExists(name)) {  // Check that name is actually a
                                          // directory.
            // Message is taken from mkdir
            return IOStatus::IOError("`" + name +
                                     "' exists but is not a directory");
          }
        }
        return IOStatus::OK();
     }
    ```
    
    ```
    Status CreateDir(const std::string& dirname) override {
        if (::mkdir(dirname.c_str(), 0755) != 0) {
          return PosixError(dirname, errno);
        }
        return Status::OK();
    }
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `DBProviderSuiteJ#testRockDBCheckVersionFailed`
    
    Closes #2458 from SteNicholas/CELEBORN-1386.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../deploy/worker/shuffledb/LevelDBProvider.java   | 41 ++++++++++++-------
 .../deploy/worker/shuffledb/RocksDBProvider.java   | 46 ++++++++++++++++------
 .../deploy/worker/shuffledb/DBProviderSuiteJ.java  | 24 +++++++++--
 3 files changed, 81 insertions(+), 30 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
index 7bac20415..839b47dba 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.commons.io.FileUtils;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.fusesource.leveldbjni.internal.NativeDB;
 import org.iq80.leveldb.DB;
@@ -38,8 +39,7 @@ import org.apache.celeborn.common.util.PbSerDeUtils;
 public class LevelDBProvider {
   private static final Logger logger = 
LoggerFactory.getLogger(LevelDBProvider.class);
 
-  public static org.iq80.leveldb.DB initLevelDB(File dbFile, StoreVersion 
version)
-      throws IOException {
+  public static DB initLevelDB(File dbFile, StoreVersion version) throws 
IOException {
     org.iq80.leveldb.DB tmpDb = null;
     if (dbFile != null) {
       Options options = new Options();
@@ -50,7 +50,7 @@ public class LevelDBProvider {
       } catch (NativeDB.DBException e) {
         if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
           logger.info("Creating state database at " + dbFile);
-          options.createIfMissing(true);
+          createIfMissing(options, dbFile);
           try {
             tmpDb = JniDBFactory.factory.open(dbFile, options);
           } catch (NativeDB.DBException dbExc) {
@@ -60,21 +60,21 @@ public class LevelDBProvider {
           // the leveldb file seems to be corrupt somehow.  Lets just blow it 
away and create a new
           // one, so we can keep processing new apps
           logger.error(
-              "error opening leveldb file {}.  Creating new file, will not be 
able to "
+              "Error opening leveldb file {}.  Creating new file, will not be 
able to "
                   + "recover state for existing applications",
               dbFile,
               e);
           if (dbFile.isDirectory()) {
             for (File f : dbFile.listFiles()) {
               if (!f.delete()) {
-                logger.warn("error deleting {}", f.getPath());
+                logger.warn("Error deleting {}", f.getPath());
               }
             }
           }
           if (!dbFile.delete()) {
-            logger.warn("error deleting {}", dbFile.getPath());
+            logger.warn("Error deleting {}", dbFile.getPath());
           }
-          options.createIfMissing(true);
+          createIfMissing(options, dbFile);
           try {
             tmpDb = JniDBFactory.factory.open(dbFile, options);
           } catch (NativeDB.DBException dbExc) {
@@ -93,6 +93,19 @@ public class LevelDBProvider {
     return tmpDb;
   }
 
+  private static void createIfMissing(Options dbOptions, File dbFile) {
+    logger.info("Creating database file {} if missing", dbFile);
+    dbOptions.createIfMissing(true);
+    // LevelDB does not support creating non-existent multi-level directory.
+    if (!dbFile.exists()) {
+      try {
+        FileUtils.forceMkdir(dbFile);
+      } catch (IOException e) {
+        logger.warn("Failed to create database file {}", dbFile, e);
+      }
+    }
+  }
+
   private static class LevelDBLogger implements org.iq80.leveldb.Logger {
     private static final Logger LOG = 
LoggerFactory.getLogger(LevelDBLogger.class);
 
@@ -107,26 +120,26 @@ public class LevelDBProvider {
    * Minor version differences are allowed -- meaning we should be able to 
read dbs that are either
    * earlier *or* later on the minor version.
    */
-  public static void checkVersion(DB db, StoreVersion newversion) throws 
IOException {
+  public static void checkVersion(DB db, StoreVersion newVersion) throws 
IOException {
     byte[] bytes = db.get(StoreVersion.KEY);
     if (bytes == null) {
-      storeVersion(db, newversion);
+      storeVersion(db, newVersion);
     } else {
       ArrayList<Integer> versions = PbSerDeUtils.fromPbStoreVersion(bytes);
       StoreVersion version = new StoreVersion(versions.get(0), 
versions.get(1));
-      if (version.major != newversion.major) {
+      if (version.major != newVersion.major) {
         throw new IOException(
-            "cannot read state DB with version "
+            "Cannot read state DB with version "
                 + version
                 + ", incompatible "
                 + "with current version "
-                + newversion);
+                + newVersion);
       }
-      storeVersion(db, newversion);
+      storeVersion(db, newVersion);
     }
   }
 
-  public static void storeVersion(DB db, StoreVersion version) throws 
IOException {
+  public static void storeVersion(DB db, StoreVersion version) {
     db.put(StoreVersion.KEY, PbSerDeUtils.toPbStoreVersion(version.major, 
version.minor));
   }
 }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
index 4229008c0..75f728a30 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
@@ -22,7 +22,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Objects;
 
-import org.rocksdb.*;
+import org.apache.commons.io.FileUtils;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +74,7 @@ public class RocksDBProvider {
       } catch (RocksDBException e) {
         if (e.getStatus().getCode() == Status.Code.NotFound) {
           logger.info("Creating state database at " + dbFile);
-          dbOptions.setCreateIfMissing(true);
+          createIfMissing(dbOptions, dbFile);
           try {
             tmpDb = org.rocksdb.RocksDB.open(dbOptions, dbFile.toString());
           } catch (RocksDBException dbExc) {
@@ -77,21 +84,21 @@ public class RocksDBProvider {
           // the RocksDB file seems to be corrupt somehow.  Let's just blow it 
away and create
           // a new one, so we can keep processing new apps
           logger.error(
-              "error opening rocksdb file {}. Creating new file, will not be 
able to "
+              "Error opening rocksdb file {}. Creating new file, will not be 
able to "
                   + "recover state for existing applications",
               dbFile,
               e);
           if (dbFile.isDirectory()) {
             for (File f : Objects.requireNonNull(dbFile.listFiles())) {
               if (!f.delete()) {
-                logger.warn("error deleting {}", f.getPath());
+                logger.warn("Error deleting {}", f.getPath());
               }
             }
           }
           if (!dbFile.delete()) {
-            logger.warn("error deleting {}", dbFile.getPath());
+            logger.warn("Error deleting {}", dbFile.getPath());
           }
-          dbOptions.setCreateIfMissing(true);
+          createIfMissing(dbOptions, dbFile);
           try {
             tmpDb = org.rocksdb.RocksDB.open(dbOptions, dbFile.toString());
           } catch (RocksDBException dbExc) {
@@ -114,6 +121,19 @@ public class RocksDBProvider {
     return tmpDb;
   }
 
+  private static void createIfMissing(Options dbOptions, File dbFile) {
+    logger.info("Creating database file {} if missing", dbFile);
+    dbOptions.setCreateIfMissing(true);
+    // RocksDB does not support creating non-existent multi-level directory.
+    if (!dbFile.exists()) {
+      try {
+        FileUtils.forceMkdir(dbFile);
+      } catch (IOException e) {
+        logger.warn("Failed to create database file {}", dbFile, e);
+      }
+    }
+  }
+
   private static class RocksDBLogger extends org.rocksdb.Logger {
     private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBLogger.class);
 
@@ -134,28 +154,28 @@ public class RocksDBProvider {
    * Minor version differences are allowed -- meaning we should be able to 
read dbs that are either
    * earlier *or* later on the minor version.
    */
-  public static void checkVersion(org.rocksdb.RocksDB db, StoreVersion 
newversion)
+  public static void checkVersion(org.rocksdb.RocksDB db, StoreVersion 
newVersion)
       throws IOException, RocksDBException {
     byte[] bytes = db.get(StoreVersion.KEY);
     if (bytes == null) {
-      storeVersion(db, newversion);
+      storeVersion(db, newVersion);
     } else {
       ArrayList<Integer> versions = PbSerDeUtils.fromPbStoreVersion(bytes);
       StoreVersion version = new StoreVersion(versions.get(0), 
versions.get(1));
-      if (version.major != newversion.major) {
+      if (version.major != newVersion.major) {
         throw new IOException(
-            "cannot read state DB with version "
+            "Cannot read state DB with version "
                 + version
                 + ", incompatible "
                 + "with current version "
-                + newversion);
+                + newVersion);
       }
-      storeVersion(db, newversion);
+      storeVersion(db, newVersion);
     }
   }
 
   public static void storeVersion(org.rocksdb.RocksDB db, StoreVersion version)
-      throws IOException, RocksDBException {
+      throws RocksDBException {
     db.put(StoreVersion.KEY, PbSerDeUtils.toPbStoreVersion(version.major, 
version.minor));
   }
 }
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProviderSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProviderSuiteJ.java
index e9399210b..69f0aa190 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProviderSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProviderSuiteJ.java
@@ -23,15 +23,30 @@ import static org.junit.Assume.assumeFalse;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
 
 import org.apache.commons.lang3.SystemUtils;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 import org.apache.celeborn.common.util.JavaUtils;
 import org.apache.celeborn.common.util.Utils;
 
+@RunWith(Parameterized.class)
 public class DBProviderSuiteJ {
 
+  @Parameter public boolean createDirectory;
+
+  @Parameters(name = "createDirectory: {0}")
+  public static Collection<Object> data() {
+    return Arrays.asList(new Object[] {true, false});
+  }
+
   @Test
   public void testRockDBCheckVersionFailed() throws IOException {
     testCheckVersionFailed(DBBackend.ROCKSDB, "rocksdb");
@@ -44,8 +59,11 @@ public class DBProviderSuiteJ {
   }
 
   private void testCheckVersionFailed(DBBackend dbBackend, String namePrefix) 
throws IOException {
-    String root = System.getProperty("java.io.tmpdir");
-    File dbFile = Utils.createDirectory(root, namePrefix);
+    File dbDir = new File(System.getProperty("java.io.tmpdir"), namePrefix);
+    File dbFile =
+        createDirectory
+            ? Utils.createDirectory(dbDir.getPath(), namePrefix)
+            : new File(dbDir.getPath(), String.format("%s-%s", namePrefix, 
UUID.randomUUID()));
     try {
       StoreVersion v1 = new StoreVersion(1, 0);
       DBProvider.initDB(dbBackend, dbFile, v1).close();
@@ -54,7 +72,7 @@ public class DBProviderSuiteJ {
           assertThrows(IOException.class, () -> DBProvider.initDB(dbBackend, 
dbFile, v2));
       assertTrue(ioe.getMessage().contains("incompatible with current version 
StoreVersion[2.0]"));
     } finally {
-      JavaUtils.deleteRecursively(dbFile);
+      JavaUtils.deleteRecursively(dbDir);
     }
   }
 }

Reply via email to