acelyc111 commented on code in PR #1658:
URL: 
https://github.com/apache/incubator-pegasus/pull/1658#discussion_r1423542520


##########
run.sh:
##########
@@ -27,7 +27,7 @@ export REPORT_DIR="$ROOT/test_report"
 export THIRDPARTY_ROOT=$ROOT/thirdparty
 export 
LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:${BUILD_LATEST_DIR}/output/lib:${THIRDPARTY_ROOT}/output/lib:${LD_LIBRARY_PATH}
 # Disable AddressSanitizerOneDefinitionRuleViolation, see 
https://github.com/google/sanitizers/issues/1017 for details.
-export ASAN_OPTIONS=detect_odr_violation=0
+export 
ASAN_OPTIONS=detect_odr_violation=0:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1

Review Comment:
   Add some comments to explain why the new options are added.
   It would also be better to print the ASAN_OPTIONS variable when running the 
script, because it overwrites the environment variable.



##########
run.sh:
##########
@@ -467,7 +467,8 @@ function run_test()
             m_count=3
             if [ "${module}" == "recovery_test" ]; then
                 m_count=1
-                
opts="meta_state_service_type=meta_state_service_simple,distributed_lock_service_type=distributed_lock_service_simple"
+                fqdn=`hostname -f`
+                
opts="server_list=$fqdn:34601,meta_state_service_type=meta_state_service_simple,distributed_lock_service_type=distributed_lock_service_simple"

Review Comment:
   Add comments to explain why the server_list is changed only when running 
recovery_test.



##########
src/client/partition_resolver_simple.cpp:
##########
@@ -411,20 +414,21 @@ void 
partition_resolver_simple::handle_pending_requests(std::deque<request_conte
 }
 
 /*search in cache*/
-rpc_address partition_resolver_simple::get_address(const 
partition_configuration &config) const
+host_port partition_resolver_simple::get_host_port(const 
partition_configuration &config) const
 {
+    const auto &pc = config;

Review Comment:
   How about renaming the parameter `config` to `pc` directly?



##########
src/runtime/rpc/rpc_holder.h:
##########
@@ -111,7 +111,7 @@ class rpc_holder
 
     bool is_initialized() const { return bool(_i); }
 
-    const TRequest &request() const
+    TRequest &request() const

Review Comment:
   Why remove 'const'?



##########
run.sh:
##########
@@ -487,6 +488,19 @@ function run_test()
             run_start_zk
         fi
         pushd ${BUILD_LATEST_DIR}/bin/${module}
+        local function_tests=(
+             backup_restore_test
+             recovery_test
+             restore_test
+             base_api_test
+             throttle_test
+             bulk_load_test
+             detect_hotspot_test
+             partition_split_test
+        )
+        if [[ "${function_tests[@]}"  =~ "${module}" ]]; then

Review Comment:
   Add comments to explain why LOCAL_HOSTNAME is changed only when running 
function_tests.



##########
src/runtime/rpc/rpc_message.h:
##########
@@ -137,6 +138,8 @@ class message_ex : public ref_counter, public 
extensible_object<message_ex, 4>
     rpc_session_ptr io_session; // send/recv session
     rpc_address to_address;     // always ipv4/v6 address, it is the to_node's 
net address
     rpc_address server_address; // used by requests, and may be of uri/group 
address
+    host_port to_host_port;

Review Comment:
   Add some comments please.



##########
src/runtime/rpc/network.h:
##########
@@ -327,6 +330,7 @@ class rpc_session : public ref_counter
     // constant info
     connection_oriented_network &_net;
     dsn::rpc_address _remote_addr;

Review Comment:
   Is it possible to remove `_remote_addr`?



##########
src/runtime/rpc/asio_net_provider.h:
##########
@@ -93,6 +95,7 @@ class asio_network_provider : public 
connection_oriented_network
     std::vector<std::unique_ptr<boost::asio::io_service>> _io_services;
     std::vector<std::shared_ptr<std::thread>> _workers;
     ::dsn::rpc_address _address;

Review Comment:
   Is it possible to safely remove `_address`?



##########
src/common/replication_common.cpp:
##########
@@ -165,33 +168,46 @@ int32_t 
replication_options::app_mutation_2pc_min_replica_count(int32_t app_max_
     }
 }
 
+/*static*/ bool replica_helper::remove_node(::dsn::host_port node,

Review Comment:
   The two functions can be replaced by a template function to remove duplicate 
code.



##########
src/runtime/rpc/rpc_host_port.h:
##########
@@ -67,6 +76,8 @@ class host_port
 
     std::string to_string() const;
 
+    const char *get_char_ctr() const;

Review Comment:
   What does `ctr` mean?



##########
src/runtime/rpc/network.sim.h:
##########
@@ -109,6 +111,7 @@ class sim_network_provider : public 
connection_oriented_network
 
 private:
     ::dsn::rpc_address _address;

Review Comment:
   Is it possible to remove `_address`?



##########
src/meta/load_balance_policy.h:
##########
@@ -283,10 +290,11 @@ class copy_primary_operation : public 
copy_replica_operation
     copy_primary_operation(const std::shared_ptr<app_state> app,
                            const app_mapper &apps,
                            node_mapper &nodes,
-                           const std::vector<dsn::rpc_address> &address_vec,
-                           const std::unordered_map<dsn::rpc_address, int> 
&address_id,
+                           const std::vector<dsn::host_port> &address_vec,
+                           const std::unordered_map<dsn::host_port, int> 
&address_id,

Review Comment:
   ```suggestion
                              const std::vector<dsn::host_port> &host_port_vec,
                              const std::unordered_map<dsn::host_port, int> 
&host_port_id,
   ```



##########
src/runtime/api_layer1.h:
##########
@@ -215,6 +216,8 @@ replace the underneath implementation of the network (e.g., 
RDMA, simulated netw
 
 extern dsn::rpc_address dsn_primary_address();

Review Comment:
   Is it possible to safely remove `dsn_primary_address()`?



##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -438,8 +442,9 @@ void bulk_load_service::partition_bulk_load(const 
std::string &app_name, const g
         req->remote_root_path = ainfo.remote_root_path;
     }
 
-    LOG_INFO("send bulk load request to node({}), app({}), partition({}), 
partition "
+    LOG_INFO("send bulk load request to node({}({})), app({}), partition({}), 
partition "
              "status = {}, remote provider = {}, cluster_name = {}, 
remote_root_path = {}",
+             primary_hp.to_string(),
              primary_addr.to_string(),

Review Comment:
   ```suggestion
                primary_hp,
                primary_addr,
   ```



##########
src/meta/load_balance_policy.h:
##########
@@ -268,11 +273,13 @@ class copy_replica_operation
     const std::shared_ptr<app_state> _app;
     const app_mapper &_apps;
     node_mapper &_nodes;
-    const std::vector<dsn::rpc_address> &_address_vec;
-    const std::unordered_map<dsn::rpc_address, int> &_address_id;
-    std::unordered_map<dsn::rpc_address, disk_load> _node_loads;
+    const std::vector<dsn::host_port> &_address_vec;
+    const std::unordered_map<dsn::host_port, int> &_address_id;

Review Comment:
   ```suggestion
       const std::vector<dsn::host_port> &_host_port_vec;
       const std::unordered_map<dsn::host_port, int> &_host_port_id;
   ```



##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -450,6 +455,18 @@ void bulk_load_service::partition_bulk_load(const 
std::string &app_name, const g
 
     bulk_load_rpc rpc(std::move(req), RPC_BULK_LOAD, 0_ms, 0, 
pid.thread_hash());
     rpc.call(primary_addr, _meta_svc->tracker(), [this, rpc](error_code err) 
mutable {
+
+        // fill host_port struct if needed

Review Comment:
   Note the indent.



##########
src/meta/load_balance_policy.cpp:
##########
@@ -141,38 +146,40 @@ generate_balancer_request(const app_mapper &apps,
     case balance_type::MOVE_PRIMARY:
         ans = "move_primary";
         result.balance_type = balancer_request_type::move_primary;
+        result.action_list.emplace_back(new_proposal_action(
+            from, from, hp_from, hp_from, 
config_type::CT_DOWNGRADE_TO_SECONDARY));

Review Comment:
   Same here: just pass the host_port parameters.



##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -450,6 +455,18 @@ void bulk_load_service::partition_bulk_load(const 
std::string &app_name, const g
 
     bulk_load_rpc rpc(std::move(req), RPC_BULK_LOAD, 0_ms, 0, 
pid.thread_hash());
     rpc.call(primary_addr, _meta_svc->tracker(), [this, rpc](error_code err) 
mutable {
+
+        // fill host_port struct if needed

Review Comment:
   The comment doesn't seem sufficient; do you need to mention that the remote 
server doesn't support FQDN?



##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -461,25 +478,28 @@ void 
bulk_load_service::on_partition_bulk_load_reply(error_code err,
 {
     const std::string &app_name = request.app_name;
     const gpid &pid = request.pid;
-    const rpc_address &primary_addr = request.primary_addr;
+    const auto &primary_addr = request.primary_addr;
+    const auto &primary_hp = request.hp_primary_addr;
 
     if (err != ERR_OK) {
-        LOG_ERROR(
-            "app({}), partition({}) failed to receive bulk load response from 
node({}), error = {}",
-            app_name,
-            pid,
-            primary_addr.to_string(),
-            err.to_string());
+        LOG_ERROR("app({}), partition({}) failed to receive bulk load response 
from node({}({})), "
+                  "error = {}",
+                  app_name,
+                  pid,
+                  primary_hp.to_string(),
+                  primary_addr.to_string(),
+                  err.to_string());

Review Comment:
   ```suggestion
                     primary_hp,
                     primary_addr,
                     err);
   ```



##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -461,25 +478,28 @@ void 
bulk_load_service::on_partition_bulk_load_reply(error_code err,
 {
     const std::string &app_name = request.app_name;
     const gpid &pid = request.pid;
-    const rpc_address &primary_addr = request.primary_addr;
+    const auto &primary_addr = request.primary_addr;
+    const auto &primary_hp = request.hp_primary_addr;
 
     if (err != ERR_OK) {
-        LOG_ERROR(
-            "app({}), partition({}) failed to receive bulk load response from 
node({}), error = {}",
-            app_name,
-            pid,
-            primary_addr.to_string(),
-            err.to_string());
+        LOG_ERROR("app({}), partition({}) failed to receive bulk load response 
from node({}({})), "
+                  "error = {}",
+                  app_name,
+                  pid,
+                  primary_hp.to_string(),
+                  primary_addr.to_string(),
+                  err.to_string());
         try_rollback_to_downloading(app_name, pid);
         try_resend_bulk_load_request(app_name, pid);
         return;
     }
 
     if (response.err == ERR_OBJECT_NOT_FOUND || response.err == 
ERR_INVALID_STATE) {
         LOG_ERROR(
-            "app({}), partition({}) doesn't exist or has invalid state on 
node({}), error = {}",
+            "app({}), partition({}) doesn't exist or has invalid state on 
node({}({})), error = {}",
             app_name,
             pid,
+            primary_hp.to_string(),
             primary_addr.to_string(),
             response.err.to_string());

Review Comment:
   ```suggestion
               primary_hp,
               primary_addr,
               response.err);
   ```



##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -1600,9 +1623,18 @@ void 
bulk_load_service::on_query_bulk_load_status(query_bulk_load_rpc rpc)
     }
 
     response.bulk_load_states.resize(partition_count);
+    response.__set_hp_bulk_load_states(
+        std::vector<std::map<host_port, 
partition_bulk_load_state>>(partition_count));
     for (const auto &kv : _partitions_bulk_load_state) {
         if (kv.first.get_app_id() == app_id) {
-            response.bulk_load_states[kv.first.get_partition_index()] = 
kv.second;
+            auto gpid = kv.first.get_partition_index();

Review Comment:
   gpid is a type name, it's better to avoid using it as a variable name.
   ```suggestion
               auto pidx = kv.first.get_partition_index();
   ```



##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -499,13 +520,15 @@ void 
bulk_load_service::on_partition_bulk_load_reply(error_code err,
     }
 
     if (response.err != ERR_OK) {
-        LOG_ERROR("app({}), partition({}) from node({}) handle bulk load 
response failed, error = "
-                  "{}, primary status = {}",
-                  app_name,
-                  pid,
-                  primary_addr.to_string(),
-                  response.err.to_string(),
-                  dsn::enum_to_string(response.primary_bulk_load_status));
+        LOG_ERROR(
+            "app({}), partition({}) from node({}({})) handle bulk load 
response failed, error = "
+            "{}, primary status = {}",
+            app_name,
+            pid,
+            primary_hp.to_string(),
+            primary_addr.to_string(),
+            response.err.to_string(),

Review Comment:
   ```suggestion
               primary_hp,
               primary_addr,
               response.err,
   ```



##########
src/meta/meta_bulk_load_service.cpp:
##########
@@ -489,8 +509,9 @@ void 
bulk_load_service::on_partition_bulk_load_reply(error_code err,
 
     if (response.err == ERR_BUSY) {
         LOG_WARNING(
-            "node({}) has enough replicas downloading, wait for next round to 
send bulk load "
+            "node({}({})) has enough replicas downloading, wait for next round 
to send bulk load "
             "request for app({}), partition({})",
+            primary_hp.to_string(),
             primary_addr.to_string(),

Review Comment:
   ```suggestion
               primary_hp,
               primary_addr,
   ```



##########
src/client/partition_resolver.cpp:
##########
@@ -124,7 +127,7 @@ void partition_resolver::call_task(const 
rpc_response_task_ptr &t)
                     }
                     hdr.gpid = result.pid;
                 }
-                dsn_rpc_call(result.address, t.get());
+                dsn_rpc_call(this->_dns_resolver->resolve_address(result.hp), 
t.get());

Review Comment:
   @GehaFearless Have you improved the code according to my reply?



##########
src/meta/cluster_balance_policy.cpp:
##########
@@ -544,10 +545,21 @@ bool cluster_balance_policy::apply_move(const move_info 
&move,
     // add into migration list and selected_pid
     partition_configuration pc;
     pc.pid = move.pid;
-    pc.primary = primary_addr;
-    list[move.pid] = generate_balancer_request(*_global_view->apps, pc, 
move.type, source, target);
+    pc.hp_primary = primary_hp;
+    std::shared_ptr<dsn::dns_resolver> resolver;
+    if (_svc == nullptr) {
+        resolver = std::make_shared<dsn::dns_resolver>();

Review Comment:
   How about creating a valid meta_service there and passing it to this function 
to ensure consistency?



##########
src/meta/load_balance_policy.h:
##########
@@ -244,8 +248,9 @@ class copy_replica_operation
     copy_replica_operation(const std::shared_ptr<app_state> app,
                            const app_mapper &apps,
                            node_mapper &nodes,
-                           const std::vector<dsn::rpc_address> &address_vec,
-                           const std::unordered_map<dsn::rpc_address, int> 
&address_id);
+                           const std::vector<dsn::host_port> &address_vec,
+                           const std::unordered_map<dsn::host_port, int> 
&address_id,

Review Comment:
   ```suggestion
                              const std::vector<dsn::host_port> &host_port_vec,
                              const std::unordered_map<dsn::host_port, int> 
&host_port_id,
   ```



##########
src/meta/meta_data.cpp:
##########
@@ -151,12 +136,16 @@ bool construct_replica(meta_view view, const gpid &pid, 
int max_replica_count)
     // we put max_replica_count-1 recent replicas to last_drops, in case of 
the DDD-state when the
     // only primary dead
     // when add node to pc.last_drops, we don't remove it from our cc.drop_list
-    CHECK(pc.last_drops.empty(), "last_drops of partition({}) must be empty", 
pid);
+    CHECK(pc.hp_last_drops.empty(),
+          "last_drops of partition({}.{}) must be empty",
+          pid.get_app_id(),
+          pid.get_partition_index());

Review Comment:
   ```suggestion
             "last_drops of partition({}) must be empty",
             pid);
   ```



##########
src/meta/load_balance_policy.cpp:
##########
@@ -128,7 +131,9 @@ generate_balancer_request(const app_mapper &apps,
                           const partition_configuration &pc,
                           const balance_type &type,
                           const rpc_address &from,
-                          const rpc_address &to)
+                          const rpc_address &to,
+                          const host_port &hp_from,
+                          const host_port &hp_to)

Review Comment:
   Is it enough to pass just hp_from and hp_to? The ip addresses will be 
resolved internally.



##########
src/runtime/rpc/rpc_engine.h:
##########
@@ -202,6 +204,7 @@ class rpc_engine
     std::unordered_map<int, std::vector<std::unique_ptr<network>>>
         _server_nets; // <port, <CHANNEL, network*>>
     ::dsn::rpc_address _local_primary_address;

Review Comment:
   Is it possible to remove `_local_primary_address`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to