This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 4dcbb1e28 feat: skip IO_ERROR dir_node when assign replicas (#1512)
4dcbb1e28 is described below
commit 4dcbb1e28188fe855043dd84b3c1f03bd439aa7e
Author: Yingchun Lai <[email protected]>
AuthorDate: Thu Jun 8 10:53:29 2023 +0800
feat: skip IO_ERROR dir_node when assign replicas (#1512)
https://github.com/apache/incubator-pegasus/issues/1383
A disk (a.k.a. dir_node in Pegasus) may change from NORMAL to
SPACE_INSUFFICIENT or IO_ERROR; meanwhile, it may recover from
SPACE_INSUFFICIENT to NORMAL. So we can keep all dir_nodes in the system,
and only refuse to assign replicas to abnormal dir_nodes and refuse to
perform write-type operations on abnormal dir_nodes.
This patch also updates some unit tests.
---
idl/metadata.thrift | 7 ++++++-
src/common/fs_manager.cpp | 35 ++++++++++++++++++++++++++++------
src/common/fs_manager.h | 5 ++++-
src/common/replication_enums.h | 1 +
src/common/test/fs_manager_test.cpp | 7 ++++---
src/replica/disk_cleaner.cpp | 7 +++++++
src/replica/replica_2pc.cpp | 21 +++++++++++++++++---
src/replica/replica_check.cpp | 1 +
src/replica/replica_context.cpp | 9 +++++----
src/replica/replica_context.h | 2 +-
src/replica/replica_disk_migrator.cpp | 1 +
src/replica/replica_stub.cpp | 4 ++++
src/replica/test/replica_disk_test.cpp | 1 +
src/utils/error_code.h | 2 ++
14 files changed, 84 insertions(+), 19 deletions(-)
diff --git a/idl/metadata.thrift b/idl/metadata.thrift
index 7c8448bee..5a7d3e4b3 100644
--- a/idl/metadata.thrift
+++ b/idl/metadata.thrift
@@ -58,7 +58,12 @@ enum split_status
enum disk_status
{
NORMAL = 0,
- SPACE_INSUFFICIENT
+ // Indicate the disk is in space insufficiency. See config
+ // [replication].disk_min_available_space_ratio for more details.
+ SPACE_INSUFFICIENT,
+ // Indicate the disk is in IO error. The disk will be marked as IO_ERROR
+ // when it's read/write unavailable.
+ IO_ERROR
}
enum manual_compaction_status
diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index a33d20e54..3d3eda94f 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -127,7 +127,8 @@ void dir_node::update_disk_stat()
disk_available_ratio = static_cast<int>(
disk_capacity_mb == 0 ? 0 : std::round(disk_available_mb * 100.0 /
disk_capacity_mb));
- auto old_status = status;
+ // It's able to change status from NORMAL to SPACE_INSUFFICIENT, and vice
versa.
+ disk_status::type old_status = status;
auto new_status = disk_available_ratio <
FLAGS_disk_min_available_space_ratio
? disk_status::SPACE_INSUFFICIENT
: disk_status::NORMAL;
@@ -202,6 +203,7 @@ void fs_manager::initialize(const std::vector<std::string>
&data_dirs,
// Check the status of this directory.
std::string cdir;
std::string err_msg;
+ disk_status::type status = disk_status::NORMAL;
if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir,
err_msg) ||
!utils::filesystem::check_dir_rw(dir, err_msg))) {
if (FLAGS_ignore_broken_disk) {
@@ -209,9 +211,7 @@ void fs_manager::initialize(const std::vector<std::string>
&data_dirs,
} else {
CHECK(false, err_msg);
}
- // TODO(yingchun): Remove the 'continue' and mark its io error
status, regardless
- // the status of the disks, add all disks.
- continue;
+ status = disk_status::IO_ERROR;
}
// Normalize the data directories.
@@ -219,9 +219,12 @@ void fs_manager::initialize(const std::vector<std::string>
&data_dirs,
utils::filesystem::get_normalized_path(cdir, norm_path);
// Create and add this dir_node.
- auto dn = std::make_shared<dir_node>(dir_tag, norm_path);
+ auto dn = std::make_shared<dir_node>(dir_tag, norm_path, 0, 0, 0,
status);
dir_nodes.emplace_back(dn);
- LOG_INFO("mark data dir({}) as tag({})", norm_path, dir_tag);
+ LOG_INFO("mark data dir({}) as tag({}) with status({})",
+ norm_path,
+ dir_tag,
+ enum_to_string(status));
}
CHECK_FALSE(dir_nodes.empty());
@@ -268,6 +271,10 @@ dir_node *fs_manager::find_best_dir_for_new_replica(const
gpid &pid) const
zauto_write_lock l(_lock);
// Try to find the dir_node with the least replica count.
for (const auto &dn : _dir_nodes) {
+ // Do not allocate new replica on dir_node which is not NORMAL.
+ if (dn->status != disk_status::NORMAL) {
+ continue;
+ }
CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})",
pid, dn->tag);
uint64_t app_replicas_count = dn->replicas_count(pid.get_app_id());
uint64_t total_replicas_count = dn->replicas_count();
@@ -308,6 +315,7 @@ void
fs_manager::specify_dir_for_new_replica_for_test(dir_node *specified_dn,
}
}
CHECK(dn_found, "dir_node({}) is not exist", specified_dn->tag);
+ CHECK_EQ(disk_status::NORMAL, specified_dn->status);
const auto dir = specified_dn->replica_dir(app_type, pid);
CHECK_TRUE(dsn::utils::filesystem::create_directory(dir));
specified_dn->holding_replicas[pid.get_app_id()].emplace(pid);
@@ -346,6 +354,13 @@ void fs_manager::update_disk_stat()
zauto_write_lock l(_lock);
reset_disk_stat();
for (auto &dn : _dir_nodes) {
+ // If the disk is already in IO_ERROR status, it will not change to
other status, just skip
+ // it.
+ if (dn->status == disk_status::IO_ERROR) {
+ LOG_WARNING("skip to update disk stat for dir({}), because it is
in IO_ERROR status",
+ dn->tag);
+ continue;
+ }
dn->update_disk_stat();
_total_capacity_mb += dn->disk_capacity_mb;
_total_available_mb += dn->disk_available_mb;
@@ -401,6 +416,10 @@ dir_node *fs_manager::find_replica_dir(dsn::string_view
app_type, gpid pid)
{
zauto_read_lock l(_lock);
for (const auto &dn : _dir_nodes) {
+ // Skip IO error dir_node.
+ if (dn->status == disk_status::IO_ERROR) {
+ continue;
+ }
const auto dir = dn->replica_dir(app_type, pid);
if (utils::filesystem::directory_exists(dir)) {
// Check if there are duplicate replica instance directories.
@@ -455,6 +474,10 @@ dir_node
*fs_manager::create_child_replica_dir(dsn::string_view app_type,
{
zauto_read_lock l(_lock);
for (const auto &dn : _dir_nodes) {
+ // Skip non-available dir_node.
+ if (dn->status != disk_status::NORMAL) {
+ continue;
+ }
child_dir = dn->replica_dir(app_type, child_pid);
// <parent_dir> = <prefix>/<gpid>.<app_type>
// check if <parent_dir>'s <prefix> is equal to <data_dir>
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index be19d79b6..e4c51667d 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -19,6 +19,7 @@
#include <gtest/gtest_prod.h>
#include <stdint.h>
+#include <atomic>
#include <functional>
#include <map>
#include <memory>
@@ -48,7 +49,7 @@ public:
int64_t disk_capacity_mb;
int64_t disk_available_mb;
int disk_available_ratio;
- disk_status::type status;
+ std::atomic<disk_status::type> status;
std::map<app_id, std::set<gpid>> holding_replicas;
std::map<app_id, std::set<gpid>> holding_primary_replicas;
std::map<app_id, std::set<gpid>> holding_secondary_replicas;
@@ -147,6 +148,8 @@ private:
int _min_available_ratio = 100;
int _max_available_ratio = 0;
+ // Once dir_node has been added to '_dir_nodes', it will not be removed,
it will be marked
+ // as non-NORMAL status if it is not available.
std::vector<std::shared_ptr<dir_node>> _dir_nodes;
// ] end of lock
diff --git a/src/common/replication_enums.h b/src/common/replication_enums.h
index 8f07a70b9..5eef1710e 100644
--- a/src/common/replication_enums.h
+++ b/src/common/replication_enums.h
@@ -152,6 +152,7 @@ ENUM_END2(replication::disk_migration_status::type,
disk_migration_status)
ENUM_BEGIN2(replication::disk_status::type, disk_status,
replication::disk_status::NORMAL)
ENUM_REG(replication::disk_status::NORMAL)
ENUM_REG(replication::disk_status::SPACE_INSUFFICIENT)
+ENUM_REG(replication::disk_status::IO_ERROR)
ENUM_END2(replication::disk_status::type, disk_status)
ENUM_BEGIN2(replication::manual_compaction_status::type,
diff --git a/src/common/test/fs_manager_test.cpp
b/src/common/test/fs_manager_test.cpp
index a60ddc1f7..542b122f1 100644
--- a/src/common/test/fs_manager_test.cpp
+++ b/src/common/test/fs_manager_test.cpp
@@ -54,15 +54,16 @@ TEST(fs_manager, initialize)
{
std::string create_dir_ok;
std::string check_dir_rw_ok;
- int32_t data_dir_size;
- } tests[]{{"true", "true", 3}, {"true", "false", 2}, {"false", "false",
2}};
+ // Regardless of the status of the disk, the number of dir_nodes
should be 3.
+ int32_t dir_node_size;
+ } tests[]{{"true", "true", 3}, {"true", "false", 3}, {"false", "false",
3}};
int i = 0;
for (const auto &test : tests) {
fail::cfg("filesystem_create_directory", "return(" +
test.create_dir_ok + ")");
fail::cfg("filesystem_check_dir_rw", "return(" + test.check_dir_rw_ok
+ ")");
fs_manager fm;
fm.initialize({"disk1", "disk2", "disk3"}, {"tag1", "tag2", "tag3"});
- ASSERT_EQ(test.data_dir_size, fm.get_dir_nodes().size()) << i;
+ ASSERT_EQ(test.dir_node_size, fm.get_dir_nodes().size()) << i;
i++;
}
fail::teardown();
diff --git a/src/replica/disk_cleaner.cpp b/src/replica/disk_cleaner.cpp
index 04dd174bb..9488c32a6 100644
--- a/src/replica/disk_cleaner.cpp
+++ b/src/replica/disk_cleaner.cpp
@@ -23,8 +23,10 @@
#include <stdint.h>
#include <sys/types.h>
#include <algorithm>
+#include <atomic>
#include "common/fs_manager.h"
+#include "metadata_types.h"
#include "runtime/api_layer1.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
@@ -73,6 +75,11 @@ error_s disk_remove_useless_dirs(const
std::vector<std::shared_ptr<dir_node>> &d
{
std::vector<std::string> sub_list;
for (const auto &dn : dir_nodes) {
+ // It's allowed to clear up the directory when it's
SPACE_INSUFFICIENT, but not allowed when
+ // it's IO_ERROR.
+ if (dn->status == disk_status::IO_ERROR) {
+ continue;
+ }
std::vector<std::string> tmp_list;
if (!dsn::utils::filesystem::get_subdirectories(dn->full_dir,
tmp_list, false)) {
LOG_WARNING("gc_disk: failed to get subdirectories in {}",
dn->full_dir);
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index be1328b73..2a8e47787 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -27,6 +27,7 @@
#include <fmt/core.h>
#include <inttypes.h>
#include <stddef.h>
+#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
@@ -116,6 +117,21 @@ DSN_DEFINE_uint64(
DSN_DECLARE_int32(max_mutation_count_in_prepare_list);
DSN_DECLARE_int32(staleness_for_commit);
+namespace {
+error_code disk_status_to_error_code(disk_status::type ds)
+{
+ switch (ds) {
+ case disk_status::SPACE_INSUFFICIENT:
+ return dsn::ERR_DISK_INSUFFICIENT;
+ case disk_status::IO_ERROR:
+ return dsn::ERR_DISK_IO_ERROR;
+ default:
+ CHECK_EQ(disk_status::NORMAL, ds);
+ return dsn::ERR_OK;
+ }
+}
+} // anonymous namespace
+
void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
{
_checker.only_one_thread_access();
@@ -179,9 +195,8 @@ void replica::on_client_write(dsn::message_ex *request,
bool ignore_throttling)
}
if (FLAGS_reject_write_when_disk_insufficient &&
- (_dir_node->status == disk_status::SPACE_INSUFFICIENT ||
- _primary_states.secondary_disk_space_insufficient())) {
- response_client_write(request, ERR_DISK_INSUFFICIENT);
+ (_dir_node->status != disk_status::NORMAL ||
_primary_states.secondary_disk_abnormal())) {
+ response_client_write(request,
disk_status_to_error_code(_dir_node->status));
return;
}
diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp
index e978a91db..bf4b0bff8 100644
--- a/src/replica/replica_check.cpp
+++ b/src/replica/replica_check.cpp
@@ -33,6 +33,7 @@
* xxxx-xx-xx, author, fix bug about xxx
*/
+#include <atomic>
#include <chrono>
#include <memory>
#include <unordered_map>
diff --git a/src/replica/replica_context.cpp b/src/replica/replica_context.cpp
index 6bb5dd48d..67833799e 100644
--- a/src/replica/replica_context.cpp
+++ b/src/replica/replica_context.cpp
@@ -180,13 +180,14 @@ void primary_context::cleanup_split_states()
split_stopped_secondary.clear();
}
-bool primary_context::secondary_disk_space_insufficient() const
+bool primary_context::secondary_disk_abnormal() const
{
for (const auto &kv : secondary_disk_status) {
- if (kv.second == disk_status::SPACE_INSUFFICIENT) {
- LOG_INFO("partition[{}] secondary[{}] disk space is insufficient",
+ if (kv.second != disk_status::NORMAL) {
+ LOG_INFO("partition[{}] secondary[{}] disk space is {}",
membership.pid,
- kv.first.to_string());
+ kv.first.to_string(),
+ enum_to_string(kv.second));
return true;
}
}
diff --git a/src/replica/replica_context.h b/src/replica/replica_context.h
index 5d8c15eb0..472492616 100644
--- a/src/replica/replica_context.h
+++ b/src/replica/replica_context.h
@@ -115,7 +115,7 @@ public:
void cleanup_split_states();
- bool secondary_disk_space_insufficient() const;
+ bool secondary_disk_abnormal() const;
public:
// membership mgr, including learners
diff --git a/src/replica/replica_disk_migrator.cpp
b/src/replica/replica_disk_migrator.cpp
index 0054f2734..1afc5c62e 100644
--- a/src/replica/replica_disk_migrator.cpp
+++ b/src/replica/replica_disk_migrator.cpp
@@ -129,6 +129,7 @@ bool
replica_disk_migrator::check_migration_args(replica_disk_migrate_rpc rpc)
bool valid_origin_disk = false;
bool valid_target_disk = false;
// _dir_nodes: std::vector<std::shared_ptr<dir_node>>
+ // TODO(yingchun): skip disks which are SPACE_INSUFFICIENT or IO_ERROR.
for (const auto &dir_node :
_replica->get_replica_stub()->_fs_manager._dir_nodes) {
if (dir_node->tag == req.origin_disk) {
valid_origin_disk = true;
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 54b2879e9..dcacf4baf 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -618,6 +618,10 @@ void replica_stub::initialize(const replication_options
&opts, bool clear /* = f
LOG_INFO("start to load replicas");
std::map<dir_node *, std::vector<std::string>> dirs_by_dn;
for (const auto &dn : _fs_manager.get_dir_nodes()) {
+ // Skip IO error dir_node.
+ if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
+ continue;
+ }
std::vector<std::string> sub_directories;
CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir,
sub_directories, false),
"fail to get sub_directories in {}",
diff --git a/src/replica/test/replica_disk_test.cpp
b/src/replica/test/replica_disk_test.cpp
index ce7441199..11b1399ac 100644
--- a/src/replica/test/replica_disk_test.cpp
+++ b/src/replica/test/replica_disk_test.cpp
@@ -21,6 +21,7 @@
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
#include <unistd.h>
+#include <atomic>
#include <cstdint>
#include <map>
#include <memory>
diff --git a/src/utils/error_code.h b/src/utils/error_code.h
index 998b21872..25feabd48 100644
--- a/src/utils/error_code.h
+++ b/src/utils/error_code.h
@@ -174,4 +174,6 @@ DEFINE_ERR_CODE(ERR_RANGER_PARSE_ACL)
DEFINE_ERR_CODE(ERR_RANGER_POLICIES_NO_NEED_UPDATE)
DEFINE_ERR_CODE(ERR_RDB_CORRUPTION)
+
+DEFINE_ERR_CODE(ERR_DISK_IO_ERROR)
} // namespace dsn
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]