This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 794a681bf KUDU-3371 check for RocksDB dir presence upon opening FSManager
794a681bf is described below

commit 794a681bfdb83fd902f953da6558fe462ee17aba
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Apr 8 00:29:35 2024 +0800

    KUDU-3371 check for RocksDB dir presence upon opening FSManager
    
    When using RocksDB to store LBM metadata, explicitly set the
    create_if_missing and error_if_exists options of
    rocksdb::Options to make sure the RocksDB directory is opened
    correctly. When creating a new Kudu data directory, open it
    with both options enabled; otherwise (i.e. when opening an
    existing data directory), open it with both options disabled.
    
    Change-Id: Iab4bffc6b902ab96edf0ca6c44f51c8db2670d52
    Reviewed-on: http://gerrit.cloudera.org:8080/21295
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/fs/data_dirs.cc         |   7 +-
 src/kudu/fs/dir_manager.cc       |  42 +++++++++---
 src/kudu/fs/dir_manager.h        |  22 ++++---
 src/kudu/fs/fs_manager-test.cc   | 138 ++++++++++++++++++++++++++++++++++++++-
 src/kudu/fs/log_block_manager.cc |  11 +---
 5 files changed, 190 insertions(+), 30 deletions(-)

diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index 730d284a9..f92d6a7c8 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -205,7 +205,8 @@ std::unique_ptr<Dir> DataDirManager::CreateNewDir(
     std::string dir, std::unique_ptr<DirInstanceMetadataFile> metadata_file,
     std::unique_ptr<ThreadPool> pool) {
   if (FLAGS_block_manager == "logr") {
-    return std::make_unique<RdbDir>(env, metrics, fs_type, std::move(dir),
+    bool newly_created = ContainsKey(created_fs_dir_paths_, dir);
+    return std::make_unique<RdbDir>(env, metrics, fs_type, newly_created, std::move(dir),
                                     std::move(metadata_file), std::move(pool));
   }
   return std::make_unique<Dir>(env, metrics, fs_type, std::move(dir),
@@ -231,6 +232,10 @@ Status DataDirManager::OpenExistingForTests(Env* env,
   for (const auto& r : data_fs_roots) {
     roots.push_back({ r, Status::OK() });
   }
+
+  // Reset the existing DataDirManager before opening the new one to release resources
+  // (e.g. RocksDB 'LOCK' file when --block_manager=logr) held by the existing one.
+  dd_manager->reset();
   return DataDirManager::OpenExisting(env, std::move(roots), opts, dd_manager);
 }
 
diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc
index cc5282c11..81d120e9c 100644
--- a/src/kudu/fs/dir_manager.cc
+++ b/src/kudu/fs/dir_manager.cc
@@ -208,12 +208,31 @@ int Dir::reserved_bytes() {
 shared_ptr<rocksdb::Cache> RdbDir::s_block_cache_;
 RdbDir::RdbDir(Env* env, DirMetrics* metrics,
                FsType fs_type,
+               bool newly_created,
                string dir,
                unique_ptr<DirInstanceMetadataFile> metadata_file,
                unique_ptr<ThreadPool> pool)
-    : Dir(env, metrics, fs_type, std::move(dir), std::move(metadata_file), std::move(pool)) {}
+    : Dir(env, metrics, fs_type, std::move(dir), std::move(metadata_file), std::move(pool)) {
+  if (!metadata_file_->healthy()) {
+    LOG(WARNING) << Substitute("Skip initializing rocksdb instance for the non-healthy "
+                               "directory $0",
+                               dir_);
+    return;
+  }
+
+  // Initialize the directory only if it's healthy.
+  // Note: the unhealthy directories will be kept, but will be skipped when opening block manager.
+  auto s = InitRocksDBInstance(newly_created);
+  if (!s.ok()) {
+    s = s.CloneAndPrepend(Substitute("could not initialize $0", dir_));
+    LOG(WARNING) << s.ToString();
+    // Mark the directory as failed if it could not be initialized.
+    DCHECK(metadata_file_->healthy());
+    metadata_file_->SetInstanceFailed(s);
+  }
+}
 
-Status RdbDir::Prepare() {
+Status RdbDir::InitRocksDBInstance(bool newly_created) {
   DCHECK_STREQ(FLAGS_block_manager.c_str(), "logr");
   if (db_) {
     // Some unit tests (e.g. BlockManagerTest.PersistenceTest) reopen the block manager,
@@ -228,14 +247,15 @@ Status RdbDir::Prepare() {
   // https://github.com/facebook/rocksdb/blob/main/include/rocksdb/options.h
   rocksdb::Options opts;
   // A RocksDB instance is created if it does not exist when opening the Dir.
-  // TODO(yingchun): We should distinguish creating new data directory and opening existing data
-  //                 directory, and set proper options to avoid mishaps.
-  //                 When creating new data directory, set opts.error_if_exists = true.
-  //                 When opening existing data directory, set opts.create_if_missing = false.
-  opts.create_if_missing = true;
+  if (newly_created) {
+      opts.create_if_missing = true;
+      opts.error_if_exists = true;
+  } else {
+      opts.create_if_missing = false;
+      opts.error_if_exists = false;
+  }
   // TODO(yingchun): parameterize more rocksDB options, including:
   //  opts.use_fsync
-  //  opts.error_if_exists
   //  opts.db_log_dir
   //  opts.wal_dir
   //  opts.max_log_file_size
@@ -470,6 +490,8 @@ Status DirManager::CreateNewDirectoriesAndUpdateInstances(
 
   // Success: don't delete any files.
   deleter.cancel();
+  std::move(created_dirs.begin(), created_dirs.end(),
+            std::inserter(created_fs_dir_paths_, created_fs_dir_paths_.end()));
   return Status::OK();
 }
 
@@ -718,8 +740,8 @@ Status DirManager::Open() {
                   .set_max_threads(num_threads_per_dir_)
                   .set_trace_metric_prefix("dirs")
                   .Build(&pool));
-    unique_ptr<Dir> new_dir = CreateNewDir(env_, metrics_.get(), fs_type, dir, std::move(instance),
-                                           std::move(pool));
+    unique_ptr<Dir> new_dir = CreateNewDir(env_, metrics_.get(), fs_type, dir,
+                                           std::move(instance), std::move(pool));
     dirs.emplace_back(std::move(new_dir));
   }
 
diff --git a/src/kudu/fs/dir_manager.h b/src/kudu/fs/dir_manager.h
index ffefc391c..470f62607 100644
--- a/src/kudu/fs/dir_manager.h
+++ b/src/kudu/fs/dir_manager.h
@@ -121,9 +121,6 @@ class Dir {
       std::unique_ptr<ThreadPool> pool);
   virtual ~Dir();
 
-  // Some preparatory work before opening the directory.
-  virtual Status Prepare() { return Status::OK(); }
-
   // Shuts down this dir's thread pool, waiting for any closures submitted via
   // ExecClosure() to finish first.
   virtual void Shutdown();
@@ -206,21 +203,24 @@ class RdbDir: public Dir {
   RdbDir(Env* env,
          DirMetrics* metrics,
          FsType fs_type,
+         bool newly_created,
          std::string dir,
          std::unique_ptr<DirInstanceMetadataFile> metadata_file,
          std::unique_ptr<ThreadPool> pool);
 
-  // Initialize the RocksDB instance for the directory.
-  //
-  // Returns Status::OK() if prepared successfully, otherwise returns non-OK.
-  Status Prepare() override;
-
   // Similar to Dir::Shutdown(), but close the RocksDB instance additionally.
   void Shutdown() override;
 
   rocksdb::DB* rdb();
 
  private:
+  // Initialize the RocksDB instance for the directory.
+  //
+  // 'newly_created' indicates whether this is a newly created directory when
+  // opening DirManager.
+  // Returns Status::OK() if prepared successfully, otherwise returns non-OK.
+  Status InitRocksDBInstance(bool newly_created);
+
   // The shared RocksDB instance for this directory.
   std::unique_ptr<rocksdb::DB> db_;
   // The RocksDB full path.
@@ -468,6 +468,12 @@ class DirManager {
   // Common roots in the collections have been deduplicated.
   const CanonicalizedRootsList canonicalized_fs_roots_;
 
+  // Absolute paths of the FS directories created while opening the FSManager.
+  // This container is being populated when creating the initial file system
+  // layout with Create(), or by calling Open() on a newly added data directory.
+  // It's safe to use it after DirManager is opened completely.
+  std::set<std::string> created_fs_dir_paths_;
+
   // Directories tracked by this manager.
   std::vector<std::unique_ptr<Dir>> dirs_;
 
diff --git a/src/kudu/fs/fs_manager-test.cc b/src/kudu/fs/fs_manager-test.cc
index 7da7803a2..a39d0ad06 100644
--- a/src/kudu/fs/fs_manager-test.cc
+++ b/src/kudu/fs/fs_manager-test.cc
@@ -39,6 +39,8 @@
 #include <glog/logging.h>
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
+#include <rocksdb/db.h>
+#include <rocksdb/options.h>
 
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/data_dirs.h"
@@ -944,7 +946,6 @@ TEST_P(FsManagerTestBase, TestOpenFailsWhenMissingImportantDir) {
   ASSERT_STR_CONTAINS(s.ToString(), "exists but is not a directory");
 }
 
-
 TEST_P(FsManagerTestBase, TestAddRemoveDataDirs) {
   if (FLAGS_block_manager == "file") {
     GTEST_SKIP() << "Skipping test, file block manager not supported";
@@ -1405,6 +1406,141 @@ TEST_P(FsManagerTestBase, TestFailToStartWithoutEncryptionKeys) {
   ASSERT_TRUE(fs_manager()->Open().IsIllegalState());
 }
 
+TEST_P(FsManagerTestBase, TestOpenDirectoryWithRdbMissing) {
+  if (FLAGS_block_manager != "logr") {
+    GTEST_SKIP() << "Skipping 'logr'-specific test";
+  }
+
+  // Add a new data dir.
+  const string new_path = GetTestPath("new_path");
+  FsManagerOpts opts;
+  opts.wal_root = fs_root_;
+  opts.data_roots = { fs_root_, new_path };
+  ReinitFsManagerWithOpts(opts);
+  // Opening the fs manager succeeds, both of the 2 dirs are healthy.
+  ASSERT_OK(fs_manager()->Open());
+  ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
+  ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
+
+  // Write some data and reopen the fs manager, then some *.sst files will be generated.
+  for (int i = 0; i < 1000; i++) {
+    TestReadWriteDataFile(Slice("test0"));
+  }
+  ReinitFsManagerWithOpts(opts);
+  ASSERT_OK(fs_manager()->Open());
+
+  // 1. Corrupt the content of the RocksDB directory (by removing one *.sst file) in 'new_path'.
+  {
+    const auto rdb_dir = JoinPathSegments(JoinPathSegments(new_path, kDataDirName),
+                                          kRocksDBDirName);
+    std::vector<std::string> children;
+    ASSERT_OK(GetEnv()->GetChildren(rdb_dir, &children));
+    ASSERT_FALSE(children.empty());
+    std::vector<std::string> sst_files;
+    for (const auto& child : children) {
+      if (HasSuffixString(child, ".sst")) {
+        sst_files.push_back(child);
+      }
+    }
+    ASSERT_FALSE(sst_files.empty());
+    // Delete a *.sst file to corrupt the RocksDB.
+    ASSERT_OK(GetEnv()->DeleteFile(JoinPathSegments(rdb_dir, sst_files[0])));
+
+    ReinitFsManagerWithOpts(opts);
+    // Opening the fs manager succeeds, but 1 dir is failed with RocksDB related error.
+    ASSERT_OK(fs_manager()->Open());
+    ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
+    ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDirs().size());
+    const auto uuid_idx = *(fs_manager()->dd_manager()->GetFailedDirs().begin());
+    const auto* failed_dir = fs_manager()->dd_manager()->FindDirByUuidIndex(uuid_idx);
+    ASSERT_STR_CONTAINS(failed_dir->dir(), new_path);
+    ASSERT_STR_CONTAINS(failed_dir->instance()->health_status().ToString(),
+                        "Corruption: IO error: No such file or directory: "
+                        "While open a file for random read");
+  }
+
+  // 2. Remove all the content under RocksDB top-level directory in 'new_path'.
+  {
+    const auto rdb_dir = JoinPathSegments(JoinPathSegments(new_path, kDataDirName),
+                                          kRocksDBDirName);
+    ASSERT_OK(GetEnv()->DeleteRecursively(rdb_dir));
+    ASSERT_OK(GetEnv()->CreateDir(rdb_dir));
+    ReinitFsManagerWithOpts(opts);
+    // Opening the fs manager succeeds, but 1 dir is failed with RocksDB related error.
+    ASSERT_OK(fs_manager()->Open());
+    ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
+    ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDirs().size());
+    const auto uuid_idx = *(fs_manager()->dd_manager()->GetFailedDirs().begin());
+    const auto* failed_dir = fs_manager()->dd_manager()->FindDirByUuidIndex(uuid_idx);
+    ASSERT_STR_CONTAINS(failed_dir->dir(), new_path);
+    ASSERT_STR_CONTAINS(failed_dir->instance()->health_status().ToString(),
+                        "rdb/CURRENT: does not exist (create_if_missing is false)");
+  }
+
+  // 3. Remove the RocksDB top-level directory in 'new_path'.
+  {
+    const auto rdb_dir = JoinPathSegments(JoinPathSegments(new_path, kDataDirName),
+                                          kRocksDBDirName);
+    ASSERT_OK(GetEnv()->DeleteRecursively(rdb_dir));
+    ReinitFsManagerWithOpts(opts);
+    // Opening the fs manager succeeds, but 1 dir is failed with RocksDB related error.
+    ASSERT_OK(fs_manager()->Open());
+    ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
+    ASSERT_EQ(1, fs_manager()->dd_manager()->GetFailedDirs().size());
+    const auto uuid_idx = *(fs_manager()->dd_manager()->GetFailedDirs().begin());
+    const auto* failed_dir = fs_manager()->dd_manager()->FindDirByUuidIndex(uuid_idx);
+    ASSERT_STR_CONTAINS(failed_dir->dir(), new_path);
+    ASSERT_STR_CONTAINS(failed_dir->instance()->health_status().ToString(),
+                        "rdb/CURRENT: does not exist (create_if_missing is false)");
+  }
+
+  // 4. Remove the RocksDB top-level directory in 'fs_root_' as well.
+  {
+    const auto rdb_dir = JoinPathSegments(JoinPathSegments(fs_root_, kDataDirName),
+                                          kRocksDBDirName);
+    ASSERT_OK(GetEnv()->DeleteRecursively(rdb_dir));
+    ReinitFsManagerWithOpts(opts);
+    // Opening the fs manager failed, both of the 2 dirs are failed with RocksDB related error.
+    const auto s = fs_manager()->Open();
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "All data dirs failed to open");
+    ASSERT_EQ(2, fs_manager()->dd_manager()->GetDirs().size());
+    ASSERT_EQ(2, fs_manager()->dd_manager()->GetFailedDirs().size());
+    for (const auto &uuid_idx : fs_manager()->dd_manager()->GetFailedDirs()) {
+      const auto* failed_dir = fs_manager()->dd_manager()->FindDirByUuidIndex(uuid_idx);
+      ASSERT_STR_CONTAINS(failed_dir->instance()->health_status().ToString(),
+                          "rdb/CURRENT: does not exist (create_if_missing is false)");
+    }
+  }
+}
+
+// This test is similar to FsManagerTestBase.TestCannotUseNonEmptyFsRoot,
+// but this one is 'logr'-specific.
+TEST_P(FsManagerTestBase, TestInitialOpenDirectoryWithRdbPresent) {
+  if (FLAGS_block_manager != "logr") {
+    GTEST_SKIP() << "Skipping 'logr'-specific test";
+  }
+
+  // Use a new data dir.
+  const auto& new_path = GetTestPath("new_path");
+  ReinitFsManagerWithPaths(new_path, { new_path });
+
+  // Create the RocksDB dir before opening.
+  const auto rdb_dir = JoinPathSegments(new_path, kDataDirName);
+  ASSERT_OK(GetEnv()->CreateDir(new_path));
+  ASSERT_OK(GetEnv()->CreateDir(rdb_dir));
+  rocksdb::Options opts;
+  opts.create_if_missing = true;
+  opts.error_if_exists = true;
+  rocksdb::DB* db = nullptr;
+  ASSERT_OK(FromRdbStatus(rocksdb::DB::Open(opts, rdb_dir, &db)));
+  delete db;
+
+  auto s = fs_manager()->CreateInitialFileSystemLayout();
+  ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "FSManager roots already exist");
+}
+
 class OpenFsTypeTest : public KuduTest,
                        public ::testing::WithParamInterface<std::tuple<string, bool, bool>> {
  public:
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index e4654e5a1..7e0493896 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -3392,17 +3392,8 @@ void LogBlockManager::OpenDataDir(
     Status* result_status,
     std::atomic<int>* containers_processed,
     std::atomic<int>* containers_total) {
-  Status s = dir->Prepare();
-  if (!s.ok()) {
-    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
-        ErrorHandlerType::DISK_ERROR, dir));
-    *result_status = s.CloneAndPrepend(Substitute(
-        "Could not initialize $0", dir->dir()));
-    return;
-  }
-
   vector<string> children;
-  s = env_->GetChildren(dir->dir(), &children);
+  Status s = env_->GetChildren(dir->dir(), &children);
   if (!s.ok()) {
     HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
         ErrorHandlerType::DISK_ERROR, dir, tenant_id()));

Reply via email to