empiredan commented on code in PR #2078:
URL: https://github.com/apache/incubator-pegasus/pull/2078#discussion_r1897060207


##########
src/replica/replica_stub.cpp:
##########
@@ -411,6 +432,173 @@ void replica_stub::initialize(bool clear /* = false*/)
     _access_controller = std::make_unique<dsn::security::access_controller>();
 }
 
+replica_stub::disk_dirs replica_stub::get_all_disk_dirs() const
+{
+    disk_dirs disks;
+    for (const auto &dn : _fs_manager.get_dir_nodes()) {
+        if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
+            // Skip disks with IO errors.
+            continue;
+        }
+
+        std::vector<std::string> sub_dirs;
+        CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, sub_dirs, false),
+              "failed to get sub_directories in {}",
+              dn->full_dir);
+        disks.emplace_back(dn.get(), std::move(sub_dirs));
+    }
+
+    return disks;
+}
+
+void replica_stub::load_replica(dir_node *dn,
+                                const std::string &dir,
+                                utils::ex_lock &reps_lock,
+                                replicas &reps)
+{
+    LOG_INFO("loading replica: tag={}, dir={}", dn->tag, dir);
+
+    const auto *const worker = task::get_current_worker2();
+    if (worker != nullptr) {
+        CHECK(!(worker->pool()->spec().partitioned),
+              "The thread pool for loading replicas must not be partitioned since load balancing "
+              "is required among multiple threads");
+    }
+
+    auto rep = load_replica(dn, dir.c_str());
+    if (rep == nullptr) {
+        return;
+    }
+
+    LOG_INFO("{}@{}: load replica successfully, tag={}, dir={}, last_durable_decree={}, "
+             "last_committed_decree={}, last_prepared_decree={}",
+             rep->get_gpid(),
+             dsn_primary_host_port(),
+             dn->tag,
+             dir,
+             rep->last_durable_decree(),
+             rep->last_committed_decree(),
+             rep->last_prepared_decree());
+
+    utils::auto_lock<utils::ex_lock> l(reps_lock);
+    CHECK(reps.find(rep->get_gpid()) == reps.end(),
+          "conflict replica dir: {} <--> {}",
+          rep->dir(),
+          reps[rep->get_gpid()]->dir());
+
+    reps[rep->get_gpid()] = rep;
+}
+
+void replica_stub::load_replicas(replicas &reps)
+{
+    const auto &disks = get_all_disk_dirs();
+
+    std::vector<size_t> dir_indexes(disks.size(), 0);
+    std::vector<std::queue<std::pair<std::string, task_ptr>>> load_disk_queues(disks.size());
+    utils::ex_lock reps_lock;
+
+    while (true) {
+        size_t finished_disks = 0;
+
+        // For each round, start loading one replica for each disk in case there are too many
+        // replicas in a disk, except that all of the replicas of this disk are being loaded.
+        for (size_t disk_index = 0; disk_index < disks.size(); ++disk_index) {
+            // Structured bindings can be captured by closures in g++, while not supported
+            // well by clang. Thus we do not use following statement to bind both variables
+            // until clang has been upgraded to version 16 which could support that well:
+            //
+            //     const auto &[dn, dirs] = disks[disk_index];
+            //
+            // For the docs of clang 16 please see:
+            //
+            // https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support:
+            const auto &dirs = disks[disk_index].second;
+
+            auto &dir_index = dir_indexes[disk_index];
+            if (dir_index >= dirs.size()) {
+                // All of the replicas for the disk `disks[disk_index]` have begun to be loaded,
+                // thus just skip.
+                ++finished_disks;
+                continue;
+            }
+
+            const auto &dn = disks[disk_index].first;
+            auto &load_disk_queue = load_disk_queues[disk_index];
+            if (!load_disk_queue.empty() &&
+                load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) {
+                // Loading replicas should be throttled in case that disk IO is saturated.
+                if (load_disk_queue.front().second->wait(FLAGS_load_replica_max_wait_time_ms)) {
+                    load_disk_queue.pop();
+                } else {
+                    // There might be too many replicas that are being loaded which lead to
+                    // slow disk IO.
+                    LOG_WARNING("after {} ms, loading dir({}) is still not finished, there are "
+                                "{} replicas being loaded for disk(index={}, tag={}, path={}), "
+                                "skip dir(index={}, path={}), turn to next disk",
+                                FLAGS_load_replica_max_wait_time_ms,
+                                load_disk_queue.front().first,
+                                load_disk_queue.size(),
+                                disk_index,

Review Comment:
   OK, use progress instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to