acelyc111 commented on code in PR #1658:
URL:
https://github.com/apache/incubator-pegasus/pull/1658#discussion_r1423542520
##########
run.sh:
##########
@@ -27,7 +27,7 @@ export REPORT_DIR="$ROOT/test_report"
export THIRDPARTY_ROOT=$ROOT/thirdparty
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:${BUILD_LATEST_DIR}/output/lib:${THIRDPARTY_ROOT}/output/lib:${LD_LIBRARY_PATH}
# Disable AddressSanitizerOneDefinitionRuleViolation, see https://github.com/google/sanitizers/issues/1017 for details.
-export ASAN_OPTIONS=detect_odr_violation=0
+export ASAN_OPTIONS=detect_odr_violation=0:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1
Review Comment:
Please add comments explaining why the new options are needed.
It would also be better to print the ASAN_OPTIONS variable when running the
script, because the script overwrites the environment variable.
##########
run.sh:
##########
@@ -467,7 +467,8 @@ function run_test()
m_count=3
if [ "${module}" == "recovery_test" ]; then
m_count=1
- opts="meta_state_service_type=meta_state_service_simple,distributed_lock_service_type=distributed_lock_service_simple"
+ fqdn=`hostname -f`
+ opts="server_list=$fqdn:34601,meta_state_service_type=meta_state_service_simple,distributed_lock_service_type=distributed_lock_service_simple"
Review Comment:
Add comments to explain why server_list is changed only when running
recovery_test.
##########
src/client/partition_resolver_simple.cpp:
##########
@@ -411,20 +414,21 @@ void
partition_resolver_simple::handle_pending_requests(std::deque<request_conte
}
/*search in cache*/
-rpc_address partition_resolver_simple::get_address(const
partition_configuration &config) const
+host_port partition_resolver_simple::get_host_port(const
partition_configuration &config) const
{
+ const auto &pc = config;
Review Comment:
How about renaming the parameter `config` to `pc` directly?
##########
src/runtime/rpc/rpc_holder.h:
##########
@@ -111,7 +111,7 @@ class rpc_holder
bool is_initialized() const { return bool(_i); }
- const TRequest &request() const
+ TRequest &request() const
Review Comment:
Why remove 'const'?
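
One possible alternative to dropping `const`, sketched under assumptions: keep the read-only accessor and add a separately named mutable accessor. The `holder` class and `mutable_request()` below are hypothetical illustrations, not the actual `rpc_holder` API.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical holder, loosely modelled on a wrapper around shared state.
template <typename TRequest>
class holder
{
public:
    explicit holder(TRequest req) : _req(std::make_shared<TRequest>(std::move(req))) {}

    // Read-only access, callable on const holders.
    const TRequest &request() const { return *_req; }

    // Explicit opt-in for callers that really need to modify the request.
    TRequest *mutable_request() { return _req.get(); }

private:
    std::shared_ptr<TRequest> _req;
};

// Small usage check: mutation goes through the explicit accessor only.
inline void holder_example()
{
    holder<std::string> h("abc");
    h.mutable_request()->append("d");
    const holder<std::string> &read_only = h;
    assert(read_only.request() == "abcd");
}
```

With this shape, call sites that mutate the request are easy to find by searching for the mutable accessor.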
##########
run.sh:
##########
@@ -487,6 +488,19 @@ function run_test()
run_start_zk
fi
pushd ${BUILD_LATEST_DIR}/bin/${module}
+ local function_tests=(
+ backup_restore_test
+ recovery_test
+ restore_test
+ base_api_test
+ throttle_test
+ bulk_load_test
+ detect_hotspot_test
+ partition_split_test
+ )
+ if [[ "${function_tests[@]}" =~ "${module}" ]]; then
Review Comment:
Add comments to explain why LOCAL_HOSTNAME is changed only when running
function_tests.
##########
src/runtime/rpc/rpc_message.h:
##########
@@ -137,6 +138,8 @@ class message_ex : public ref_counter, public
extensible_object<message_ex, 4>
rpc_session_ptr io_session; // send/recv session
rpc_address to_address; // always ipv4/v6 address, it is the to_node's
net address
rpc_address server_address; // used by requests, and may be of uri/group
address
+ host_port to_host_port;
Review Comment:
Add some comments please.
##########
src/runtime/rpc/network.h:
##########
@@ -327,6 +330,7 @@ class rpc_session : public ref_counter
// constant info
connection_oriented_network &_net;
dsn::rpc_address _remote_addr;
Review Comment:
Is it possible to remove `_remote_addr`?
##########
src/runtime/rpc/asio_net_provider.h:
##########
@@ -93,6 +95,7 @@ class asio_network_provider : public
connection_oriented_network
std::vector<std::unique_ptr<boost::asio::io_service>> _io_services;
std::vector<std::shared_ptr<std::thread>> _workers;
::dsn::rpc_address _address;
Review Comment:
Is it possible to safely remove `_address`?
##########
src/common/replication_common.cpp:
##########
@@ -165,33 +168,46 @@ int32_t
replication_options::app_mutation_2pc_min_replica_count(int32_t app_max_
}
}
+/*static*/ bool replica_helper::remove_node(::dsn::host_port node,
Review Comment:
The two functions can be replaced by a template function to remove duplicate
code.
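
A minimal sketch of what such a template could look like. The rest of the `remove_node` signature is not visible in the hunk above, so the second parameter (a vector of nodes) and the return semantics are assumptions; `TNode` stands in for either `rpc_address` or `host_port` and only needs `operator==`.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical generic helper: removes the first occurrence of `node` from
// `nodes` and reports whether anything was removed.
template <typename TNode>
bool remove_node(const TNode &node, std::vector<TNode> &nodes)
{
    auto it = std::find(nodes.begin(), nodes.end(), node);
    if (it == nodes.end()) {
        return false;
    }
    nodes.erase(it);
    return true;
}

// Usage check with std::string standing in for the real node types.
inline void remove_node_example()
{
    std::vector<std::string> nodes{"host1:34801", "host2:34801"};
    assert(remove_node(std::string("host1:34801"), nodes));
    assert(nodes.size() == 1);
    assert(!remove_node(std::string("host3:34801"), nodes));
}
```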
##########
src/runtime/rpc/rpc_host_port.h:
##########
@@ -67,6 +76,8 @@ class host_port
std::string to_string() const;
+ const char *get_char_ctr() const;
Review Comment:
What does `ctr` mean?
##########
src/runtime/rpc/network.sim.h:
##########
@@ -109,6 +111,7 @@ class sim_network_provider : public
connection_oriented_network
private:
::dsn::rpc_address _address;
Review Comment:
Is it possible to remove `_address`?
##########
src/meta/load_balance_policy.h:
##########
@@ -283,10 +290,11 @@ class copy_primary_operation : public
copy_replica_operation
copy_primary_operation(const std::shared_ptr<app_state> app,
const app_mapper &apps,
node_mapper &nodes,
- const std::vector<dsn::rpc_address> &address_vec,
- const std::unordered_map<dsn::rpc_address, int>
&address_id,
+ const std::vector<dsn::host_port> &address_vec,
+ const std::unordered_map<dsn::host_port, int>
&address_id,
Review Comment:
```suggestion
const std::vector<dsn::host_port> &host_port_vec,
const std::unordered_map<dsn::host_port, int>
&host_port_id,
```
##########
src/runtime/api_layer1.h:
##########
@@ -215,6 +216,8 @@ replace the underneath implementation of the network (e.g.,
RDMA, simulated netw
extern dsn::rpc_address dsn_primary_address();
Review Comment:
Is it possible to safely remove `dsn_primary_address()`?
##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -438,8 +442,9 @@ void bulk_load_service::partition_bulk_load(const
std::string &app_name, const g
req->remote_root_path = ainfo.remote_root_path;
}
- LOG_INFO("send bulk load request to node({}), app({}), partition({}),
partition "
+ LOG_INFO("send bulk load request to node({}({})), app({}), partition({}),
partition "
"status = {}, remote provider = {}, cluster_name = {},
remote_root_path = {}",
+ primary_hp.to_string(),
primary_addr.to_string(),
Review Comment:
```suggestion
primary_hp,
primary_addr,
```
##########
src/meta/load_balance_policy.h:
##########
@@ -268,11 +273,13 @@ class copy_replica_operation
const std::shared_ptr<app_state> _app;
const app_mapper &_apps;
node_mapper &_nodes;
- const std::vector<dsn::rpc_address> &_address_vec;
- const std::unordered_map<dsn::rpc_address, int> &_address_id;
- std::unordered_map<dsn::rpc_address, disk_load> _node_loads;
+ const std::vector<dsn::host_port> &_address_vec;
+ const std::unordered_map<dsn::host_port, int> &_address_id;
Review Comment:
```suggestion
const std::vector<dsn::host_port> &_host_port_vec;
const std::unordered_map<dsn::host_port, int> &_host_port_id;
```
##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -450,6 +455,18 @@ void bulk_load_service::partition_bulk_load(const
std::string &app_name, const g
bulk_load_rpc rpc(std::move(req), RPC_BULK_LOAD, 0_ms, 0,
pid.thread_hash());
rpc.call(primary_addr, _meta_svc->tracker(), [this, rpc](error_code err)
mutable {
+
+ // fill host_port struct if needed
Review Comment:
Note the indent.
##########
src/meta/load_balance_policy.cpp:
##########
@@ -141,38 +146,40 @@ generate_balancer_request(const app_mapper &apps,
case balance_type::MOVE_PRIMARY:
ans = "move_primary";
result.balance_type = balancer_request_type::move_primary;
+ result.action_list.emplace_back(new_proposal_action(
+ from, from, hp_from, hp_from,
config_type::CT_DOWNGRADE_TO_SECONDARY));
Review Comment:
Same here: it should be enough to pass just the host_port parameters.
##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -450,6 +455,18 @@ void bulk_load_service::partition_bulk_load(const
std::string &app_name, const g
bulk_load_rpc rpc(std::move(req), RPC_BULK_LOAD, 0_ms, 0,
pid.thread_hash());
rpc.call(primary_addr, _meta_svc->tracker(), [this, rpc](error_code err)
mutable {
+
+ // fill host_port struct if needed
Review Comment:
The comment is not sufficient. Should it mention that the remote server
doesn't support FQDN?
##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -461,25 +478,28 @@ void
bulk_load_service::on_partition_bulk_load_reply(error_code err,
{
const std::string &app_name = request.app_name;
const gpid &pid = request.pid;
- const rpc_address &primary_addr = request.primary_addr;
+ const auto &primary_addr = request.primary_addr;
+ const auto &primary_hp = request.hp_primary_addr;
if (err != ERR_OK) {
- LOG_ERROR(
- "app({}), partition({}) failed to receive bulk load response from
node({}), error = {}",
- app_name,
- pid,
- primary_addr.to_string(),
- err.to_string());
+ LOG_ERROR("app({}), partition({}) failed to receive bulk load response
from node({}({})), "
+ "error = {}",
+ app_name,
+ pid,
+ primary_hp.to_string(),
+ primary_addr.to_string(),
+ err.to_string());
Review Comment:
```suggestion
primary_hp,
primary_addr,
err);
```
##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -461,25 +478,28 @@ void
bulk_load_service::on_partition_bulk_load_reply(error_code err,
{
const std::string &app_name = request.app_name;
const gpid &pid = request.pid;
- const rpc_address &primary_addr = request.primary_addr;
+ const auto &primary_addr = request.primary_addr;
+ const auto &primary_hp = request.hp_primary_addr;
if (err != ERR_OK) {
- LOG_ERROR(
- "app({}), partition({}) failed to receive bulk load response from
node({}), error = {}",
- app_name,
- pid,
- primary_addr.to_string(),
- err.to_string());
+ LOG_ERROR("app({}), partition({}) failed to receive bulk load response
from node({}({})), "
+ "error = {}",
+ app_name,
+ pid,
+ primary_hp.to_string(),
+ primary_addr.to_string(),
+ err.to_string());
try_rollback_to_downloading(app_name, pid);
try_resend_bulk_load_request(app_name, pid);
return;
}
if (response.err == ERR_OBJECT_NOT_FOUND || response.err ==
ERR_INVALID_STATE) {
LOG_ERROR(
- "app({}), partition({}) doesn't exist or has invalid state on
node({}), error = {}",
+ "app({}), partition({}) doesn't exist or has invalid state on
node({}({})), error = {}",
app_name,
pid,
+ primary_hp.to_string(),
primary_addr.to_string(),
response.err.to_string());
Review Comment:
```suggestion
primary_hp,
primary_addr,
response.err);
```
##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -1600,9 +1623,18 @@ void
bulk_load_service::on_query_bulk_load_status(query_bulk_load_rpc rpc)
}
response.bulk_load_states.resize(partition_count);
+ response.__set_hp_bulk_load_states(
+ std::vector<std::map<host_port,
partition_bulk_load_state>>(partition_count));
for (const auto &kv : _partitions_bulk_load_state) {
if (kv.first.get_app_id() == app_id) {
- response.bulk_load_states[kv.first.get_partition_index()] =
kv.second;
+ auto gpid = kv.first.get_partition_index();
Review Comment:
`gpid` is a type name, so it's better to avoid using it as a variable name.
```suggestion
auto pidx = kv.first.get_partition_index();
```
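
To illustrate the concern, here is a small sketch. The `gpid` struct below is a simplified stand-in (its fields are an assumption), and `next_partition_index` is a hypothetical function used only for the demonstration.

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-in for the real gpid type (assumption: two int32 fields).
struct gpid
{
    int32_t app_id;
    int32_t partition_index;
    int32_t get_partition_index() const { return partition_index; }
};

inline int32_t next_partition_index(const gpid &key)
{
    // Naming this local `gpid` would hide the type for the rest of the scope,
    // so a later `gpid next{...};` in this function would fail to compile.
    auto pidx = key.get_partition_index();
    gpid next{key.app_id, pidx + 1}; // the type name is still usable here
    return next.get_partition_index();
}

// Usage check for the sketch above.
inline void pidx_example()
{
    assert(next_partition_index(gpid{1, 3}) == 4);
}
```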
##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -499,13 +520,15 @@ void
bulk_load_service::on_partition_bulk_load_reply(error_code err,
}
if (response.err != ERR_OK) {
- LOG_ERROR("app({}), partition({}) from node({}) handle bulk load
response failed, error = "
- "{}, primary status = {}",
- app_name,
- pid,
- primary_addr.to_string(),
- response.err.to_string(),
- dsn::enum_to_string(response.primary_bulk_load_status));
+ LOG_ERROR(
+ "app({}), partition({}) from node({}({})) handle bulk load
response failed, error = "
+ "{}, primary status = {}",
+ app_name,
+ pid,
+ primary_hp.to_string(),
+ primary_addr.to_string(),
+ response.err.to_string(),
Review Comment:
```suggestion
primary_hp,
primary_addr,
response.err,
```
##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -489,8 +509,9 @@ void
bulk_load_service::on_partition_bulk_load_reply(error_code err,
if (response.err == ERR_BUSY) {
LOG_WARNING(
- "node({}) has enough replicas downloading, wait for next round to
send bulk load "
+ "node({}({})) has enough replicas downloading, wait for next round
to send bulk load "
"request for app({}), partition({})",
+ primary_hp.to_string(),
primary_addr.to_string(),
Review Comment:
```suggestion
primary_hp,
primary_addr,
```
##########
src/client/partition_resolver.cpp:
##########
@@ -124,7 +127,7 @@ void partition_resolver::call_task(const
rpc_response_task_ptr &t)
}
hdr.gpid = result.pid;
}
- dsn_rpc_call(result.address, t.get());
+ dsn_rpc_call(this->_dns_resolver->resolve_address(result.hp),
t.get());
Review Comment:
@GehaFearless Have you improved the code according to my reply?
##########
src/meta/cluster_balance_policy.cpp:
##########
@@ -544,10 +545,21 @@ bool cluster_balance_policy::apply_move(const move_info
&move,
// add into migration list and selected_pid
partition_configuration pc;
pc.pid = move.pid;
- pc.primary = primary_addr;
- list[move.pid] = generate_balancer_request(*_global_view->apps, pc,
move.type, source, target);
+ pc.hp_primary = primary_hp;
+ std::shared_ptr<dsn::dns_resolver> resolver;
+ if (_svc == nullptr) {
+ resolver = std::make_shared<dsn::dns_resolver>();
Review Comment:
How about creating a valid meta_service there and passing it to this
function, to ensure consistency?
##########
src/meta/load_balance_policy.h:
##########
@@ -244,8 +248,9 @@ class copy_replica_operation
copy_replica_operation(const std::shared_ptr<app_state> app,
const app_mapper &apps,
node_mapper &nodes,
- const std::vector<dsn::rpc_address> &address_vec,
- const std::unordered_map<dsn::rpc_address, int>
&address_id);
+ const std::vector<dsn::host_port> &address_vec,
+ const std::unordered_map<dsn::host_port, int>
&address_id,
Review Comment:
```suggestion
const std::vector<dsn::host_port> &host_port_vec,
const std::unordered_map<dsn::host_port, int>
&host_port_id,
```
##########
src/meta/meta_data.cpp:
##########
@@ -151,12 +136,16 @@ bool construct_replica(meta_view view, const gpid &pid,
int max_replica_count)
// we put max_replica_count-1 recent replicas to last_drops, in case of
the DDD-state when the
// only primary dead
// when add node to pc.last_drops, we don't remove it from our cc.drop_list
- CHECK(pc.last_drops.empty(), "last_drops of partition({}) must be empty",
pid);
+ CHECK(pc.hp_last_drops.empty(),
+ "last_drops of partition({}.{}) must be empty",
+ pid.get_app_id(),
+ pid.get_partition_index());
Review Comment:
```suggestion
"last_drops of partition({}) must be empty",
pid);
```
##########
src/meta/load_balance_policy.cpp:
##########
@@ -128,7 +131,9 @@ generate_balancer_request(const app_mapper &apps,
const partition_configuration &pc,
const balance_type &type,
const rpc_address &from,
- const rpc_address &to)
+ const rpc_address &to,
+ const host_port &hp_from,
+ const host_port &hp_to)
Review Comment:
Is it enough to pass just hp_from and hp_to? The ip addresses will be
resolved internally.
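
A rough sketch of the idea, with the resolution done inside the function. All names here (`fake_resolver`, `move_request`, `make_move_request`) are hypothetical stand-ins, strings replace the real `host_port` and `rpc_address` types, and this is not how `generate_balancer_request` is actually implemented.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-ins; not the real dsn types.
using host_port = std::string;  // e.g. "node1.example.org:34801"
using ip_address = std::string; // e.g. "10.0.0.1:34801"

struct fake_resolver
{
    std::map<host_port, ip_address> table;

    // Returns an empty address when the host_port is unknown.
    ip_address resolve(const host_port &hp) const
    {
        auto it = table.find(hp);
        return it == table.end() ? ip_address() : it->second;
    }
};

struct move_request
{
    ip_address from;
    ip_address to;
    host_port hp_from;
    host_port hp_to;
};

// Callers pass only host ports; the IP addresses are derived internally.
inline move_request make_move_request(const fake_resolver &resolver,
                                      const host_port &hp_from,
                                      const host_port &hp_to)
{
    return move_request{resolver.resolve(hp_from), resolver.resolve(hp_to), hp_from, hp_to};
}

// Usage check for the sketch above.
inline void move_request_example()
{
    fake_resolver resolver;
    resolver.table["a.example.org:1"] = "10.0.0.1:1";
    resolver.table["b.example.org:1"] = "10.0.0.2:1";
    const auto req = make_move_request(resolver, "a.example.org:1", "b.example.org:1");
    assert(req.from == "10.0.0.1:1");
    assert(req.to == "10.0.0.2:1");
    assert(req.hp_to == "b.example.org:1");
}
```

This keeps each call site from having to supply matching address and host_port pairs by hand.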
##########
src/runtime/rpc/rpc_engine.h:
##########
@@ -202,6 +204,7 @@ class rpc_engine
std::unordered_map<int, std::vector<std::unique_ptr<network>>>
_server_nets; // <port, <CHANNEL, network*>>
::dsn::rpc_address _local_primary_address;
Review Comment:
Is it possible to remove `_local_primary_address`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]