mapleFU commented on code in PR #2850:
URL: https://github.com/apache/kvrocks/pull/2850#discussion_r2009647660


##########
src/cluster/cluster.cc:
##########
@@ -197,8 +214,11 @@ Status Cluster::SetClusterNodes(const std::string 
&nodes_str, int64_t version, b
   }
 
   myself_ = nullptr;
-  if (!myid_.empty() && nodes_.find(myid_) != nodes_.end()) {
-    myself_ = nodes_[myid_];
+  if (!myid_.empty()) {
+    auto it = nodes_->find(myid_);
+    if (it != nodes_->end()) {

Review Comment:
   Consider an if-with-initializer here: `if (auto it = nodes_->find(myid_); it != nodes_->end()) {`



##########
src/cluster/cluster.cc:
##########
@@ -96,11 +105,13 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> 
&slot_ranges, const s
   }
 
   // Get the node which we want to assign slots into it
-  std::shared_ptr<ClusterNode> to_assign_node = nodes_[node_id];
-  if (to_assign_node == nullptr) {
+  auto it = nodes_->find(node_id);
+  if (it == nodes_->end()) {
     return {Status::NotOK, "No this node in the cluster"};
   }
 
+  auto to_assign_node = it->second;

Review Comment:
   ```suggestion
     std::shared_ptr<ClusterNode> to_assign_node;
     if (auto it = nodes_->find(node_id); it != nodes_->end()) {
       to_assign_node = it->second;
     } else {
       return {Status::NotOK, "No this node in the cluster"};
     }
   ```



##########
src/cluster/cluster.cc:
##########
@@ -23,24 +23,33 @@
 #include <config/config_util.h>
 
 #include <array>
+#include <cstdint>
 #include <cstring>
 #include <fstream>
 #include <memory>
+#include <string>
+#include <string_view>
 #include <vector>
 
 #include "cluster/cluster_defs.h"
 #include "commands/commander.h"
 #include "common/io_util.h"
+#include "fmt/base.h"
 #include "fmt/format.h"
 #include "parse_util.h"
 #include "replication.h"
 #include "server/server.h"
 #include "string_util.h"
 #include "time_util.h"
 
-ClusterNode::ClusterNode(std::string id, std::string host, int port, int role, 
std::string master_id,
-                         const std::bitset<kClusterSlots> &slots)
-    : id(std::move(id)), host(std::move(host)), port(port), role(role), 
master_id(std::move(master_id)), slots(slots) {}
+ClusterNode::ClusterNode(std::string &&id, std::string &&host, int port, int 
role, std::string &&master_id,

Review Comment:
   Why were these changed?



##########
src/cluster/cluster.cc:
##########
@@ -252,11 +272,14 @@ Status Cluster::SetMasterSlaveRepl() {
       srv_->slot_migrator->SetStopMigrationFlag(false);
       LOG(INFO) << "Change server role to master, restart migration task";
     }
+    if (!is_slave) {
+      srv_->CleanupOrphanSlaves(version_, *nodes_);
+    }

Review Comment:
   So this is what this patch actually does?



##########
src/cluster/cluster.cc:
##########
@@ -68,8 +77,8 @@ Status Cluster::SetNodeId(const std::string &node_id) {
 
   myid_ = node_id;
   // Already has cluster topology
-  if (version_ >= 0 && nodes_.find(node_id) != nodes_.end()) {
-    myself_ = nodes_[myid_];
+  if (version_ >= 0 && nodes_->count(node_id) > 0) {
+    myself_ = nodes_->at(node_id);

Review Comment:
   If performance is important, why not use `find()` and iterator-based 
operations? That way the map is searched only once.



##########
src/server/redis_connection.h:
##########
@@ -134,12 +139,18 @@ class Connection : public EvbufCallbackBase<Connection> {
   void SetLastCmd(std::string cmd) { last_cmd_ = std::move(cmd); }
   std::string GetIP() const { return ip_; }
   uint32_t GetPort() const { return port_; }
-  void SetListeningPort(int port) { listening_port_ = port; }
-  int GetListeningPort() const { return listening_port_; }
-  void SetAnnounceIP(std::string ip) { announce_ip_ = std::move(ip); }
-  std::string GetAnnounceIP() const { return !announce_ip_.empty() ? 
announce_ip_ : ip_; }
-  uint32_t GetAnnouncePort() const { return listening_port_ != 0 ? 
listening_port_ : port_; }
-  std::string GetAnnounceAddr() const { return GetAnnounceIP() + ":" + 
std::to_string(GetAnnouncePort()); }
+
+  void SetPeerInfo(std::unique_ptr<PeerInfo> &&peer_info) { peer_info_ = 
std::move(peer_info); }
+
+  const PeerInfo *GetPeerInfo() {
+    if (peer_info_) {
+      return peer_info_.get();
+    }
+
+    SetPeerInfo(std::make_unique<PeerInfo>(ip_, port_, "", -1));
+    return peer_info_.get();

Review Comment:
   Hmm, when would this happen?



##########
src/server/redis_connection.h:
##########
@@ -217,4 +229,44 @@ class Connection : public EvbufCallbackBase<Connection> {
   RESP protocol_version_ = RESP::v2;
 };
 
+class PeerInfo {
+ public:
+  PeerInfo() = default;
+  ~PeerInfo() = default;
+
+  PeerInfo(std::string_view ip, uint32_t port, std::string_view peer_id, 
int64_t peer_version)
+      : port_(port), peer_version_(peer_version) {
+    if (peer_id.empty()) {
+      str_ = fmt::format("{}:{}", ip, port);
+    } else {
+      str_ = fmt::format("{}:{} ({}@{})", ip, port, peer_id, peer_version);
+    }
+
+    addr_ = std::string_view(str_).substr(0, str_.find(' '));
+    ip_ = std::string_view(str_).substr(0, ip.length());
+
+    if (!peer_id.empty()) {
+      peer_id_ = std::string_view(str_).substr(addr_.length() + 1, 
peer_id.length());
+    }
+  }
+
+  std::string_view GetIP() const { return ip_; }
+  uint32_t GetPort() const { return port_; }
+
+  std::string_view GetStringView() const { return str_; }
+  std::string_view GetPeerID() const { return peer_id_; }
+  std::string_view GetAddr() const { return addr_; }
+  int64_t GetPeerVersion() const { return peer_version_; }
+
+ private:
+  std::string_view ip_;
+  std::string_view addr_;

Review Comment:
   This looks really unsafe. Who owns the memory for them?



##########
src/server/server.cc:
##########
@@ -354,6 +355,27 @@ void Server::DisconnectSlaves() {
   }
 }
 
+void Server::CleanupOrphanSlaves(int64_t version, const ClusterNodes &nodes) {
+  std::lock_guard<std::mutex> lg(slave_threads_mu_);
+
+  for (auto &slave_thread : slave_threads_) {
+    const auto peer_info = slave_thread->GetConn()->GetPeerInfo();
+    auto peer_version = peer_info->GetPeerVersion();
+    if (peer_version < 0 || peer_version > version) {
+      // The peer version is greater than the current version,

Review Comment:
   What does `peer_version < 0` mean?



##########
src/server/server.cc:
##########
@@ -354,6 +355,27 @@ void Server::DisconnectSlaves() {
   }
 }
 
+void Server::CleanupOrphanSlaves(int64_t version, const ClusterNodes &nodes) {
+  std::lock_guard<std::mutex> lg(slave_threads_mu_);
+
+  for (auto &slave_thread : slave_threads_) {
+    const auto peer_info = slave_thread->GetConn()->GetPeerInfo();

Review Comment:
   Do we need to check `GetPeerInfo() != nullptr`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to