This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new ce267cbd2 refactor(FQDN): further more refator on 
idl/dsn.layer2.thrift v2 (#2217)
ce267cbd2 is described below

commit ce267cbd2ea480a09345eb358143dcbc79b43760
Author: Samunroyu <yujingwe...@gmail.com>
AuthorDate: Wed May 21 23:48:20 2025 +0800

    refactor(FQDN): further more refator on idl/dsn.layer2.thrift v2 (#2217)
    
    This is a further refactor on idl/dsn.layer2.thrift based on
    https://github.com/apache/incubator-pegasus/pull/2049
    the main effects are on partition_configuration structure.
---
 src/meta/backup_engine.cpp                        |  2 +-
 src/meta/duplication/meta_duplication_service.cpp | 11 +++++++----
 src/meta/greedy_load_balancer.cpp                 |  9 +++++++--
 src/meta/meta_backup_service.cpp                  |  2 +-
 src/meta/meta_bulk_load_service.cpp               |  3 ++-
 src/meta/server_state_restore.cpp                 | 10 +++++++++-
 src/replica/bulk_load/replica_bulk_loader.cpp     | 16 ++++++++++++----
 src/replica/replica_context.cpp                   | 16 ++++++++++++----
 src/replica/replica_stub.cpp                      |  8 ++++++--
 9 files changed, 57 insertions(+), 20 deletions(-)

diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp
index 7197f4e48..be5347a2d 100644
--- a/src/meta/backup_engine.cpp
+++ b/src/meta/backup_engine.cpp
@@ -182,7 +182,7 @@ void backup_engine::backup_app_partition(const gpid &pid)
             _is_backup_failed = true;
             return;
         }
-        partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
+        GET_HOST_PORT(app->pcs[pid.get_partition_index()], primary, 
partition_primary);
     }
 
     if (!partition_primary) {
diff --git a/src/meta/duplication/meta_duplication_service.cpp 
b/src/meta/duplication/meta_duplication_service.cpp
index f1d61a9db..971a4beb4 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -764,7 +764,9 @@ void 
meta_duplication_service::check_follower_app_if_create_completed(
                     query_err = ERR_INCONSISTENT_STATE;
                 } else {
                     for (const auto &pc : resp.partitions) {
-                        if (!pc.hp_primary) {
+                        host_port primary;
+                        GET_HOST_PORT(pc, primary, primary);
+                        if (!primary) {
                             // Fail once the primary replica is unavailable.
                             query_err = ERR_INACTIVE_STATE;
                             break;
@@ -772,13 +774,14 @@ void 
meta_duplication_service::check_follower_app_if_create_completed(
 
                         // Once replica count is more than 1, at least one 
secondary replica
                         // is required.
-                        if (1 + pc.hp_secondaries.size() < 
pc.max_replica_count &&
-                            pc.hp_secondaries.empty()) {
+                        std::vector<host_port> secondaries;
+                        GET_HOST_PORTS(pc, secondaries, secondaries);
+                        if (1 + secondaries.size() < pc.max_replica_count && 
secondaries.empty()) {
                             query_err = ERR_NOT_ENOUGH_MEMBER;
                             break;
                         }
 
-                        for (const auto &secondary : pc.hp_secondaries) {
+                        for (const auto &secondary : secondaries) {
                             if (!secondary) {
                                 // Fail once any secondary replica is 
unavailable.
                                 query_err = ERR_INACTIVE_STATE;
diff --git a/src/meta/greedy_load_balancer.cpp 
b/src/meta/greedy_load_balancer.cpp
index 0b3017eda..2e3304358 100644
--- a/src/meta/greedy_load_balancer.cpp
+++ b/src/meta/greedy_load_balancer.cpp
@@ -147,8 +147,13 @@ bool 
greedy_load_balancer::all_replica_infos_collected(const node_state &ns)
 {
     const auto &n = ns.host_port();
     return ns.for_each_partition([this, n](const dsn::gpid &pid) {
-        config_context &cc = *get_config_context(*(t_global_view->apps), pid);
-        if (cc.find_from_serving(n) == cc.serving.end()) {
+        config_context *ctx = get_config_context(*(t_global_view->apps), pid);
+        if (ctx == nullptr) {
+            LOG_INFO("get_config_context return nullptr for gpid({})", pid);
+            return false;
+        }
+
+        if (ctx->find_from_serving(n) == ctx->serving.end()) {
             LOG_INFO("meta server hasn't collected gpid({})'s info of {}", 
pid, n);
             return false;
         }
diff --git a/src/meta/meta_backup_service.cpp b/src/meta/meta_backup_service.cpp
index 5dd49a6c0..10b641916 100644
--- a/src/meta/meta_backup_service.cpp
+++ b/src/meta/meta_backup_service.cpp
@@ -532,7 +532,7 @@ void policy_context::start_backup_partition_unlocked(gpid 
pid)
                 pid, cold_backup_constant::PROGRESS_FINISHED, 
dsn::host_port());
             return;
         }
-        partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
+        GET_HOST_PORT(app->pcs[pid.get_partition_index()], primary, 
partition_primary);
     }
     if (!partition_primary) {
         LOG_WARNING("{}: partition {} doesn't have a primary now, retry to 
backup it later",
diff --git a/src/meta/meta_bulk_load_service.cpp 
b/src/meta/meta_bulk_load_service.cpp
index e88f696f9..ba7a2556b 100644
--- a/src/meta/meta_bulk_load_service.cpp
+++ b/src/meta/meta_bulk_load_service.cpp
@@ -475,7 +475,8 @@ void 
bulk_load_service::on_partition_bulk_load_reply(error_code err,
     const std::string &app_name = request.app_name;
     const gpid &pid = request.pid;
     const auto &primary_addr = request.primary;
-    const auto &primary_hp = request.hp_primary;
+    host_port primary_hp;
+    GET_HOST_PORT(request, primary, primary_hp);
 
     if (err != ERR_OK) {
         LOG_ERROR("app({}), partition({}) failed to receive bulk load response 
from node({}), "
diff --git a/src/meta/server_state_restore.cpp 
b/src/meta/server_state_restore.cpp
index 746a49b55..976aa7b5f 100644
--- a/src/meta/server_state_restore.cpp
+++ b/src/meta/server_state_restore.cpp
@@ -251,10 +251,18 @@ void 
server_state::on_query_restore_status(configuration_query_restore_rpc rpc)
     for (int32_t i = 0; i < app->partition_count; i++) {
         const auto &r_state = app->helpers->restore_states[i];
         const auto &pc = app->pcs[i];
-        if (pc.hp_primary || !pc.hp_secondaries.empty()) {
+        host_port primary;
+        GET_HOST_PORT(pc, primary, primary);
+        if (primary) {
             // already have primary, restore succeed
             continue;
         }
+
+        std::vector<host_port> secondaries;
+        GET_HOST_PORTS(pc, secondaries, secondaries);
+        if (!secondaries.empty()) {
+            continue;
+        }
         if (r_state.progress < response.restore_progress[i]) {
             response.restore_progress[i] = r_state.progress;
         }
diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp 
b/src/replica/bulk_load/replica_bulk_loader.cpp
index 05d77e859..3ed17b74a 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -936,10 +936,12 @@ void 
replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo
         primary_state.__set_download_progress(_download_progress.load());
         primary_state.__set_download_status(_download_status.load());
     }
+    host_port primary;
+    GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
     SET_VALUE_FROM_IP_AND_HOST_PORT(response,
                                     group_bulk_load_state,
                                     _replica->_primary_states.pc.primary,
-                                    _replica->_primary_states.pc.hp_primary,
+                                    primary,
                                     primary_state);
     LOG_INFO_PREFIX("primary = {}, download progress = {}%, status = {}",
                     FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, 
primary),
@@ -978,10 +980,12 @@ void 
replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
 
     partition_bulk_load_state primary_state;
     primary_state.__set_ingest_status(_replica->_app->get_ingestion_status());
+    host_port primary;
+    GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
     SET_VALUE_FROM_IP_AND_HOST_PORT(response,
                                     group_bulk_load_state,
                                     _replica->_primary_states.pc.primary,
-                                    _replica->_primary_states.pc.hp_primary,
+                                    primary,
                                     primary_state);
     LOG_INFO_PREFIX("primary = {}, ingestion status = {}",
                     FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, 
primary),
@@ -1025,10 +1029,12 @@ void 
replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response)
 
     partition_bulk_load_state primary_state;
     primary_state.__set_is_cleaned_up(is_cleaned_up());
+    host_port primary;
+    GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
     SET_VALUE_FROM_IP_AND_HOST_PORT(response,
                                     group_bulk_load_state,
                                     _replica->_primary_states.pc.primary,
-                                    _replica->_primary_states.pc.hp_primary,
+                                    primary,
                                     primary_state);
     LOG_INFO_PREFIX("primary = {}, bulk load states cleaned_up = {}",
                     FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, 
primary),
@@ -1064,10 +1070,12 @@ void 
replica_bulk_loader::report_group_is_paused(bulk_load_response &response)
 
     partition_bulk_load_state primary_state;
     primary_state.__set_is_paused(_status == bulk_load_status::BLS_PAUSED);
+    host_port primary;
+    GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
     SET_VALUE_FROM_IP_AND_HOST_PORT(response,
                                     group_bulk_load_state,
                                     _replica->_primary_states.pc.primary,
-                                    _replica->_primary_states.pc.hp_primary,
+                                    primary,
                                     primary_state);
     LOG_INFO_PREFIX("primary = {}, bulk_load is_paused = {}",
                     FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, 
primary),
diff --git a/src/replica/replica_context.cpp b/src/replica/replica_context.cpp
index c2b4a2fa6..a4016a5f6 100644
--- a/src/replica/replica_context.cpp
+++ b/src/replica/replica_context.cpp
@@ -134,10 +134,18 @@ void 
primary_context::get_replica_config(partition_status::type st,
 bool primary_context::check_exist(const ::dsn::host_port &node, 
partition_status::type st)
 {
     switch (st) {
-    case partition_status::PS_PRIMARY:
-        return pc.hp_primary == node;
-    case partition_status::PS_SECONDARY:
-        return utils::contains(pc.hp_secondaries, node);
+    case partition_status::PS_PRIMARY: {
+        DCHECK(pc.__isset.hp_primary, "");
+        host_port primary;
+        GET_HOST_PORT(pc, primary, primary);
+        return primary == node;
+    }
+    case partition_status::PS_SECONDARY: {
+        DCHECK(pc.__isset.hp_secondaries, "");
+        std::vector<host_port> secondaries;
+        GET_HOST_PORTS(pc, secondaries, secondaries);
+        return utils::contains(secondaries, node);
+    }
     case partition_status::PS_POTENTIAL_SECONDARY:
         return learners.find(node) != learners.end();
     default:
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 0003d1e9e..5d64de07f 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -1700,7 +1700,9 @@ void 
replica_stub::on_node_query_reply_scatter(replica_stub_ptr this_,
                                 req.__isset.meta_split_status ? 
req.meta_split_status
                                                               : 
split_status::NOT_SPLIT);
     } else {
-        if (req.config.hp_primary == _primary_host_port) {
+        host_port primary;
+        GET_HOST_PORT(req.config, primary, primary);
+        if (primary == _primary_host_port) {
             LOG_INFO("{}@{}: replica not exists on replica server, which is 
primary, remove it "
                      "from meta server",
                      req.config.pid,
@@ -1751,7 +1753,9 @@ void replica_stub::remove_replica_on_meta_server(const 
app_info &info,
     SET_IP_AND_HOST_PORT(*request, node, primary_address(), 
_primary_host_port);
     request->type = config_type::CT_DOWNGRADE_TO_INACTIVE;
 
-    if (_primary_host_port == pc.hp_primary) {
+    host_port primary;
+    GET_HOST_PORT(pc, primary, primary);
+    if (_primary_host_port == primary) {
         RESET_IP_AND_HOST_PORT(request->config, primary);
     } else if (REMOVE_IP_AND_HOST_PORT(
                    primary_address(), _primary_host_port, request->config, 
secondaries)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pegasus.apache.org
For additional commands, e-mail: commits-h...@pegasus.apache.org

Reply via email to