This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 794a681bf KUDU-3371 check for RocksDB dir presence upon opening
FSManager
794a681bf is described below
commit 794a681bfdb83fd902f953da6558fe462ee17aba
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Apr 8 00:29:35 2024 +0800
KUDU-3371 check for RocksDB dir presence upon opening FSManager
When using RocksDB to store LBM metadata, specify the
create_if_missing and error_if_exists options of
rocksdb::Options to make sure we can open the RocksDB
directory correctly. When creating a Kudu data directory,
open it with the options enabled, otherwise, open it with
the options disabled.
Change-Id: Iab4bffc6b902ab96edf0ca6c44f51c8db2670d52
Reviewed-on: http://gerrit.cloudera.org:8080/21295
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/fs/data_dirs.cc | 7 +-
src/kudu/fs/dir_manager.cc | 42 +++++++++---
src/kudu/fs/dir_manager.h | 22 ++++---
src/kudu/fs/fs_manager-test.cc | 138 ++++++++++++++++++++++++++++++++++++++-
src/kudu/fs/log_block_manager.cc | 11 +---
5 files changed, 190 insertions(+), 30 deletions(-)
diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index 730d284a9..f92d6a7c8 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -205,7 +205,8 @@ std::unique_ptr<Dir> DataDirManager::CreateNewDir(
std::string dir, std::unique_ptr<DirInstanceMetadataFile> metadata_file,
std::unique_ptr<ThreadPool> pool) {
if (FLAGS_block_manager == "logr") {
- return std::make_unique<RdbDir>(env, metrics, fs_type, std::move(dir),
+ bool newly_created = ContainsKey(created_fs_dir_paths_, dir);
+ return std::make_unique<RdbDir>(env, metrics, fs_type, newly_created,
std::move(dir),
std::move(metadata_file), std::move(pool));
}
return std::make_unique<Dir>(env, metrics, fs_type, std::move(dir),
@@ -231,6 +232,10 @@ Status DataDirManager::OpenExistingForTests(Env* env,
for (const auto& r : data_fs_roots) {
roots.push_back({ r, Status::OK() });
}
+
+ // Reset the existing DataDirManager before opening the new one to release
resources
+ // (e.g. RocksDB 'LOCK' file when --block_manager=logr) held by the existing
one.
+ dd_manager->reset();
return DataDirManager::OpenExisting(env, std::move(roots), opts, dd_manager);
}
diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc
index cc5282c11..81d120e9c 100644
--- a/src/kudu/fs/dir_manager.cc
+++ b/src/kudu/fs/dir_manager.cc
@@ -208,12 +208,31 @@ int Dir::reserved_bytes() {
shared_ptr<rocksdb::Cache> RdbDir::s_block_cache_;
RdbDir::RdbDir(Env* env, DirMetrics* metrics,
FsType fs_type,
+ bool newly_created,
string dir,
unique_ptr<DirInstanceMetadataFile> metadata_file,
unique_ptr<ThreadPool> pool)
- : Dir(env, metrics, fs_type, std::move(dir), std::move(metadata_file),
std::move(pool)) {}
+ : Dir(env, metrics, fs_type, std::move(dir), std::move(metadata_file),
std::move(pool)) {
+ if (!metadata_file_->healthy()) {
+ LOG(WARNING) << Substitute("Skip initializing rocksdb instance for the
non-healthy "
+ "directory $0",
+ dir_);
+ return;
+ }
+
+ // Initialize the directory only if it's healthy.
+ // Note: the unhealthy directories will be kept, but will be skipped when
opening block manager.
+ auto s = InitRocksDBInstance(newly_created);
+ if (!s.ok()) {
+ s = s.CloneAndPrepend(Substitute("could not initialize $0", dir_));
+ LOG(WARNING) << s.ToString();
+ // Mark the directory as failed if it could not be initialized.
+ DCHECK(metadata_file_->healthy());
+ metadata_file_->SetInstanceFailed(s);
+ }
+}
-Status RdbDir::Prepare() {
+Status RdbDir::InitRocksDBInstance(bool newly_created) {
DCHECK_STREQ(FLAGS_block_manager.c_str(), "logr");
if (db_) {
// Some unit tests (e.g. BlockManagerTest.PersistenceTest) reopen the
block manager,
@@ -228,14 +247,15 @@ Status RdbDir::Prepare() {
// https://github.com/facebook/rocksdb/blob/main/include/rocksdb/options.h
rocksdb::Options opts;
// A RocksDB instance is created if it does not exist when opening the Dir.
- // TODO(yingchun): We should distinguish creating new data directory and
opening existing data
- // directory, and set proper options to avoid mishaps.
- // When creating new data directory, set
opts.error_if_exists = true.
- // When opening existing data directory, set
opts.create_if_missing = false.
- opts.create_if_missing = true;
+ if (newly_created) {
+ opts.create_if_missing = true;
+ opts.error_if_exists = true;
+ } else {
+ opts.create_if_missing = false;
+ opts.error_if_exists = false;
+ }
// TODO(yingchun): parameterize more rocksDB options, including:
// opts.use_fsync
- // opts.error_if_exists
// opts.db_log_dir
// opts.wal_dir
// opts.max_log_file_size
@@ -470,6 +490,8 @@ Status DirManager::CreateNewDirectoriesAndUpdateInstances(
// Success: don't delete any files.
deleter.cancel();
+ std::move(created_dirs.begin(), created_dirs.end(),
+ std::inserter(created_fs_dir_paths_, created_fs_dir_paths_.end()));
return Status::OK();
}
@@ -718,8 +740,8 @@ Status DirManager::Open() {
.set_max_threads(num_threads_per_dir_)
.set_trace_metric_prefix("dirs")
.Build(&pool));
- unique_ptr<Dir> new_dir = CreateNewDir(env_, metrics_.get(), fs_type, dir,
std::move(instance),
- std::move(pool));
+ unique_ptr<Dir> new_dir = CreateNewDir(env_, metrics_.get(), fs_type, dir,
+ std::move(instance),
std::move(pool));
dirs.emplace_back(std::move(new_dir));
}
diff --git a/src/kudu/fs/dir_manager.h b/src/kudu/fs/dir_manager.h
index ffefc391c..470f62607 100644
--- a/src/kudu/fs/dir_manager.h
+++ b/src/kudu/fs/dir_manager.h
@@ -121,9 +121,6 @@ class Dir {
std::unique_ptr<ThreadPool> pool);
virtual ~Dir();
- // Some preparatory work before opening the directory.
- virtual Status Prepare() { return Status::OK(); }
-
// Shuts down this dir's thread pool, waiting for any closures submitted via
// ExecClosure() to finish first.
virtual void Shutdown();
@@ -206,21 +203,24 @@ class RdbDir: public Dir {
RdbDir(Env* env,
DirMetrics* metrics,
FsType fs_type,
+ bool newly_created,
std::string dir,
std::unique_ptr<DirInstanceMetadataFile> metadata_file,
std::unique_ptr<ThreadPool> pool);
- // Initialize the RocksDB instance for the directory.
- //
- // Returns Status::OK() if prepared successfully, otherwise returns non-OK.
- Status Prepare() override;
-
// Similar to Dir::Shutdown(), but close the RocksDB instance additionally.
void Shutdown() override;
rocksdb::DB* rdb();
private:
+ // Initialize the RocksDB instance for the directory.
+ //
+ // 'newly_created' indicates whether this is a newly created directory when
+ // opening DirManager.
+ // Returns Status::OK() if prepared successfully, otherwise returns non-OK.
+ Status InitRocksDBInstance(bool newly_created);
+
// The shared RocksDB instance for this directory.
std::unique_ptr<rocksdb::DB> db_;
// The RocksDB full path.
@@ -468,6 +468,12 @@ class DirManager {
// Common roots in the collections have been deduplicated.
const CanonicalizedRootsList canonicalized_fs_roots_;
+ // Absolute paths of the FS directories created while opening the FSManager.
+ // This container is being populated when creating the initial file system
+ // layout with Create(), or by calling Open() on a newly added data
directory.
+ // It's safe to use it after DirManager is opened completely.
+ std::set<std::string> created_fs_dir_paths_;
+
// Directories tracked by this manager.
std::vector<std::unique_ptr<Dir>> dirs_;
diff --git a/src/kudu/fs/fs_manager-test.cc b/src/kudu/fs/fs_manager-test.cc
index 7da7803a2..a39d0ad06 100644
--- a/src/kudu/fs/fs_manager-test.cc
+++ b/src/kudu/fs/fs_manager-test.cc
@@ -39,6 +39,8 @@
#include <glog/logging.h>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>
+#include <rocksdb/db.h>
+#include <rocksdb/options.h>
#include "kudu/fs/block_manager.h"
#include "kudu/fs/data_dirs.h"
@@ -944,7 +946,6 @@ TEST_P(FsManagerTestBase,
TestOpenFailsWhenMissingImportantDir) {
ASSERT_STR_CONTAINS(s.ToString(), "exists but is not a directory");
}
-
TEST_P(FsManagerTestBase, TestAddRemoveDataDirs) {
if (FLAGS_block_manager == "file") {
GTEST_SKIP() << "Skipping test, file block manager not supported";
@@ -1405,6 +1406,141 @@ TEST_P(FsManagerTestBase,
TestFailToStartWithoutEncryptionKeys) {
ASSERT_TRUE(fs_manager()->Open().IsIllegalState());
}
+TEST_P(FsManagerTestBase, TestOpenDirectoryWithRdbMissing) {
+ if (FLAGS_block_manager != "logr") {
+ GTEST_SKIP() << "Skipping 'logr'-specific test";
+ }
+
+ // Add a new data dir.
+ const string new_path = GetTestPath("new_path");
+ FsManagerOpts opts;
+ opts.wal_root = fs_root_;
+ opts.data_roots = { fs_root_, new_path };
+ ReinitFsManagerWithOpts(opts);
+ // Opening the fs manager succeeds, both of the 2 dirs are healthy.
+ ASSERT_OK(fs_manager()->Open());
+ ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
+ ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
+
+ // Write some data and reopen the fs manager, then some *.sst files will be
generated.
+ for (int i = 0; i < 1000; i++) {
+ TestReadWriteDataFile(Slice("test0"));
+ }
+ ReinitFsManagerWithOpts(opts);
+ ASSERT_OK(fs_manager()->Open());
+
+ // 1. Corrupt the content of the RocksDB directory (by removing one *.sst
file) in 'new_path'.
+ {
+ const auto rdb_dir = JoinPathSegments(JoinPathSegments(new_path,
kDataDirName),
+ kRocksDBDirName);
+ std::vector<std::string> children;
+ ASSERT_OK(GetEnv()->GetChildren(rdb_dir, &children));
+ ASSERT_FALSE(children.empty());
+ std::vector<std::string> sst_files;
+ for (const auto& child : children) {
+ if (HasSuffixString(child, ".sst")) {
+ sst_files.push_back(child);
+ }
+ }
+ ASSERT_FALSE(sst_files.empty());
+ // Delete a *.sst file to corrupt the RocksDB.
+ ASSERT_OK(GetEnv()->DeleteFile(JoinPathSegments(rdb_dir, sst_files[0])));
+
+ ReinitFsManagerWithOpts(opts);
+ // Opening the fs manager succeeds, but 1 dir is failed with RocksDB
related error.
+ ASSERT_OK(fs_manager()->Open());
+ ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
+ ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDirs().size());
+ const auto uuid_idx =
*(fs_manager()->dd_manager()->GetFailedDirs().begin());
+ const auto* failed_dir =
fs_manager()->dd_manager()->FindDirByUuidIndex(uuid_idx);
+ ASSERT_STR_CONTAINS(failed_dir->dir(), new_path);
+ ASSERT_STR_CONTAINS(failed_dir->instance()->health_status().ToString(),
+ "Corruption: IO error: No such file or directory: "
+ "While open a file for random read");
+ }
+
+ // 2. Remove all the content under RocksDB top-level directory in 'new_path'.
+ {
+ const auto rdb_dir = JoinPathSegments(JoinPathSegments(new_path,
kDataDirName),
+ kRocksDBDirName);
+ ASSERT_OK(GetEnv()->DeleteRecursively(rdb_dir));
+ ASSERT_OK(GetEnv()->CreateDir(rdb_dir));
+ ReinitFsManagerWithOpts(opts);
+ // Opening the fs manager succeeds, but 1 dir is failed with RocksDB
related error.
+ ASSERT_OK(fs_manager()->Open());
+ ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
+ ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDirs().size());
+ const auto uuid_idx =
*(fs_manager()->dd_manager()->GetFailedDirs().begin());
+ const auto* failed_dir =
fs_manager()->dd_manager()->FindDirByUuidIndex(uuid_idx);
+ ASSERT_STR_CONTAINS(failed_dir->dir(), new_path);
+ ASSERT_STR_CONTAINS(failed_dir->instance()->health_status().ToString(),
+ "rdb/CURRENT: does not exist (create_if_missing is
false)");
+ }
+
+ // 3. Remove the RocksDB top-level directory in 'new_path'.
+ {
+ const auto rdb_dir = JoinPathSegments(JoinPathSegments(new_path,
kDataDirName),
+ kRocksDBDirName);
+ ASSERT_OK(GetEnv()->DeleteRecursively(rdb_dir));
+ ReinitFsManagerWithOpts(opts);
+ // Opening the fs manager succeeds, but 1 dir is failed with RocksDB
related error.
+ ASSERT_OK(fs_manager()->Open());
+ ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
+ ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDirs().size());
+ const auto uuid_idx =
*(fs_manager()->dd_manager()->GetFailedDirs().begin());
+ const auto* failed_dir =
fs_manager()->dd_manager()->FindDirByUuidIndex(uuid_idx);
+ ASSERT_STR_CONTAINS(failed_dir->dir(), new_path);
+ ASSERT_STR_CONTAINS(failed_dir->instance()->health_status().ToString(),
+ "rdb/CURRENT: does not exist (create_if_missing is
false)");
+ }
+
+ // 4. Remove the RocksDB top-level directory in 'fs_root_' as well.
+ {
+ const auto rdb_dir = JoinPathSegments(JoinPathSegments(fs_root_,
kDataDirName),
+ kRocksDBDirName);
+ ASSERT_OK(GetEnv()->DeleteRecursively(rdb_dir));
+ ReinitFsManagerWithOpts(opts);
+ // Opening the fs manager failed, both of the 2 dirs are failed with
RocksDB related error.
+ const auto s = fs_manager()->Open();
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "All data dirs failed to open");
+ ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
+ ASSERT_EQ(2, fs_manager()->dd_manager()->GetFailedDirs().size());
+ for (const auto &uuid_idx : fs_manager()->dd_manager()->GetFailedDirs()) {
+ const auto* failed_dir =
fs_manager()->dd_manager()->FindDirByUuidIndex(uuid_idx);
+ ASSERT_STR_CONTAINS(failed_dir->instance()->health_status().ToString(),
+ "rdb/CURRENT: does not exist (create_if_missing is
false)");
+ }
+ }
+}
+
+// This test is similar to FsManagerTestBase.TestCannotUseNonEmptyFsRoot,
+// but this one is 'logr'-specific.
+TEST_P(FsManagerTestBase, TestInitialOpenDirectoryWithRdbPresent) {
+ if (FLAGS_block_manager != "logr") {
+ GTEST_SKIP() << "Skipping 'logr'-specific test";
+ }
+
+ // Use a new data dir.
+ const auto& new_path = GetTestPath("new_path");
+ ReinitFsManagerWithPaths(new_path, { new_path });
+
+ // Create the RocksDB dir before opening.
+ const auto rdb_dir = JoinPathSegments(new_path, kDataDirName);
+ ASSERT_OK(GetEnv()->CreateDir(new_path));
+ ASSERT_OK(GetEnv()->CreateDir(rdb_dir));
+ rocksdb::Options opts;
+ opts.create_if_missing = true;
+ opts.error_if_exists = true;
+ rocksdb::DB* db = nullptr;
+ ASSERT_OK(FromRdbStatus(rocksdb::DB::Open(opts, rdb_dir, &db)));
+ delete db;
+
+ auto s = fs_manager()->CreateInitialFileSystemLayout();
+ ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "FSManager roots already exist");
+}
+
class OpenFsTypeTest : public KuduTest,
public ::testing::WithParamInterface<std::tuple<string,
bool, bool>> {
public:
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index e4654e5a1..7e0493896 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -3392,17 +3392,8 @@ void LogBlockManager::OpenDataDir(
Status* result_status,
std::atomic<int>* containers_processed,
std::atomic<int>* containers_total) {
- Status s = dir->Prepare();
- if (!s.ok()) {
- HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
- ErrorHandlerType::DISK_ERROR, dir));
- *result_status = s.CloneAndPrepend(Substitute(
- "Could not initialize $0", dir->dir()));
- return;
- }
-
vector<string> children;
- s = env_->GetChildren(dir->dir(), &children);
+ Status s = env_->GetChildren(dir->dir(), &children);
if (!s.ok()) {
HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
ErrorHandlerType::DISK_ERROR, dir, tenant_id()));