This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push: new ce267cbd2 refactor(FQDN): further more refator on idl/dsn.layer2.thrift v2 (#2217) ce267cbd2 is described below commit ce267cbd2ea480a09345eb358143dcbc79b43760 Author: Samunroyu <yujingwe...@gmail.com> AuthorDate: Wed May 21 23:48:20 2025 +0800 refactor(FQDN): further more refator on idl/dsn.layer2.thrift v2 (#2217) This is a further refactor of idl/dsn.layer2.thrift, based on https://github.com/apache/incubator-pegasus/pull/2049; the main effects are on the partition_configuration structure. --- src/meta/backup_engine.cpp | 2 +- src/meta/duplication/meta_duplication_service.cpp | 11 +++++++---- src/meta/greedy_load_balancer.cpp | 9 +++++++-- src/meta/meta_backup_service.cpp | 2 +- src/meta/meta_bulk_load_service.cpp | 3 ++- src/meta/server_state_restore.cpp | 10 +++++++++- src/replica/bulk_load/replica_bulk_loader.cpp | 16 ++++++++++++---- src/replica/replica_context.cpp | 16 ++++++++++++---- src/replica/replica_stub.cpp | 8 ++++++-- 9 files changed, 57 insertions(+), 20 deletions(-) diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp index 7197f4e48..be5347a2d 100644 --- a/src/meta/backup_engine.cpp +++ b/src/meta/backup_engine.cpp @@ -182,7 +182,7 @@ void backup_engine::backup_app_partition(const gpid &pid) _is_backup_failed = true; return; } - partition_primary = app->pcs[pid.get_partition_index()].hp_primary; + GET_HOST_PORT(app->pcs[pid.get_partition_index()], primary, partition_primary); } if (!partition_primary) { diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index f1d61a9db..971a4beb4 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -764,7 +764,9 @@ void meta_duplication_service::check_follower_app_if_create_completed( query_err = ERR_INCONSISTENT_STATE; } else { for (const auto &pc : resp.partitions) { - if (!pc.hp_primary) { + 
host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (!primary) { // Fail once the primary replica is unavailable. query_err = ERR_INACTIVE_STATE; break; @@ -772,13 +774,14 @@ void meta_duplication_service::check_follower_app_if_create_completed( // Once replica count is more than 1, at least one secondary replica // is required. - if (1 + pc.hp_secondaries.size() < pc.max_replica_count && - pc.hp_secondaries.empty()) { + std::vector<host_port> secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + if (1 + secondaries.size() < pc.max_replica_count && secondaries.empty()) { query_err = ERR_NOT_ENOUGH_MEMBER; break; } - for (const auto &secondary : pc.hp_secondaries) { + for (const auto &secondary : secondaries) { if (!secondary) { // Fail once any secondary replica is unavailable. query_err = ERR_INACTIVE_STATE; diff --git a/src/meta/greedy_load_balancer.cpp b/src/meta/greedy_load_balancer.cpp index 0b3017eda..2e3304358 100644 --- a/src/meta/greedy_load_balancer.cpp +++ b/src/meta/greedy_load_balancer.cpp @@ -147,8 +147,13 @@ bool greedy_load_balancer::all_replica_infos_collected(const node_state &ns) { const auto &n = ns.host_port(); return ns.for_each_partition([this, n](const dsn::gpid &pid) { - config_context &cc = *get_config_context(*(t_global_view->apps), pid); - if (cc.find_from_serving(n) == cc.serving.end()) { + config_context *ctx = get_config_context(*(t_global_view->apps), pid); + if (ctx == nullptr) { + LOG_INFO("get_config_context return nullptr for gpid({})", pid); + return false; + } + + if (ctx->find_from_serving(n) == ctx->serving.end()) { LOG_INFO("meta server hasn't collected gpid({})'s info of {}", pid, n); return false; } diff --git a/src/meta/meta_backup_service.cpp b/src/meta/meta_backup_service.cpp index 5dd49a6c0..10b641916 100644 --- a/src/meta/meta_backup_service.cpp +++ b/src/meta/meta_backup_service.cpp @@ -532,7 +532,7 @@ void policy_context::start_backup_partition_unlocked(gpid pid) pid, 
cold_backup_constant::PROGRESS_FINISHED, dsn::host_port()); return; } - partition_primary = app->pcs[pid.get_partition_index()].hp_primary; + GET_HOST_PORT(app->pcs[pid.get_partition_index()], primary, partition_primary); } if (!partition_primary) { LOG_WARNING("{}: partition {} doesn't have a primary now, retry to backup it later", diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index e88f696f9..ba7a2556b 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -475,7 +475,8 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err, const std::string &app_name = request.app_name; const gpid &pid = request.pid; const auto &primary_addr = request.primary; - const auto &primary_hp = request.hp_primary; + host_port primary_hp; + GET_HOST_PORT(request, primary, primary_hp); if (err != ERR_OK) { LOG_ERROR("app({}), partition({}) failed to receive bulk load response from node({}), " diff --git a/src/meta/server_state_restore.cpp b/src/meta/server_state_restore.cpp index 746a49b55..976aa7b5f 100644 --- a/src/meta/server_state_restore.cpp +++ b/src/meta/server_state_restore.cpp @@ -251,10 +251,18 @@ void server_state::on_query_restore_status(configuration_query_restore_rpc rpc) for (int32_t i = 0; i < app->partition_count; i++) { const auto &r_state = app->helpers->restore_states[i]; const auto &pc = app->pcs[i]; - if (pc.hp_primary || !pc.hp_secondaries.empty()) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (primary) { // already have primary, restore succeed continue; } + + std::vector<host_port> secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + if (!secondaries.empty()) { + continue; + } if (r_state.progress < response.restore_progress[i]) { response.restore_progress[i] = r_state.progress; } diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 05d77e859..3ed17b74a 100644 --- 
a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -936,10 +936,12 @@ void replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo primary_state.__set_download_progress(_download_progress.load()); primary_state.__set_download_status(_download_status.load()); } + host_port primary; + GET_HOST_PORT(_replica->_primary_states.pc, primary, primary); SET_VALUE_FROM_IP_AND_HOST_PORT(response, group_bulk_load_state, _replica->_primary_states.pc.primary, - _replica->_primary_states.pc.hp_primary, + primary, primary_state); LOG_INFO_PREFIX("primary = {}, download progress = {}%, status = {}", FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary), @@ -978,10 +980,12 @@ void replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon partition_bulk_load_state primary_state; primary_state.__set_ingest_status(_replica->_app->get_ingestion_status()); + host_port primary; + GET_HOST_PORT(_replica->_primary_states.pc, primary, primary); SET_VALUE_FROM_IP_AND_HOST_PORT(response, group_bulk_load_state, _replica->_primary_states.pc.primary, - _replica->_primary_states.pc.hp_primary, + primary, primary_state); LOG_INFO_PREFIX("primary = {}, ingestion status = {}", FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary), @@ -1025,10 +1029,12 @@ void replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response) partition_bulk_load_state primary_state; primary_state.__set_is_cleaned_up(is_cleaned_up()); + host_port primary; + GET_HOST_PORT(_replica->_primary_states.pc, primary, primary); SET_VALUE_FROM_IP_AND_HOST_PORT(response, group_bulk_load_state, _replica->_primary_states.pc.primary, - _replica->_primary_states.pc.hp_primary, + primary, primary_state); LOG_INFO_PREFIX("primary = {}, bulk load states cleaned_up = {}", FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary), @@ -1064,10 +1070,12 @@ void replica_bulk_loader::report_group_is_paused(bulk_load_response 
&response) partition_bulk_load_state primary_state; primary_state.__set_is_paused(_status == bulk_load_status::BLS_PAUSED); + host_port primary; + GET_HOST_PORT(_replica->_primary_states.pc, primary, primary); SET_VALUE_FROM_IP_AND_HOST_PORT(response, group_bulk_load_state, _replica->_primary_states.pc.primary, - _replica->_primary_states.pc.hp_primary, + primary, primary_state); LOG_INFO_PREFIX("primary = {}, bulk_load is_paused = {}", FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary), diff --git a/src/replica/replica_context.cpp b/src/replica/replica_context.cpp index c2b4a2fa6..a4016a5f6 100644 --- a/src/replica/replica_context.cpp +++ b/src/replica/replica_context.cpp @@ -134,10 +134,18 @@ void primary_context::get_replica_config(partition_status::type st, bool primary_context::check_exist(const ::dsn::host_port &node, partition_status::type st) { switch (st) { - case partition_status::PS_PRIMARY: - return pc.hp_primary == node; - case partition_status::PS_SECONDARY: - return utils::contains(pc.hp_secondaries, node); + case partition_status::PS_PRIMARY: { + DCHECK(pc.__isset.hp_primary, ""); + host_port primary; + GET_HOST_PORT(pc, primary, primary); + return primary == node; + } + case partition_status::PS_SECONDARY: { + DCHECK(pc.__isset.hp_secondaries, ""); + std::vector<host_port> secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + return utils::contains(secondaries, node); + } case partition_status::PS_POTENTIAL_SECONDARY: return learners.find(node) != learners.end(); default: diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 0003d1e9e..5d64de07f 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -1700,7 +1700,9 @@ void replica_stub::on_node_query_reply_scatter(replica_stub_ptr this_, req.__isset.meta_split_status ? 
req.meta_split_status : split_status::NOT_SPLIT); } else { - if (req.config.hp_primary == _primary_host_port) { + host_port primary; + GET_HOST_PORT(req.config, primary, primary); + if (primary == _primary_host_port) { LOG_INFO("{}@{}: replica not exists on replica server, which is primary, remove it " "from meta server", req.config.pid, @@ -1751,7 +1753,9 @@ void replica_stub::remove_replica_on_meta_server(const app_info &info, SET_IP_AND_HOST_PORT(*request, node, primary_address(), _primary_host_port); request->type = config_type::CT_DOWNGRADE_TO_INACTIVE; - if (_primary_host_port == pc.hp_primary) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (_primary_host_port == primary) { RESET_IP_AND_HOST_PORT(request->config, primary); } else if (REMOVE_IP_AND_HOST_PORT( primary_address(), _primary_host_port, request->config, secondaries)) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pegasus.apache.org For additional commands, e-mail: commits-h...@pegasus.apache.org