acelyc111 commented on code in PR #2078:
URL: 
https://github.com/apache/incubator-pegasus/pull/2078#discussion_r1697060097


##########
src/replica/replica_stub.cpp:
##########
@@ -411,6 +432,173 @@ void replica_stub::initialize(bool clear /* = false*/)
     _access_controller = std::make_unique<dsn::security::access_controller>();
 }
 
+replica_stub::disk_dirs replica_stub::get_all_disk_dirs() const
+{
+    disk_dirs disks;
+    for (const auto &dn : _fs_manager.get_dir_nodes()) {
+        if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
+            // Skip disks with IO errors.
+            continue;
+        }
+
+        std::vector<std::string> sub_dirs;
+        CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, 
sub_dirs, false),
+              "failed to get sub_directories in {}",
+              dn->full_dir);
+        disks.emplace_back(dn.get(), std::move(sub_dirs));
+    }
+
+    return disks;
+}
+
+void replica_stub::load_replica(dir_node *dn,
+                                const std::string &dir,
+                                utils::ex_lock &reps_lock,
+                                replicas &reps)
+{
+    LOG_INFO("loading replica: tag={}, dir={}", dn->tag, dir);
+
+    const auto *const worker = task::get_current_worker2();
+    if (worker != nullptr) {
+        CHECK(!(worker->pool()->spec().partitioned),
+              "The thread pool for loading replicas must not be partitioned 
since load balancing "
+              "is required among multiple threads");
+    }
+
+    auto rep = load_replica(dn, dir.c_str());
+    if (rep == nullptr) {
+        return;
+    }
+
+    LOG_INFO("{}@{}: load replica successfully, tag={}, dir={}, 
last_durable_decree={}, "
+             "last_committed_decree={}, last_prepared_decree={}",
+             rep->get_gpid(),
+             dsn_primary_host_port(),
+             dn->tag,
+             dir,
+             rep->last_durable_decree(),
+             rep->last_committed_decree(),
+             rep->last_prepared_decree());
+
+    utils::auto_lock<utils::ex_lock> l(reps_lock);
+    CHECK(reps.find(rep->get_gpid()) == reps.end(),
+          "conflict replica dir: {} <--> {}",
+          rep->dir(),
+          reps[rep->get_gpid()]->dir());
+
+    reps[rep->get_gpid()] = rep;
+}
+
+void replica_stub::load_replicas(replicas &reps)
+{
+    const auto &disks = get_all_disk_dirs();
+
+    std::vector<size_t> dir_indexes(disks.size(), 0);
+    std::vector<std::queue<std::pair<std::string, task_ptr>>> 
load_disk_queues(disks.size());
+    utils::ex_lock reps_lock;
+
+    while (true) {
+        size_t finished_disks = 0;
+
+        // For each round, start loading one replica for each disk in case 
there are too many
+        // replicas in a disk, except that all of the replicas of this disk 
are being loaded.
+        for (size_t disk_index = 0; disk_index < disks.size(); ++disk_index) {
+            // Structured bindings can be captured by closures in g++, while 
not supported
+            // well by clang. Thus we do not use following statement to bind 
both variables
+            // until clang has been upgraded to version 16 which could support 
that well:
+            //
+            //     const auto &[dn, dirs] = disks[disk_index];
+            //
+            // For the docs of clang 16 please see:
+            //
+            // 
https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support:
+            const auto &dirs = disks[disk_index].second;
+
+            auto &dir_index = dir_indexes[disk_index];
+            if (dir_index >= dirs.size()) {
+                // All of the replicas for the disk `disks[disk_index]` have 
begun to be loaded,
+                // thus just skip.
+                ++finished_disks;
+                continue;
+            }
+
+            const auto &dn = disks[disk_index].first;
+            auto &load_disk_queue = load_disk_queues[disk_index];
+            if (!load_disk_queue.empty() &&
+                load_disk_queue.size() >= 
FLAGS_max_replicas_on_load_for_each_disk) {
+                // Loading replicas should be throttled in case that disk IO 
is saturated.
+                if 
(load_disk_queue.front().second->wait(FLAGS_load_replica_max_wait_time_ms)) {
+                    load_disk_queue.pop();
+                } else {
+                    // There might be too many replicas that are being loaded 
which lead to
+                    // slow disk IO.
+                    LOG_WARNING("after {} ms, loading dir({}) is still not 
finished, there are "
+                                "{} replicas being loaded for disk(index={}, 
tag={}, path={}), "
+                                "skip dir(index={}, path={}), turn to next 
disk",
+                                FLAGS_load_replica_max_wait_time_ms,
+                                load_disk_queue.front().first,
+                                load_disk_queue.size(),
+                                disk_index,
+                                dn->tag,
+                                dn->full_dir,
+                                dir_index,
+                                dirs[dir_index]);
+                    continue;
+                }
+
+                // Continue to load a replica since we are within the limit 
now.

Review Comment:
   Skip loading?



##########
src/replica/replica_stub.cpp:
##########
@@ -411,6 +432,173 @@ void replica_stub::initialize(bool clear /* = false*/)
     _access_controller = std::make_unique<dsn::security::access_controller>();
 }
 
+replica_stub::disk_dirs replica_stub::get_all_disk_dirs() const
+{
+    disk_dirs disks;
+    for (const auto &dn : _fs_manager.get_dir_nodes()) {
+        if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
+            // Skip disks with IO errors.
+            continue;
+        }
+
+        std::vector<std::string> sub_dirs;
+        CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, 
sub_dirs, false),
+              "failed to get sub_directories in {}",
+              dn->full_dir);
+        disks.emplace_back(dn.get(), std::move(sub_dirs));
+    }
+
+    return disks;
+}
+
+void replica_stub::load_replica(dir_node *dn,
+                                const std::string &dir,
+                                utils::ex_lock &reps_lock,
+                                replicas &reps)
+{
+    LOG_INFO("loading replica: tag={}, dir={}", dn->tag, dir);
+
+    const auto *const worker = task::get_current_worker2();
+    if (worker != nullptr) {
+        CHECK(!(worker->pool()->spec().partitioned),
+              "The thread pool for loading replicas must not be partitioned 
since load balancing "
+              "is required among multiple threads");
+    }
+
+    auto rep = load_replica(dn, dir.c_str());
+    if (rep == nullptr) {
+        return;
+    }
+
+    LOG_INFO("{}@{}: load replica successfully, tag={}, dir={}, 
last_durable_decree={}, "
+             "last_committed_decree={}, last_prepared_decree={}",
+             rep->get_gpid(),
+             dsn_primary_host_port(),
+             dn->tag,
+             dir,
+             rep->last_durable_decree(),
+             rep->last_committed_decree(),
+             rep->last_prepared_decree());
+
+    utils::auto_lock<utils::ex_lock> l(reps_lock);
+    CHECK(reps.find(rep->get_gpid()) == reps.end(),
+          "conflict replica dir: {} <--> {}",
+          rep->dir(),
+          reps[rep->get_gpid()]->dir());
+
+    reps[rep->get_gpid()] = rep;
+}
+
+void replica_stub::load_replicas(replicas &reps)
+{
+    const auto &disks = get_all_disk_dirs();
+
+    std::vector<size_t> dir_indexes(disks.size(), 0);
+    std::vector<std::queue<std::pair<std::string, task_ptr>>> 
load_disk_queues(disks.size());
+    utils::ex_lock reps_lock;
+
+    while (true) {
+        size_t finished_disks = 0;
+
+        // For each round, start loading one replica for each disk in case 
there are too many
+        // replicas in a disk, except that all of the replicas of this disk 
are being loaded.
+        for (size_t disk_index = 0; disk_index < disks.size(); ++disk_index) {
+            // Structured bindings can be captured by closures in g++, while 
not supported
+            // well by clang. Thus we do not use following statement to bind 
both variables
+            // until clang has been upgraded to version 16 which could support 
that well:
+            //
+            //     const auto &[dn, dirs] = disks[disk_index];
+            //
+            // For the docs of clang 16 please see:
+            //
+            // 
https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support:
+            const auto &dirs = disks[disk_index].second;
+
+            auto &dir_index = dir_indexes[disk_index];
+            if (dir_index >= dirs.size()) {
+                // All of the replicas for the disk `disks[disk_index]` have 
begun to be loaded,
+                // thus just skip.
+                ++finished_disks;
+                continue;
+            }
+
+            const auto &dn = disks[disk_index].first;
+            auto &load_disk_queue = load_disk_queues[disk_index];
+            if (!load_disk_queue.empty() &&
+                load_disk_queue.size() >= 
FLAGS_max_replicas_on_load_for_each_disk) {
+                // Loading replicas should be throttled in case that disk IO 
is saturated.
+                if 
(load_disk_queue.front().second->wait(FLAGS_load_replica_max_wait_time_ms)) {
+                    load_disk_queue.pop();
+                } else {
+                    // There might be too many replicas that are being loaded 
which lead to
+                    // slow disk IO.
+                    LOG_WARNING("after {} ms, loading dir({}) is still not 
finished, there are "
+                                "{} replicas being loaded for disk(index={}, 
tag={}, path={}), "
+                                "skip dir(index={}, path={}), turn to next 
disk",
+                                FLAGS_load_replica_max_wait_time_ms,
+                                load_disk_queue.front().first,
+                                load_disk_queue.size(),
+                                disk_index,
+                                dn->tag,
+                                dn->full_dir,
+                                dir_index,
+                                dirs[dir_index]);
+                    continue;
+                }
+
+                // Continue to load a replica since we are within the limit 
now.
+                if (dsn_unlikely(load_disk_queue.size() >=
+                                 FLAGS_max_replicas_on_load_for_each_disk)) {
+                    continue;
+                }
+            }
+
+            LOG_DEBUG("ready to load dir(index={}, path={}) for disk(index={}, 
tag={}, path={})",

Review Comment:
   How about moving it below the next `continue` in line 566?



##########
src/replica/replica_stub.cpp:
##########
@@ -495,75 +683,18 @@ void replica_stub::initialize(const replication_options 
&opts, bool clear /* = f
 
     // Start to load replicas in available data directories.
     LOG_INFO("start to load replicas");
-    std::map<dir_node *, std::vector<std::string>> dirs_by_dn;
-    for (const auto &dn : _fs_manager.get_dir_nodes()) {
-        // Skip IO error dir_node.
-        if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
-            continue;
-        }
-        std::vector<std::string> sub_directories;
-        CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, 
sub_directories, false),
-              "fail to get sub_directories in {}",
-              dn->full_dir);
-        dirs_by_dn.emplace(dn.get(), sub_directories);
-    }
-
-    replicas rps;
-    utils::ex_lock rps_lock;
-    std::deque<task_ptr> load_tasks;
-    uint64_t start_time = dsn_now_ms();
-    for (const auto &dn_dirs : dirs_by_dn) {
-        const auto dn = dn_dirs.first;
-        for (const auto &dir : dn_dirs.second) {
-            if (dsn::replication::is_data_dir_invalid(dir)) {
-                LOG_WARNING("ignore dir {}", dir);
-                continue;
-            }
 
-            load_tasks.push_back(tasking::create_task(
-                LPC_REPLICATION_INIT_LOAD,
-                &_tracker,
-                [this, dn, dir, &rps, &rps_lock] {
-                    LOG_INFO("process dir {}", dir);
+    replicas reps;
 
-                    auto r = load_replica(dn, dir.c_str());
-                    if (r == nullptr) {
-                        return;
-                    }
-                    LOG_INFO("{}@{}: load replica '{}' success, <durable, "
-                             "commit> = <{}, {}>, last_prepared_decree = {}",
-                             r->get_gpid(),
-                             dsn_primary_host_port(),
-                             dir,
-                             r->last_durable_decree(),
-                             r->last_committed_decree(),
-                             r->last_prepared_decree());
-
-                    utils::auto_lock<utils::ex_lock> l(rps_lock);
-                    CHECK(rps.find(r->get_gpid()) == rps.end(),
-                          "conflict replica dir: {} <--> {}",
-                          r->dir(),
-                          rps[r->get_gpid()]->dir());
-
-                    rps[r->get_gpid()] = r;
-                },
-                load_tasks.size()));
-            load_tasks.back()->enqueue();
-        }
-    }
-    for (auto &tsk : load_tasks) {
-        tsk->wait();
-    }
-    uint64_t finish_time = dsn_now_ms();
+    utils::chronograph chrono;

Review Comment:
   The macros in src/utils/timer.h may help.



##########
src/replica/replica_stub.cpp:
##########
@@ -2015,7 +2164,6 @@ replica *replica_stub::load_replica(dir_node *dn, const 
char *dir)
     const auto err = rep->initialize_on_load();
     if (err != ERR_OK) {
         LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err);
-        rep->close();

Review Comment:
   Why remove this?



##########
src/replica/replica_stub.cpp:
##########
@@ -411,6 +432,173 @@ void replica_stub::initialize(bool clear /* = false*/)
     _access_controller = std::make_unique<dsn::security::access_controller>();
 }
 
+replica_stub::disk_dirs replica_stub::get_all_disk_dirs() const
+{
+    disk_dirs disks;
+    for (const auto &dn : _fs_manager.get_dir_nodes()) {
+        if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
+            // Skip disks with IO errors.
+            continue;
+        }
+
+        std::vector<std::string> sub_dirs;
+        CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, 
sub_dirs, false),
+              "failed to get sub_directories in {}",
+              dn->full_dir);
+        disks.emplace_back(dn.get(), std::move(sub_dirs));
+    }
+
+    return disks;
+}
+
+void replica_stub::load_replica(dir_node *dn,
+                                const std::string &dir,
+                                utils::ex_lock &reps_lock,
+                                replicas &reps)
+{
+    LOG_INFO("loading replica: tag={}, dir={}", dn->tag, dir);
+
+    const auto *const worker = task::get_current_worker2();
+    if (worker != nullptr) {
+        CHECK(!(worker->pool()->spec().partitioned),
+              "The thread pool for loading replicas must not be partitioned 
since load balancing "
+              "is required among multiple threads");
+    }
+
+    auto rep = load_replica(dn, dir.c_str());
+    if (rep == nullptr) {
+        return;
+    }
+
+    LOG_INFO("{}@{}: load replica successfully, tag={}, dir={}, 
last_durable_decree={}, "
+             "last_committed_decree={}, last_prepared_decree={}",
+             rep->get_gpid(),
+             dsn_primary_host_port(),
+             dn->tag,
+             dir,
+             rep->last_durable_decree(),
+             rep->last_committed_decree(),
+             rep->last_prepared_decree());
+
+    utils::auto_lock<utils::ex_lock> l(reps_lock);
+    CHECK(reps.find(rep->get_gpid()) == reps.end(),
+          "conflict replica dir: {} <--> {}",
+          rep->dir(),
+          reps[rep->get_gpid()]->dir());
+
+    reps[rep->get_gpid()] = rep;
+}
+
+void replica_stub::load_replicas(replicas &reps)
+{
+    const auto &disks = get_all_disk_dirs();
+
+    std::vector<size_t> dir_indexes(disks.size(), 0);
+    std::vector<std::queue<std::pair<std::string, task_ptr>>> 
load_disk_queues(disks.size());
+    utils::ex_lock reps_lock;
+
+    while (true) {
+        size_t finished_disks = 0;
+
+        // For each round, start loading one replica for each disk in case 
there are too many
+        // replicas in a disk, except that all of the replicas of this disk 
are being loaded.
+        for (size_t disk_index = 0; disk_index < disks.size(); ++disk_index) {
+            // Structured bindings can be captured by closures in g++, while 
not supported
+            // well by clang. Thus we do not use following statement to bind 
both variables
+            // until clang has been upgraded to version 16 which could support 
that well:
+            //
+            //     const auto &[dn, dirs] = disks[disk_index];
+            //
+            // For the docs of clang 16 please see:
+            //
+            // 
https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support:
+            const auto &dirs = disks[disk_index].second;
+
+            auto &dir_index = dir_indexes[disk_index];
+            if (dir_index >= dirs.size()) {
+                // All of the replicas for the disk `disks[disk_index]` have 
begun to be loaded,
+                // thus just skip.
+                ++finished_disks;
+                continue;
+            }
+
+            const auto &dn = disks[disk_index].first;
+            auto &load_disk_queue = load_disk_queues[disk_index];
+            if (!load_disk_queue.empty() &&
+                load_disk_queue.size() >= 
FLAGS_max_replicas_on_load_for_each_disk) {
+                // Loading replicas should be throttled in case that disk IO 
is saturated.
+                if 
(load_disk_queue.front().second->wait(FLAGS_load_replica_max_wait_time_ms)) {

Review Comment:
   It seems this patch implemented a theadpool-with-max-threads, isn't it? The 
benifit compare to the former implementation is now we can limit the 
`max_replicas_on_load_for_each_disk`.
   
   Could the rocksdb::ThreadPool work well here? 
https://github.com/apache/incubator-pegasus/blob/master/src/shell/commands/local_partition_split.cpp#L392



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to