This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 31218281b [CELEBORN-1386] LevelDBProvider/RocksDBProvider should
create non-existent multi-level directory for LevelDB/RocksDB initialization
31218281b is described below
commit 31218281b4f329fb4f338060a972a244d50cad3d
Author: SteNicholas <[email protected]>
AuthorDate: Tue Apr 16 11:37:35 2024 +0800
[CELEBORN-1386] LevelDBProvider/RocksDBProvider should create non-existent
multi-level directory for LevelDB/RocksDB initialization
### What changes were proposed in this pull request?
`LevelDBProvider`/`RocksDBProvider` now create the non-existent multi-level
directory for LevelDB/RocksDB initialization.
### Why are the changes needed?
At present, `RocksDBProvider` creates the database if it is missing via
`Options#setCreateIfMissing` when initializing RocksDB, which causes
the following exception when `dbFile` is a non-existent multi-level directory.
```
2024-04-09T03:19:35.6807077Z 24/04/09 03:19:35,679 ERROR
[pool-1-thread-1-ScalaTest-running-StorageManagerSuite] RocksDBProvider: error
opening rocksdb file /tmp/recover/recovery.rdb. Creating new file, will not be
able to recover state for existing applications
2024-04-09T03:19:35.6810066Z org.rocksdb.RocksDBException: While mkdir if
missing: /tmp/recover/recovery.rdb: No such file or directory
2024-04-09T03:19:35.6811303Z at org.rocksdb.RocksDB.open(Native Method)
2024-04-09T03:19:35.6812052Z at
org.rocksdb.RocksDB.open(RocksDB.java:259)
2024-04-09T03:19:35.6813431Z at
org.apache.celeborn.service.deploy.worker.shuffledb.RocksDBProvider.initRockDB(RocksDBProvider.java:66)
2024-04-09T03:19:35.6815230Z at
org.apache.celeborn.service.deploy.worker.shuffledb.DBProvider.initDB(DBProvider.java:39)
2024-04-09T03:19:35.6816975Z at
org.apache.celeborn.service.deploy.worker.storage.StorageManager.<init>(StorageManager.scala:216)
2024-04-09T03:19:35.6818904Z at
org.apache.celeborn.service.deploy.worker.storage.StorageManagerSuite.$anonfun$new$1(StorageManagerSuite.scala:30)
2024-04-09T03:19:35.6820538Z at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
2024-04-09T03:19:35.6821620Z at
org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
2024-04-09T03:19:35.6822585Z at
org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
2024-04-09T03:19:35.6823948Z at
org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
2024-04-09T03:19:35.6824908Z at
org.scalatest.Transformer.apply(Transformer.scala:22)
2024-04-09T03:19:35.6825862Z at
org.scalatest.Transformer.apply(Transformer.scala:20)
2024-04-09T03:19:35.6827073Z at
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
2024-04-09T03:19:35.6828439Z at
org.apache.celeborn.CelebornFunSuite.withFixture(CelebornFunSuite.scala:157)
2024-04-09T03:19:35.6829909Z at
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
2024-04-09T03:19:35.6831386Z at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
2024-04-09T03:19:35.6832590Z at
org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
2024-04-09T03:19:35.6833727Z at
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
2024-04-09T03:19:35.6835034Z at
org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
2024-04-09T03:19:35.6836660Z at
org.apache.celeborn.CelebornFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6838253Z at
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
2024-04-09T03:19:35.6839512Z at
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
2024-04-09T03:19:35.6840766Z at
org.apache.celeborn.CelebornFunSuite.runTest(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6842131Z at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
2024-04-09T03:19:35.6843459Z at
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
2024-04-09T03:19:35.6844543Z at
scala.collection.immutable.List.foreach(List.scala:431)
2024-04-09T03:19:35.6845566Z at
org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
2024-04-09T03:19:35.6846677Z at
org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
2024-04-09T03:19:35.6847722Z at
org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
2024-04-09T03:19:35.6849045Z at
org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
2024-04-09T03:19:35.6850358Z at
org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
2024-04-09T03:19:35.6851608Z at
org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
2024-04-09T03:19:35.6852566Z at org.scalatest.Suite.run(Suite.scala:1114)
2024-04-09T03:19:35.6853295Z at
org.scalatest.Suite.run$(Suite.scala:1096)
2024-04-09T03:19:35.6854857Z at
org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
2024-04-09T03:19:35.6856472Z at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
2024-04-09T03:19:35.6857654Z at
org.scalatest.SuperEngine.runImpl(Engine.scala:535)
2024-04-09T03:19:35.6858737Z at
org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
2024-04-09T03:19:35.6859974Z at
org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
2024-04-09T03:19:35.6861519Z at
org.apache.celeborn.CelebornFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6863041Z at
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
2024-04-09T03:19:35.6864233Z at
org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
2024-04-09T03:19:35.6865355Z at
org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
2024-04-09T03:19:35.6866487Z at
org.apache.celeborn.CelebornFunSuite.run(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6867764Z at
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
2024-04-09T03:19:35.6869119Z at
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
2024-04-09T03:19:35.6870146Z at
sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
2024-04-09T03:19:35.6871069Z at
java.util.concurrent.FutureTask.run(FutureTask.java:266)
2024-04-09T03:19:35.6872490Z at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2024-04-09T03:19:35.6873824Z at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2024-04-09T03:19:35.6874805Z at java.lang.Thread.run(Thread.java:750)
2024-04-09T03:19:35.6887377Z 24/04/09 03:19:35,687 ERROR
[pool-1-thread-1-ScalaTest-running-StorageManagerSuite] StorageManager: Init
level DB failed:
2024-04-09T03:19:35.6889076Z java.io.IOException: Unable to create state
store
2024-04-09T03:19:35.6890473Z at
org.apache.celeborn.service.deploy.worker.shuffledb.RocksDBProvider.initRockDB(RocksDBProvider.java:98)
2024-04-09T03:19:35.6894605Z at
org.apache.celeborn.service.deploy.worker.shuffledb.DBProvider.initDB(DBProvider.java:39)
2024-04-09T03:19:35.6904452Z at
org.apache.celeborn.service.deploy.worker.storage.StorageManager.<init>(StorageManager.scala:216)
2024-04-09T03:19:35.6936013Z at
org.apache.celeborn.service.deploy.worker.storage.StorageManagerSuite.$anonfun$new$1(StorageManagerSuite.scala:30)
2024-04-09T03:19:35.6937634Z at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
2024-04-09T03:19:35.6938639Z at
org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
2024-04-09T03:19:35.6939493Z at
org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
2024-04-09T03:19:35.6940348Z at
org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
2024-04-09T03:19:35.6941200Z at
org.scalatest.Transformer.apply(Transformer.scala:22)
2024-04-09T03:19:35.6942029Z at
org.scalatest.Transformer.apply(Transformer.scala:20)
2024-04-09T03:19:35.6943079Z at
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
2024-04-09T03:19:35.6944350Z at
org.apache.celeborn.CelebornFunSuite.withFixture(CelebornFunSuite.scala:157)
2024-04-09T03:19:35.6945683Z at
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
2024-04-09T03:19:35.6947057Z at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
2024-04-09T03:19:35.6948181Z at
org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
2024-04-09T03:19:35.6949222Z at
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
2024-04-09T03:19:35.6950415Z at
org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
2024-04-09T03:19:35.6951915Z at
org.apache.celeborn.CelebornFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6953391Z at
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
2024-04-09T03:19:35.6954811Z at
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
2024-04-09T03:19:35.6955990Z at
org.apache.celeborn.CelebornFunSuite.runTest(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6957249Z at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
2024-04-09T03:19:35.6958473Z at
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
2024-04-09T03:19:35.6959465Z at
scala.collection.immutable.List.foreach(List.scala:431)
2024-04-09T03:19:35.6960422Z at
org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
2024-04-09T03:19:35.6961411Z at
org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
2024-04-09T03:19:35.6962347Z at
org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
2024-04-09T03:19:35.6963404Z at
org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
2024-04-09T03:19:35.6964635Z at
org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
2024-04-09T03:19:35.6965797Z at
org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
2024-04-09T03:19:35.6966679Z at org.scalatest.Suite.run(Suite.scala:1114)
2024-04-09T03:19:35.6967341Z at
org.scalatest.Suite.run$(Suite.scala:1096)
2024-04-09T03:19:35.6968835Z at
org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
2024-04-09T03:19:35.6970329Z at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
2024-04-09T03:19:35.6971604Z at
org.scalatest.SuperEngine.runImpl(Engine.scala:535)
2024-04-09T03:19:35.6972573Z at
org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
2024-04-09T03:19:35.6973695Z at
org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
2024-04-09T03:19:35.6975279Z at
org.apache.celeborn.CelebornFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6976986Z at
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
2024-04-09T03:19:35.6978212Z at
org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
2024-04-09T03:19:35.6979969Z at
org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
2024-04-09T03:19:35.6982049Z at
org.apache.celeborn.CelebornFunSuite.run(CelebornFunSuite.scala:35)
2024-04-09T03:19:35.6982851Z at
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
2024-04-09T03:19:35.6983608Z at
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
2024-04-09T03:19:35.6984185Z at
sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
2024-04-09T03:19:35.6984688Z at
java.util.concurrent.FutureTask.run(FutureTask.java:266)
2024-04-09T03:19:35.6985336Z at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2024-04-09T03:19:35.6986127Z at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2024-04-09T03:19:35.6986690Z at java.lang.Thread.run(Thread.java:750)
2024-04-09T03:19:35.6987383Z Caused by: org.rocksdb.RocksDBException: While
mkdir if missing: /tmp/recover/recovery.rdb: No such file or directory
2024-04-09T03:19:35.6988110Z at org.rocksdb.RocksDB.open(Native Method)
2024-04-09T03:19:35.6988520Z at
org.rocksdb.RocksDB.open(RocksDB.java:259)
2024-04-09T03:19:35.6989257Z at
org.apache.celeborn.service.deploy.worker.shuffledb.RocksDBProvider.initRockDB(RocksDBProvider.java:96)
2024-04-09T03:19:35.6989929Z ... 48 more
```
Because `mkdir` cannot create a non-existent multi-level directory (it fails
with "No such file or directory" when a parent directory is missing), RocksDB's
[CreateDirIfMissing](https://github.com/facebook/rocksdb/blob/main/env/fs_posix.cc#L637)
cannot create one either.
Likewise, LevelDB's
[CreateDir](https://github.com/google/leveldb/blob/main/util/env_posix.cc#L625)
does not support creating a multi-level directory.
Therefore `LevelDBProvider`/`RocksDBProvider` should create the non-existent
multi-level directory for LevelDB/RocksDB initialization.
```
IOStatus CreateDirIfMissing(const std::string& name,
const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
if (mkdir(name.c_str(), 0755) != 0) {
if (errno != EEXIST) {
return IOError("While mkdir if missing", name, errno);
} else if (!DirExists(name)) { // Check that name is actually a
// directory.
// Message is taken from mkdir
return IOStatus::IOError("`" + name +
"' exists but is not a directory");
}
}
return IOStatus::OK();
}
```
```
Status CreateDir(const std::string& dirname) override {
if (::mkdir(dirname.c_str(), 0755) != 0) {
return PosixError(dirname, errno);
}
return Status::OK();
}
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`DBProviderSuiteJ#testRockDBCheckVersionFailed`
Closes #2458 from SteNicholas/CELEBORN-1386.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../deploy/worker/shuffledb/LevelDBProvider.java | 41 ++++++++++++-------
.../deploy/worker/shuffledb/RocksDBProvider.java | 46 ++++++++++++++++------
.../deploy/worker/shuffledb/DBProviderSuiteJ.java | 24 +++++++++--
3 files changed, 81 insertions(+), 30 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
index 7bac20415..839b47dba 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import org.apache.commons.io.FileUtils;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
@@ -38,8 +39,7 @@ import org.apache.celeborn.common.util.PbSerDeUtils;
public class LevelDBProvider {
private static final Logger logger =
LoggerFactory.getLogger(LevelDBProvider.class);
- public static org.iq80.leveldb.DB initLevelDB(File dbFile, StoreVersion
version)
- throws IOException {
+ public static DB initLevelDB(File dbFile, StoreVersion version) throws
IOException {
org.iq80.leveldb.DB tmpDb = null;
if (dbFile != null) {
Options options = new Options();
@@ -50,7 +50,7 @@ public class LevelDBProvider {
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
logger.info("Creating state database at " + dbFile);
- options.createIfMissing(true);
+ createIfMissing(options, dbFile);
try {
tmpDb = JniDBFactory.factory.open(dbFile, options);
} catch (NativeDB.DBException dbExc) {
@@ -60,21 +60,21 @@ public class LevelDBProvider {
// the leveldb file seems to be corrupt somehow. Lets just blow it
away and create a new
// one, so we can keep processing new apps
logger.error(
- "error opening leveldb file {}. Creating new file, will not be
able to "
+ "Error opening leveldb file {}. Creating new file, will not be
able to "
+ "recover state for existing applications",
dbFile,
e);
if (dbFile.isDirectory()) {
for (File f : dbFile.listFiles()) {
if (!f.delete()) {
- logger.warn("error deleting {}", f.getPath());
+ logger.warn("Error deleting {}", f.getPath());
}
}
}
if (!dbFile.delete()) {
- logger.warn("error deleting {}", dbFile.getPath());
+ logger.warn("Error deleting {}", dbFile.getPath());
}
- options.createIfMissing(true);
+ createIfMissing(options, dbFile);
try {
tmpDb = JniDBFactory.factory.open(dbFile, options);
} catch (NativeDB.DBException dbExc) {
@@ -93,6 +93,19 @@ public class LevelDBProvider {
return tmpDb;
}
+ private static void createIfMissing(Options dbOptions, File dbFile) {
+ logger.info("Creating database file {} if missing", dbFile);
+ dbOptions.createIfMissing(true);
+ // LevelDB does not support creating non-existent multi-level directory.
+ if (!dbFile.exists()) {
+ try {
+ FileUtils.forceMkdir(dbFile);
+ } catch (IOException e) {
+ logger.warn("Failed to create database file {}", dbFile, e);
+ }
+ }
+ }
+
private static class LevelDBLogger implements org.iq80.leveldb.Logger {
private static final Logger LOG =
LoggerFactory.getLogger(LevelDBLogger.class);
@@ -107,26 +120,26 @@ public class LevelDBProvider {
* Minor version differences are allowed -- meaning we should be able to
read dbs that are either
* earlier *or* later on the minor version.
*/
- public static void checkVersion(DB db, StoreVersion newversion) throws
IOException {
+ public static void checkVersion(DB db, StoreVersion newVersion) throws
IOException {
byte[] bytes = db.get(StoreVersion.KEY);
if (bytes == null) {
- storeVersion(db, newversion);
+ storeVersion(db, newVersion);
} else {
ArrayList<Integer> versions = PbSerDeUtils.fromPbStoreVersion(bytes);
StoreVersion version = new StoreVersion(versions.get(0),
versions.get(1));
- if (version.major != newversion.major) {
+ if (version.major != newVersion.major) {
throw new IOException(
- "cannot read state DB with version "
+ "Cannot read state DB with version "
+ version
+ ", incompatible "
+ "with current version "
- + newversion);
+ + newVersion);
}
- storeVersion(db, newversion);
+ storeVersion(db, newVersion);
}
}
- public static void storeVersion(DB db, StoreVersion version) throws
IOException {
+ public static void storeVersion(DB db, StoreVersion version) {
db.put(StoreVersion.KEY, PbSerDeUtils.toPbStoreVersion(version.major,
version.minor));
}
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
index 4229008c0..75f728a30 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
@@ -22,7 +22,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Objects;
-import org.rocksdb.*;
+import org.apache.commons.io.FileUtils;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +74,7 @@ public class RocksDBProvider {
} catch (RocksDBException e) {
if (e.getStatus().getCode() == Status.Code.NotFound) {
logger.info("Creating state database at " + dbFile);
- dbOptions.setCreateIfMissing(true);
+ createIfMissing(dbOptions, dbFile);
try {
tmpDb = org.rocksdb.RocksDB.open(dbOptions, dbFile.toString());
} catch (RocksDBException dbExc) {
@@ -77,21 +84,21 @@ public class RocksDBProvider {
// the RocksDB file seems to be corrupt somehow. Let's just blow it
away and create
// a new one, so we can keep processing new apps
logger.error(
- "error opening rocksdb file {}. Creating new file, will not be
able to "
+ "Error opening rocksdb file {}. Creating new file, will not be
able to "
+ "recover state for existing applications",
dbFile,
e);
if (dbFile.isDirectory()) {
for (File f : Objects.requireNonNull(dbFile.listFiles())) {
if (!f.delete()) {
- logger.warn("error deleting {}", f.getPath());
+ logger.warn("Error deleting {}", f.getPath());
}
}
}
if (!dbFile.delete()) {
- logger.warn("error deleting {}", dbFile.getPath());
+ logger.warn("Error deleting {}", dbFile.getPath());
}
- dbOptions.setCreateIfMissing(true);
+ createIfMissing(dbOptions, dbFile);
try {
tmpDb = org.rocksdb.RocksDB.open(dbOptions, dbFile.toString());
} catch (RocksDBException dbExc) {
@@ -114,6 +121,19 @@ public class RocksDBProvider {
return tmpDb;
}
+ private static void createIfMissing(Options dbOptions, File dbFile) {
+ logger.info("Creating database file {} if missing", dbFile);
+ dbOptions.setCreateIfMissing(true);
+ // RocksDB does not support creating non-existent multi-level directory.
+ if (!dbFile.exists()) {
+ try {
+ FileUtils.forceMkdir(dbFile);
+ } catch (IOException e) {
+ logger.warn("Failed to create database file {}", dbFile, e);
+ }
+ }
+ }
+
private static class RocksDBLogger extends org.rocksdb.Logger {
private static final Logger LOG =
LoggerFactory.getLogger(RocksDBLogger.class);
@@ -134,28 +154,28 @@ public class RocksDBProvider {
* Minor version differences are allowed -- meaning we should be able to
read dbs that are either
* earlier *or* later on the minor version.
*/
- public static void checkVersion(org.rocksdb.RocksDB db, StoreVersion
newversion)
+ public static void checkVersion(org.rocksdb.RocksDB db, StoreVersion
newVersion)
throws IOException, RocksDBException {
byte[] bytes = db.get(StoreVersion.KEY);
if (bytes == null) {
- storeVersion(db, newversion);
+ storeVersion(db, newVersion);
} else {
ArrayList<Integer> versions = PbSerDeUtils.fromPbStoreVersion(bytes);
StoreVersion version = new StoreVersion(versions.get(0),
versions.get(1));
- if (version.major != newversion.major) {
+ if (version.major != newVersion.major) {
throw new IOException(
- "cannot read state DB with version "
+ "Cannot read state DB with version "
+ version
+ ", incompatible "
+ "with current version "
- + newversion);
+ + newVersion);
}
- storeVersion(db, newversion);
+ storeVersion(db, newVersion);
}
}
public static void storeVersion(org.rocksdb.RocksDB db, StoreVersion version)
- throws IOException, RocksDBException {
+ throws RocksDBException {
db.put(StoreVersion.KEY, PbSerDeUtils.toPbStoreVersion(version.major,
version.minor));
}
}
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProviderSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProviderSuiteJ.java
index e9399210b..69f0aa190 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProviderSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProviderSuiteJ.java
@@ -23,15 +23,30 @@ import static org.junit.Assume.assumeFalse;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
import org.apache.commons.lang3.SystemUtils;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.Utils;
+@RunWith(Parameterized.class)
public class DBProviderSuiteJ {
+ @Parameter public boolean createDirectory;
+
+ @Parameters(name = "createDirectory: {0}")
+ public static Collection<Object> data() {
+ return Arrays.asList(new Object[] {true, false});
+ }
+
@Test
public void testRockDBCheckVersionFailed() throws IOException {
testCheckVersionFailed(DBBackend.ROCKSDB, "rocksdb");
@@ -44,8 +59,11 @@ public class DBProviderSuiteJ {
}
private void testCheckVersionFailed(DBBackend dbBackend, String namePrefix)
throws IOException {
- String root = System.getProperty("java.io.tmpdir");
- File dbFile = Utils.createDirectory(root, namePrefix);
+ File dbDir = new File(System.getProperty("java.io.tmpdir"), namePrefix);
+ File dbFile =
+ createDirectory
+ ? Utils.createDirectory(dbDir.getPath(), namePrefix)
+ : new File(dbDir.getPath(), String.format("%s-%s", namePrefix,
UUID.randomUUID()));
try {
StoreVersion v1 = new StoreVersion(1, 0);
DBProvider.initDB(dbBackend, dbFile, v1).close();
@@ -54,7 +72,7 @@ public class DBProviderSuiteJ {
assertThrows(IOException.class, () -> DBProvider.initDB(dbBackend,
dbFile, v2));
assertTrue(ioe.getMessage().contains("incompatible with current version
StoreVersion[2.0]"));
} finally {
- JavaUtils.deleteRecursively(dbFile);
+ JavaUtils.deleteRecursively(dbDir);
}
}
}