empiredan commented on code in PR #2078:
URL:
https://github.com/apache/incubator-pegasus/pull/2078#discussion_r1897060207
##########
src/replica/replica_stub.cpp:
##########
@@ -411,6 +432,173 @@ void replica_stub::initialize(bool clear /* = false*/)
_access_controller = std::make_unique<dsn::security::access_controller>();
}
+replica_stub::disk_dirs replica_stub::get_all_disk_dirs() const
+{
+ disk_dirs disks;
+ for (const auto &dn : _fs_manager.get_dir_nodes()) {
+ if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
+ // Skip disks with IO errors.
+ continue;
+ }
+
+ std::vector<std::string> sub_dirs;
+ CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, sub_dirs, false),
+ "failed to get sub_directories in {}",
+ dn->full_dir);
+ disks.emplace_back(dn.get(), std::move(sub_dirs));
+ }
+
+ return disks;
+}
+
+void replica_stub::load_replica(dir_node *dn,
+ const std::string &dir,
+ utils::ex_lock &reps_lock,
+ replicas &reps)
+{
+ LOG_INFO("loading replica: tag={}, dir={}", dn->tag, dir);
+
+ const auto *const worker = task::get_current_worker2();
+ if (worker != nullptr) {
+ CHECK(!(worker->pool()->spec().partitioned),
+ "The thread pool for loading replicas must not be partitioned since load balancing "
+ "is required among multiple threads");
+ }
+
+ auto rep = load_replica(dn, dir.c_str());
+ if (rep == nullptr) {
+ return;
+ }
+
+ LOG_INFO("{}@{}: load replica successfully, tag={}, dir={}, last_durable_decree={}, "
+ "last_committed_decree={}, last_prepared_decree={}",
+ rep->get_gpid(),
+ dsn_primary_host_port(),
+ dn->tag,
+ dir,
+ rep->last_durable_decree(),
+ rep->last_committed_decree(),
+ rep->last_prepared_decree());
+
+ utils::auto_lock<utils::ex_lock> l(reps_lock);
+ CHECK(reps.find(rep->get_gpid()) == reps.end(),
+ "conflict replica dir: {} <--> {}",
+ rep->dir(),
+ reps[rep->get_gpid()]->dir());
+
+ reps[rep->get_gpid()] = rep;
+}
+
+void replica_stub::load_replicas(replicas &reps)
+{
+ const auto &disks = get_all_disk_dirs();
+
+ std::vector<size_t> dir_indexes(disks.size(), 0);
+ std::vector<std::queue<std::pair<std::string, task_ptr>>> load_disk_queues(disks.size());
+ utils::ex_lock reps_lock;
+
+ while (true) {
+ size_t finished_disks = 0;
+
+ // For each round, start loading one replica for each disk in case there are too many
+ // replicas in a disk, except that all of the replicas of this disk are being loaded.
+ for (size_t disk_index = 0; disk_index < disks.size(); ++disk_index) {
+ // Structured bindings can be captured by closures in g++, while not supported
+ // well by clang. Thus we do not use following statement to bind both variables
+ // until clang has been upgraded to version 16 which could support that well:
+ //
+ // const auto &[dn, dirs] = disks[disk_index];
+ //
+ // For the docs of clang 16 please see:
+ //
+ // https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support:
+ const auto &dirs = disks[disk_index].second;
+
+ auto &dir_index = dir_indexes[disk_index];
+ if (dir_index >= dirs.size()) {
+ // All of the replicas for the disk `disks[disk_index]` have begun to be loaded,
+ // thus just skip.
+ ++finished_disks;
+ continue;
+ }
+
+ const auto &dn = disks[disk_index].first;
+ auto &load_disk_queue = load_disk_queues[disk_index];
+ if (!load_disk_queue.empty() &&
+ load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) {
+ // Loading replicas should be throttled in case that disk IO is saturated.
+ if (load_disk_queue.front().second->wait(FLAGS_load_replica_max_wait_time_ms)) {
+ load_disk_queue.pop();
+ } else {
+ // There might be too many replicas that are being loaded which lead to
+ // slow disk IO.
+ LOG_WARNING("after {} ms, loading dir({}) is still not finished, there are "
+ "{} replicas being loaded for disk(index={}, tag={}, path={}), "
+ "skip dir(index={}, path={}), turn to next disk",
+ FLAGS_load_replica_max_wait_time_ms,
+ load_disk_queue.front().first,
+ load_disk_queue.size(),
+ disk_index,
Review Comment:
OK, use progress instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]