This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bfe78023 refactor(duplication): move get_all_replicas/get_all_primaries from duplication_sync_timer to replica_stub (#1967)
0bfe78023 is described below

commit 0bfe78023f38b3eeeb02bd7988f76e55b9c2aabb
Author: Dan Wang <[email protected]>
AuthorDate: Tue Apr 2 11:12:01 2024 +0800

    refactor(duplication): move get_all_replicas/get_all_primaries from duplication_sync_timer to replica_stub (#1967)
    
    Both `get_all_primaries()` and `get_all_replicas()` operate on the members
    of `replica_stub`, so it is better to move both of them into `replica_stub`.
    See also https://github.com/apache/incubator-pegasus/pull/1608.
---
 src/replica/duplication/duplication_sync_timer.cpp | 42 ++++------------------
 src/replica/duplication/duplication_sync_timer.h   |  6 ----
 .../test/duplication_sync_timer_test.cpp           |  1 +
 src/replica/replica_stub.cpp                       | 29 +++++++++++++++
 src/replica/replica_stub.h                         |  2 ++
 5 files changed, 38 insertions(+), 42 deletions(-)

diff --git a/src/replica/duplication/duplication_sync_timer.cpp 
b/src/replica/duplication/duplication_sync_timer.cpp
index 98137d21f..e1e0a1645 100644
--- a/src/replica/duplication/duplication_sync_timer.cpp
+++ b/src/replica/duplication/duplication_sync_timer.cpp
@@ -19,13 +19,12 @@
 #include <chrono>
 #include <cstdint>
 #include <memory>
-#include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "common/duplication_common.h"
 #include "common/replication.codes.h"
 #include "duplication_sync_timer.h"
-#include "metadata_types.h"
 #include "replica/replica.h"
 #include "replica/replica_stub.h"
 #include "replica_duplicator_manager.h"
@@ -74,7 +73,7 @@ void duplication_sync_timer::run()
     req->__set_hp_node(_stub->primary_host_port());
 
     // collects confirm points from all primaries on this server
-    for (const replica_ptr &r : get_all_primaries()) {
+    for (const replica_ptr &r : _stub->get_all_primaries()) {
         auto confirmed = 
r->get_duplication_manager()->get_duplication_confirms_to_update();
         if (!confirmed.empty()) {
             req->confirm_list[r->get_gpid()] = std::move(confirmed);
@@ -112,8 +111,8 @@ void 
duplication_sync_timer::on_duplication_sync_reply(error_code err,
 void duplication_sync_timer::update_duplication_map(
     const std::map<int32_t, std::map<int32_t, duplication_entry>> &dup_map)
 {
-    for (replica_ptr &r : get_all_replicas()) {
-        auto it = dup_map.find(r->get_gpid().get_app_id());
+    for (replica_ptr &r : _stub->get_all_replicas()) {
+        const auto &it = dup_map.find(r->get_gpid().get_app_id());
         if (it == dup_map.end()) {
             // no duplication is assigned to this app
             r->get_duplication_manager()->update_duplication_map({});
@@ -127,35 +126,6 @@ 
duplication_sync_timer::duplication_sync_timer(replica_stub *stub) : _stub(stub)
 
 duplication_sync_timer::~duplication_sync_timer() {}
 
-std::vector<replica_ptr> duplication_sync_timer::get_all_primaries()
-{
-    std::vector<replica_ptr> replica_vec;
-    {
-        zauto_read_lock l(_stub->_replicas_lock);
-        for (auto &kv : _stub->_replicas) {
-            replica_ptr r = kv.second;
-            if (r->status() != partition_status::PS_PRIMARY) {
-                continue;
-            }
-            replica_vec.emplace_back(std::move(r));
-        }
-    }
-    return replica_vec;
-}
-
-std::vector<replica_ptr> duplication_sync_timer::get_all_replicas()
-{
-    std::vector<replica_ptr> replica_vec;
-    {
-        zauto_read_lock l(_stub->_replicas_lock);
-        for (auto &kv : _stub->_replicas) {
-            replica_ptr r = kv.second;
-            replica_vec.emplace_back(std::move(r));
-        }
-    }
-    return replica_vec;
-}
-
 void duplication_sync_timer::close()
 {
     LOG_INFO("stop duplication sync");
@@ -191,8 +161,8 @@ duplication_sync_timer::get_dup_states(int app_id, /*out*/ 
bool *app_found)
 {
     *app_found = false;
     std::multimap<dupid_t, replica_dup_state> result;
-    for (const replica_ptr &r : get_all_primaries()) {
-        gpid rid = r->get_gpid();
+    for (const replica_ptr &r : _stub->get_all_primaries()) {
+        const gpid rid = r->get_gpid();
         if (rid.get_app_id() != app_id) {
             continue;
         }
diff --git a/src/replica/duplication/duplication_sync_timer.h 
b/src/replica/duplication/duplication_sync_timer.h
index 48fe2ec95..1a57a7631 100644
--- a/src/replica/duplication/duplication_sync_timer.h
+++ b/src/replica/duplication/duplication_sync_timer.h
@@ -19,13 +19,11 @@
 
 #include <map>
 #include <string>
-#include <vector>
 
 #include "common//duplication_common.h"
 #include "common/gpid.h"
 #include "common/replication_other_types.h"
 #include "duplication_types.h"
-#include "replica/replica.h"
 #include "runtime/task/task.h"
 #include "utils/zlocks.h"
 
@@ -71,10 +69,6 @@ private:
 
     void on_duplication_sync_reply(error_code err, const 
duplication_sync_response &resp);
 
-    std::vector<replica_ptr> get_all_primaries();
-
-    std::vector<replica_ptr> get_all_replicas();
-
 private:
     friend class duplication_sync_timer_test;
 
diff --git a/src/replica/duplication/test/duplication_sync_timer_test.cpp 
b/src/replica/duplication/test/duplication_sync_timer_test.cpp
index c02629e41..7ac1b19a4 100644
--- a/src/replica/duplication/test/duplication_sync_timer_test.cpp
+++ b/src/replica/duplication/test/duplication_sync_timer_test.cpp
@@ -22,6 +22,7 @@
 #include <memory>
 #include <string>
 #include <utility>
+#include <vector>
 
 #include "common/duplication_common.h"
 #include "common/replication.codes.h"
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 4f6f73c99..25cd5d8b6 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -35,6 +35,7 @@
 #include <chrono>
 #include <cstdint>
 #include <deque>
+#include <iterator>
 #include <mutex>
 #include <ostream>
 #include <set>
@@ -731,6 +732,34 @@ dsn::error_code replica_stub::on_kill_replica(gpid id)
     }
 }
 
+std::vector<replica_ptr> replica_stub::get_all_replicas() const
+{
+    std::vector<replica_ptr> result;
+    {
+        zauto_read_lock l(_replicas_lock);
+        std::transform(_replicas.begin(),
+                       _replicas.end(),
+                       std::back_inserter(result),
+                       [](const std::pair<gpid, replica_ptr> &r) { return 
r.second; });
+    }
+    return result;
+}
+
+std::vector<replica_ptr> replica_stub::get_all_primaries() const
+{
+    std::vector<replica_ptr> result;
+    {
+        zauto_read_lock l(_replicas_lock);
+        for (const auto & [ _, r ] : _replicas) {
+            if (r->status() != partition_status::PS_PRIMARY) {
+                continue;
+            }
+            result.push_back(r);
+        }
+    }
+    return result;
+}
+
 replica_ptr replica_stub::get_replica(gpid id) const
 {
     zauto_read_lock l(_replicas_lock);
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 0370c7d0c..9e1c09762 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -195,6 +195,8 @@ public:
     //
     // common routines for inquiry
     //
+    std::vector<replica_ptr> get_all_replicas() const;
+    std::vector<replica_ptr> get_all_primaries() const;
     replica_ptr get_replica(gpid id) const;
     replication_options &options() { return _options; }
     const replication_options &options() const { return _options; }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to