This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 0bfe78023 refactor(duplication): move
get_all_replicas/get_all_primaries from duplication_sync_timer to replica_stub
(#1967)
0bfe78023 is described below
commit 0bfe78023f38b3eeeb02bd7988f76e55b9c2aabb
Author: Dan Wang <[email protected]>
AuthorDate: Tue Apr 2 11:12:01 2024 +0800
refactor(duplication): move get_all_replicas/get_all_primaries from
duplication_sync_timer to replica_stub (#1967)
Both `get_all_primaries()` and `get_all_replicas()` operate on the members
of `replica_stub`, so it is better to move both of them to `replica_stub`.
See also
https://github.com/apache/incubator-pegasus/pull/1608.
---
src/replica/duplication/duplication_sync_timer.cpp | 42 ++++------------------
src/replica/duplication/duplication_sync_timer.h | 6 ----
.../test/duplication_sync_timer_test.cpp | 1 +
src/replica/replica_stub.cpp | 29 +++++++++++++++
src/replica/replica_stub.h | 2 ++
5 files changed, 38 insertions(+), 42 deletions(-)
diff --git a/src/replica/duplication/duplication_sync_timer.cpp
b/src/replica/duplication/duplication_sync_timer.cpp
index 98137d21f..e1e0a1645 100644
--- a/src/replica/duplication/duplication_sync_timer.cpp
+++ b/src/replica/duplication/duplication_sync_timer.cpp
@@ -19,13 +19,12 @@
#include <chrono>
#include <cstdint>
#include <memory>
-#include <unordered_map>
#include <utility>
+#include <vector>
#include "common/duplication_common.h"
#include "common/replication.codes.h"
#include "duplication_sync_timer.h"
-#include "metadata_types.h"
#include "replica/replica.h"
#include "replica/replica_stub.h"
#include "replica_duplicator_manager.h"
@@ -74,7 +73,7 @@ void duplication_sync_timer::run()
req->__set_hp_node(_stub->primary_host_port());
// collects confirm points from all primaries on this server
- for (const replica_ptr &r : get_all_primaries()) {
+ for (const replica_ptr &r : _stub->get_all_primaries()) {
auto confirmed =
r->get_duplication_manager()->get_duplication_confirms_to_update();
if (!confirmed.empty()) {
req->confirm_list[r->get_gpid()] = std::move(confirmed);
@@ -112,8 +111,8 @@ void
duplication_sync_timer::on_duplication_sync_reply(error_code err,
void duplication_sync_timer::update_duplication_map(
const std::map<int32_t, std::map<int32_t, duplication_entry>> &dup_map)
{
- for (replica_ptr &r : get_all_replicas()) {
- auto it = dup_map.find(r->get_gpid().get_app_id());
+ for (replica_ptr &r : _stub->get_all_replicas()) {
+ const auto &it = dup_map.find(r->get_gpid().get_app_id());
if (it == dup_map.end()) {
// no duplication is assigned to this app
r->get_duplication_manager()->update_duplication_map({});
@@ -127,35 +126,6 @@
duplication_sync_timer::duplication_sync_timer(replica_stub *stub) : _stub(stub)
duplication_sync_timer::~duplication_sync_timer() {}
-std::vector<replica_ptr> duplication_sync_timer::get_all_primaries()
-{
- std::vector<replica_ptr> replica_vec;
- {
- zauto_read_lock l(_stub->_replicas_lock);
- for (auto &kv : _stub->_replicas) {
- replica_ptr r = kv.second;
- if (r->status() != partition_status::PS_PRIMARY) {
- continue;
- }
- replica_vec.emplace_back(std::move(r));
- }
- }
- return replica_vec;
-}
-
-std::vector<replica_ptr> duplication_sync_timer::get_all_replicas()
-{
- std::vector<replica_ptr> replica_vec;
- {
- zauto_read_lock l(_stub->_replicas_lock);
- for (auto &kv : _stub->_replicas) {
- replica_ptr r = kv.second;
- replica_vec.emplace_back(std::move(r));
- }
- }
- return replica_vec;
-}
-
void duplication_sync_timer::close()
{
LOG_INFO("stop duplication sync");
@@ -191,8 +161,8 @@ duplication_sync_timer::get_dup_states(int app_id, /*out*/
bool *app_found)
{
*app_found = false;
std::multimap<dupid_t, replica_dup_state> result;
- for (const replica_ptr &r : get_all_primaries()) {
- gpid rid = r->get_gpid();
+ for (const replica_ptr &r : _stub->get_all_primaries()) {
+ const gpid rid = r->get_gpid();
if (rid.get_app_id() != app_id) {
continue;
}
diff --git a/src/replica/duplication/duplication_sync_timer.h
b/src/replica/duplication/duplication_sync_timer.h
index 48fe2ec95..1a57a7631 100644
--- a/src/replica/duplication/duplication_sync_timer.h
+++ b/src/replica/duplication/duplication_sync_timer.h
@@ -19,13 +19,11 @@
#include <map>
#include <string>
-#include <vector>
#include "common//duplication_common.h"
#include "common/gpid.h"
#include "common/replication_other_types.h"
#include "duplication_types.h"
-#include "replica/replica.h"
#include "runtime/task/task.h"
#include "utils/zlocks.h"
@@ -71,10 +69,6 @@ private:
void on_duplication_sync_reply(error_code err, const
duplication_sync_response &resp);
- std::vector<replica_ptr> get_all_primaries();
-
- std::vector<replica_ptr> get_all_replicas();
-
private:
friend class duplication_sync_timer_test;
diff --git a/src/replica/duplication/test/duplication_sync_timer_test.cpp
b/src/replica/duplication/test/duplication_sync_timer_test.cpp
index c02629e41..7ac1b19a4 100644
--- a/src/replica/duplication/test/duplication_sync_timer_test.cpp
+++ b/src/replica/duplication/test/duplication_sync_timer_test.cpp
@@ -22,6 +22,7 @@
#include <memory>
#include <string>
#include <utility>
+#include <vector>
#include "common/duplication_common.h"
#include "common/replication.codes.h"
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 4f6f73c99..25cd5d8b6 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -35,6 +35,7 @@
#include <chrono>
#include <cstdint>
#include <deque>
+#include <iterator>
#include <mutex>
#include <ostream>
#include <set>
@@ -731,6 +732,34 @@ dsn::error_code replica_stub::on_kill_replica(gpid id)
}
}
+std::vector<replica_ptr> replica_stub::get_all_replicas() const
+{
+ std::vector<replica_ptr> result;
+ {
+ zauto_read_lock l(_replicas_lock);
+ std::transform(_replicas.begin(),
+ _replicas.end(),
+ std::back_inserter(result),
+ [](const std::pair<gpid, replica_ptr> &r) { return
r.second; });
+ }
+ return result;
+}
+
+std::vector<replica_ptr> replica_stub::get_all_primaries() const
+{
+ std::vector<replica_ptr> result;
+ {
+ zauto_read_lock l(_replicas_lock);
+ for (const auto & [ _, r ] : _replicas) {
+ if (r->status() != partition_status::PS_PRIMARY) {
+ continue;
+ }
+ result.push_back(r);
+ }
+ }
+ return result;
+}
+
replica_ptr replica_stub::get_replica(gpid id) const
{
zauto_read_lock l(_replicas_lock);
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 0370c7d0c..9e1c09762 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -195,6 +195,8 @@ public:
//
// common routines for inquiry
//
+ std::vector<replica_ptr> get_all_replicas() const;
+ std::vector<replica_ptr> get_all_primaries() const;
replica_ptr get_replica(gpid id) const;
replication_options &options() { return _options; }
const replication_options &options() const { return _options; }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]