This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b5d92d1b77 [INLONG-9228][SDK] CPP SDK supports dynamic load balancing
(#9235)
b5d92d1b77 is described below
commit b5d92d1b77c25dd11a0f2f257289ea584cc3bb4f
Author: doleyzi <[email protected]>
AuthorDate: Wed Nov 8 11:28:59 2023 +0800
[INLONG-9228][SDK] CPP SDK supports dynamic load balancing (#9235)
---
.../dataproxy-sdk-cpp/release/inc/sdk_conf.h | 6 +-
.../dataproxy-sdk-cpp/src/client/stat.h | 11 +-
.../dataproxy-sdk-cpp/src/client/tcp_client.cc | 194 ++++++++++--
.../dataproxy-sdk-cpp/src/client/tcp_client.h | 34 +-
.../dataproxy-sdk-cpp/src/config/proxy_info.h | 8 +-
.../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 30 ++
.../dataproxy-sdk-cpp/src/group/send_group.cc | 348 +++++++++++++++++----
.../dataproxy-sdk-cpp/src/group/send_group.h | 42 ++-
.../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 13 +-
.../dataproxy-sdk-cpp/src/utils/capi_constant.h | 10 +
10 files changed, 591 insertions(+), 105 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
index 4407869ba7..609d6c35b8 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
@@ -54,6 +54,8 @@ public:
uint32_t per_groupid_thread_nums_; // Sending thread per groupid
uint32_t dispatch_interval_zip_; // Compression thread distribution
interval
uint32_t dispatch_interval_send_; // sending thread sending interval
+ uint32_t load_balance_interval_;
+ uint32_t heart_beat_interval_;
// Packaging parameters
bool enable_pack_;
@@ -77,7 +79,8 @@ public:
std::string manager_cluster_url_;
uint32_t manager_update_interval_; // Automatic update interval, minutes
uint32_t manager_url_timeout_; // URL parsing timeout, seconds
- uint32_t max_proxy_num_;
+ uint64_t max_proxy_num_;
+ uint64_t reserve_proxy_num_;
uint32_t msg_type_;
bool enable_isolation_;
@@ -85,6 +88,7 @@ public:
bool enable_tcp_nagle_;
uint64_t tcp_idle_time_; // The time when tcpclient did not send
data
uint32_t tcp_detection_interval_; // tcp-client detection interval
+ bool enable_balance_;
// auth settings
bool need_auth_;
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/stat.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/stat.h
index 695a66e31f..f57a26565f 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/stat.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/stat.h
@@ -28,29 +28,34 @@ private:
uint64_t send_failed_pack_num_;
uint64_t send_failed_msg_num_;
+ uint64_t time_cost_;
public:
Stat()
: send_success_pack_num_(0), send_success_msg_num_(0),
- send_failed_pack_num_(0), send_failed_msg_num_(0) {}
+ send_failed_pack_num_(0), send_failed_msg_num_(0) ,time_cost_(0) {}
void AddSendSuccessPackNum(uint64_t num) { send_success_pack_num_ += num; }
void AddSendSuccessMsgNum(uint64_t num) { send_success_msg_num_ += num; }
void AddSendFailPackNum(uint64_t num) { send_failed_pack_num_ += num; }
void AddSendFailMsgNum(uint64_t num) { send_failed_msg_num_ += num; }
+ void AddTimeCost(uint64_t time_cost) { time_cost_ += time_cost; }
void ResetStat() {
send_success_pack_num_ = 0;
send_success_msg_num_ = 0;
send_failed_pack_num_ = 0;
send_failed_msg_num_ = 0;
+ time_cost_ = 0;
}
std::string ToString() {
std::stringstream stat;
stat << "success-pack[" << send_success_pack_num_ << "]";
- stat << " success-msg[" << send_success_msg_num_ << "]";
+ stat << "msg[" << send_success_msg_num_ << "]";
stat << " failed-pack[" << send_failed_pack_num_ << "]";
- stat << " failed-msg[" << send_failed_msg_num_ << "]";
+ stat << "msg[" << send_failed_msg_num_ << "]";
+ uint64_t pack_num = send_success_pack_num_ + send_failed_msg_num_ + 1;
+ stat << " trans[" << time_cost_ / pack_num << "]";
return stat.str();
}
};
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
index 8a2a583b3b..e27910c4e9 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc
@@ -29,8 +29,9 @@ TcpClient::TcpClient(IOContext &io_context, std::string ip,
uint32_t port)
wait_timer_(std::make_shared<asio::steady_timer>(io_context)),
keep_alive_timer_(std::make_shared<asio::steady_timer>(io_context)),
ip_(ip), port_(port), endpoint_(asio::ip::address::from_string(ip),
port),
- status_(kUndefined), recv_buf_(new BlockMemory()), exit_(false) {
- ;
+ status_(kUndefined), recv_buf_(new BlockMemory()), exit_(false),
+ proxy_loads_(30), wait_heart_beat_(false), reset_client_(false),
+ heart_beat_index_(0), only_heart_heat_(false) {
client_info_ = " [" + ip_ + ":" + std::to_string(port_) + "]";
tcp_detection_interval_ = SdkConfig::getInstance()->tcp_detection_interval_;
@@ -82,7 +83,6 @@ void TcpClient::AsyncConnect() {
asio::error_code error;
socket_->close(error);
if (asio::error::operation_aborted == error) {
- // operation aborted
return;
}
}
@@ -102,7 +102,6 @@ void TcpClient::DoAsyncConnect(asio::error_code error) {
}
if (error) {
if (asio::error::operation_aborted == error) {
- // operation aborted必
return;
}
}
@@ -122,7 +121,6 @@ void TcpClient::OnConnected(asio::error_code error) {
return;
}
if (asio::error::operation_aborted == error) {
- // operation aborted
return;
}
status_ = kConnectFailed;
@@ -164,23 +162,22 @@ void TcpClient::OnWroten(const asio::error_code error,
}
if (error) {
if (asio::error::operation_aborted == error) {
- // operation aborted
return;
}
- status_ = kWriting;
LOG_ERROR("write error:" << error.message() << CLIENT_INFO);
+ status_ = kWriting;
HandleFail();
return;
}
if (0 == bytes_transferred) {
- status_ = kWaiting;
LOG_ERROR("transferred 0 bytes." << CLIENT_INFO);
+ status_ = kWaiting;
HandleFail();
return;
}
- status_ = CLIENT_RESPONSE;
+ status_ = kClientResponse;
asio::async_read(*socket_, asio::buffer(recv_buf_->m_data, sizeof(uint32_t)),
std::bind(&TcpClient::OnReturn, this, std::placeholders::_1,
std::placeholders::_2));
@@ -191,17 +188,16 @@ void TcpClient::OnReturn(asio::error_code error,
std::size_t len) {
}
if (error) {
if (asio::error::operation_aborted == error) {
- // operation aborted
return;
}
LOG_ERROR("OnReturn error:" << error.message() << CLIENT_INFO);
status_ = kWaiting;
+ std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0);
HandleFail();
return;
}
if (len != sizeof(uint32_t)) {
status_ = kWaiting;
- LOG_ERROR("len :" << len <<" != sizeof(uint32_t):" <<sizeof(uint32_t)<<
CLIENT_INFO);
HandleFail();
return;
}
@@ -210,11 +206,11 @@ void TcpClient::OnReturn(asio::error_code error,
std::size_t len) {
if (resp_len > recv_buf_->m_max_size) {
status_ = kWaiting;
- LOG_ERROR("invalid resp_len :" << resp_len << CLIENT_INFO);
HandleFail();
return;
}
- asio::async_read(*socket_, asio::buffer(recv_buf_->m_data, resp_len),
+ asio::async_read(*socket_,
+ asio::buffer(recv_buf_->m_data + sizeof(uint32_t),
resp_len),
std::bind(&TcpClient::OnBody, this, std::placeholders::_1,
std::placeholders::_2));
}
@@ -226,7 +222,6 @@ void TcpClient::OnBody(asio::error_code error, size_t
bytesTransferred) {
if (error) {
if (asio::error::operation_aborted == error) {
- // operation aborted
return;
}
LOG_ERROR("OnBody error:" << error.message() << CLIENT_INFO);
@@ -234,12 +229,27 @@ void TcpClient::OnBody(asio::error_code error, size_t
bytesTransferred) {
HandleFail();
return;
}
+ uint32_t parse_index = sizeof(uint32_t);
+ uint8_t msg_type =
+ *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index);
- if (sendBuffer_ != nullptr) {
- stat_.AddSendSuccessMsgNum(sendBuffer_->msgCnt());
- stat_.AddSendSuccessPackNum(1);
+ switch (msg_type) {
+ case 8:
+ ParseHeartBeat(bytesTransferred);
+ break;
+ default:
+ ParseGenericResponse();
+ break;
+ }
+ if (wait_heart_beat_) {
+ HeartBeat();
+ wait_heart_beat_ = false;
+ return;
+ }
- sendBuffer_->releaseBuf();
+ if (reset_client_) {
+ RestClient();
+ reset_client_ = false;
}
status_ = kFree;
@@ -255,6 +265,8 @@ void TcpClient::HandleFail() {
stat_.AddSendFailMsgNum(sendBuffer_->msgCnt());
stat_.AddSendFailPackNum(1);
+ stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_);
+
sendBuffer_->doUserCallBack();
sendBuffer_->releaseBuf();
}
@@ -269,12 +281,14 @@ void TcpClient::DetectStatus(const asio::error_code
error) {
if (error) {
return;
}
-
- LOG_INFO(stat_.ToString() << CLIENT_INFO);
- stat_.ResetStat();
+ if (!only_heart_heat_) {
+ LOG_INFO(stat_.ToString() << CLIENT_INFO);
+ stat_.ResetStat();
+ }
if ((Utils::getCurrentMsTime() - last_update_time_) > tcp_idle_time_ &&
status_ != kConnecting) {
+ std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0);
LOG_INFO("reconnect because it has idle "
<< tcp_idle_time_ << " ms."
<< "last send time:" << last_update_time_ << CLIENT_INFO);
@@ -287,4 +301,142 @@ void TcpClient::DetectStatus(const asio::error_code
error) {
std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1));
}
+void TcpClient::HeartBeat(bool only_heart_heat) {
+ if (kStopped == status_ || exit_) {
+ return;
+ }
+ only_heart_heat_ = only_heart_heat;
+ status_ = kHeartBeat;
+ last_update_time_ = Utils::getCurrentMsTime();
+ // status_ = kWriting;
+
+ bin_hb_.total_len = htonl(sizeof(BinaryHB) - 4);
+ bin_hb_.msg_type = 8;
+ bin_hb_.data_time =
+ htonl(static_cast<uint32_t>(Utils::getCurrentMsTime() / 1000));
+ bin_hb_.body_ver = 1;
+ bin_hb_.body_len = 0;
+ bin_hb_.attr_len = 0;
+ bin_hb_.magic = htons(constants::kBinaryMagic);
+ char *hb = (char *)&bin_hb_;
+ uint32_t hb_len = sizeof(bin_hb_);
+
+ asio::async_write(*socket_, asio::buffer(hb, hb_len),
+ std::bind(&TcpClient::OnWroten, this,
std::placeholders::_1,
+ std::placeholders::_2));
+}
+
+void TcpClient::ParseHeartBeat(size_t total_length) {
+ // | total length(4) | msg type(1) | data time(4) | body version(1) | body
+ // length (4) | body | attr length(2) | attr | magic (2) | skip total length
+ uint32_t parse_index = sizeof(uint32_t);
+ // skip msg type
+ parse_index += sizeof(uint8_t);
+ // skip data time
+ // uint32_t data_time = ntohl(*reinterpret_cast<const uint32_t
+ // *>(recv_buf_->m_data + parse_index));
+ parse_index += sizeof(uint32_t);
+
+ // 3、parse body version
+ uint32_t body_version =
+ *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index);
+ parse_index += sizeof(uint8_t);
+
+ // 4、parse body length
+ uint32_t body_length = ntohl(
+ *reinterpret_cast<const uint32_t *>(recv_buf_->m_data + parse_index));
+ parse_index += sizeof(uint32_t);
+
+ // 5 parse load
+ int16_t load = ntohs(
+ *reinterpret_cast<const int16_t *>(recv_buf_->m_data + parse_index));
+ parse_index += sizeof(int16_t);
+
+ // 7 parse attr length
+ uint16_t attr_length = ntohs(
+ *reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index));
+ parse_index += sizeof(uint16_t);
+
+ // 8 skip attr
+ parse_index += attr_length;
+
+ // 9 parse magic
+ uint16_t magic = ntohs(
+ *reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index));
+ parse_index += sizeof(uint16_t);
+
+ if (magic != constants::kBinaryMagic) {
+ LOG_ERROR("failed to parse heartbeat ack! error magic "
+ << magic << " !=" << constants::kBinaryMagic << CLIENT_INFO);
+ return;
+ }
+
+ if (total_length + 4 != parse_index) {
+ LOG_ERROR("failed to parse heartbeat ack! total_length "
+ << total_length << " +4 !=" << parse_index << CLIENT_INFO);
+ return;
+ }
+ if (heart_beat_index_ > constants::MAX_STAT) {
+ heart_beat_index_ = 0;
+ }
+ if (body_version == 1 && body_length == 2) {
+ proxy_loads_[heart_beat_index_++ % 30] = load;
+ } else {
+ proxy_loads_[heart_beat_index_++ % 30] = 0;
+ }
+ LOG_INFO("current load is " << load << CLIENT_INFO);
+}
+
+void TcpClient::ParseGenericResponse() {
+ if (sendBuffer_ != nullptr) {
+ stat_.AddSendSuccessMsgNum(sendBuffer_->msgCnt());
+ stat_.AddSendSuccessPackNum(1);
+
+ stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_);
+
+ sendBuffer_->releaseBuf();
+ }
+}
+
+int32_t TcpClient::GetAvgLoad() {
+ int32_t numerator = 0;
+ int32_t denominator = 0;
+ for (int i = 0; i < proxy_loads_.size(); i++) {
+ if (proxy_loads_[i] > 0) {
+ numerator += proxy_loads_[i] * constants::kWeight[i];
+ denominator += constants::kWeight[i];
+ }
+ }
+ int32_t avg_load = 0;
+ if (0 == denominator) {
+ return avg_load;
+ }
+ avg_load = numerator / denominator;
+ LOG_INFO("average load is " << avg_load << CLIENT_INFO);
+ return avg_load;
+}
+
+void TcpClient::SetHeartBeatStatus() { wait_heart_beat_ = true; }
+
+void TcpClient::UpdateClient(const std::string ip, const uint32_t port) {
+ LOG_INFO("UpdateClient[" << only_heart_heat_ << "][" << ip << ":" << port
+ << "] replace" << CLIENT_INFO);
+ ip_ = ip;
+ port_ = port;
+ reset_client_ = true;
+}
+void TcpClient::RestClient() {
+ std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0);
+ asio::ip::tcp::endpoint endpoint(asio::ip::address::from_string(ip_), port_);
+ endpoint_ = endpoint;
+ client_info_ = " [" + ip_ + ":" + std::to_string(port_) + "]";
+
+ LOG_INFO("RestClient[" << only_heart_heat_ << "]" << CLIENT_INFO);
+
+ AsyncConnect();
+}
+const std::string &TcpClient::getIp() const { return ip_; }
+const std::string &TcpClient::getClientInfo() const { return client_info_; }
+uint32_t TcpClient::getPort() const { return port_; }
+
} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
index be0b6d915a..6835a7fd26 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h
@@ -24,6 +24,7 @@
#include "../utils/capi_constant.h"
#include "../utils/read_write_mutex.h"
#include "../utils/send_buffer.h"
+#include "msg_protocol.h"
#include "stat.h"
#include <queue>
@@ -36,8 +37,10 @@ enum ClientStatus {
kConnectFailed = 4,
kWaiting = 5,
kStopped = 6,
- CLIENT_RESPONSE = 7
+ kClientResponse = 7,
+ kHeartBeat = 8
};
+
enum {
kConnectTimeout = 1000 * 20,
};
@@ -50,9 +53,23 @@ private:
SteadyTimerPtr keep_alive_timer_;
ClientStatus status_;
std::string ip_;
+
+public:
+ const std::string &getIp() const;
+
+private:
uint32_t port_;
+
+public:
+ uint32_t getPort() const;
+
+private:
std::string client_info_;
+public:
+ const std::string &getClientInfo() const;
+
+private:
std::shared_ptr<SendBuffer> sendBuffer_;
asio::ip::tcp::endpoint endpoint_;
BlockMemoryPtrT recv_buf_;
@@ -61,6 +78,12 @@ private:
uint64_t last_update_time_;
Stat stat_;
bool exit_;
+ BinaryHB bin_hb_ = {0};
+ std::vector<int32_t> proxy_loads_;
+ uint32_t heart_beat_index_;
+ bool wait_heart_beat_;
+ bool reset_client_;
+ volatile bool only_heart_heat_;
public:
TcpClient(IOContext &io_context, std::string ip, uint32_t port);
@@ -74,9 +97,16 @@ public:
void OnBody(asio::error_code error, size_t bytesTransferred);
void DoClose();
void HandleFail();
- bool isFree() { return (status_ == kFree); };
+ bool isFree() { return (status_ == kFree); }
void write(SendBufferPtrT sendBuffer);
void DetectStatus(const asio::error_code error);
+ void HeartBeat(bool only_heart_heat = false);
+ void SetHeartBeatStatus();
+ void ParseHeartBeat(size_t total_length);
+ void ParseGenericResponse();
+ void UpdateClient(const std::string ip, const uint32_t port);
+ void RestClient();
+ int32_t GetAvgLoad();
};
typedef std::shared_ptr<TcpClient> TcpClientTPtrT;
typedef std::vector<TcpClientTPtrT> TcpClientTPtrVecT;
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h
index d433e3a295..59700a473f 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h
@@ -29,13 +29,17 @@ private:
std::string proxy_str_id_;
std::string ip_;
int32_t port_;
+ int32_t load_;
public:
- ProxyInfo(std::string proxy_str_id, std::string ip, int32_t port)
- : proxy_str_id_(proxy_str_id), ip_(ip), port_(port) {}
+ ProxyInfo(std::string proxy_str_id, std::string ip, int32_t port,int32_t
load)
+ : proxy_str_id_(proxy_str_id), ip_(ip), port_(port), load_(load) {}
ProxyInfo(){};
+ void setIp(const std::string& ip) { ip_ = ip; }
+ void setPort(int32_t port) { port_ = port; }
std::string ip() const { return ip_; }
int32_t port() const { return port_; }
+ int32_t GetLoad() const { return load_; }
};
using ProxyInfoVec = std::vector<ProxyInfo>;
} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index 3620d8985e..91e2fa8ad4 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -78,6 +78,9 @@ void SdkConfig::defaultInit() {
dispatch_interval_zip_ = constants::kDispatchIntervalZip;
tcp_detection_interval_ = constants::kTcpDetectionInterval;
tcp_idle_time_ = constants::kTcpIdleTime;
+ load_balance_interval_ = constants::kLoadBalanceInterval;
+ heart_beat_interval_ = constants::kHeartBeatInterval;
+ enable_balance_ = constants::kEnableBalance;
// cache parameter
send_buf_size_ = constants::kSendBufSize;
@@ -107,6 +110,7 @@ void SdkConfig::defaultInit() {
manager_url_timeout_ = constants::kManagerTimeout;
max_proxy_num_ = constants::kMaxProxyNum;
enable_isolation_ = constants::kEnableIsolation;
+ reserve_proxy_num_ = constants::kReserveProxyNum;
local_ip_ = constants::kSerIP;
local_port_ = constants::kSerPort;
@@ -144,6 +148,24 @@ void SdkConfig::InitThreadParam(const rapidjson::Value
&doc) {
} else {
dispatch_interval_send_ = constants::kDispatchIntervalSend;
}
+
+ if (doc.HasMember("load_balance_interval") &&
+ doc["load_balance_interval"].IsInt() &&
+ doc["load_balance_interval"].GetInt() > 0) {
+ const rapidjson::Value &obj = doc["load_balance_interval"];
+ load_balance_interval_ = obj.GetInt();
+ } else {
+ load_balance_interval_ = constants::kLoadBalanceInterval;
+ }
+
+ if (doc.HasMember("heart_beat_interval") &&
+ doc["heart_beat_interval"].IsInt() &&
+ doc["heart_beat_interval"].GetInt() > 0) {
+ const rapidjson::Value &obj = doc["heart_beat_interval"];
+ heart_beat_interval_ = obj.GetInt();
+ } else {
+ heart_beat_interval_ = constants::kHeartBeatInterval;
+ }
}
void SdkConfig::InitCacheParam(const rapidjson::Value &doc) {
@@ -360,6 +382,14 @@ void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
} else {
tcp_idle_time_ = constants::kTcpIdleTime;
}
+
+ // enable balance
+ if (doc.HasMember("enable_balance") && doc["enable_balance"].IsBool()) {
+ const rapidjson::Value &obj = doc["enable_balance"];
+ enable_balance_ = obj.GetBool();
+ } else {
+ enable_balance_ = constants::kEnableBalance;
+ }
}
void SdkConfig::InitAuthParm(const rapidjson::Value &doc) {
// auth settings
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
index fb91972318..4591953fa0 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
@@ -25,19 +25,26 @@
namespace inlong {
const int kDefaultQueueSize = 20;
-SendGroup::SendGroup(std::string group_id)
- : work_(asio::make_work_guard(io_context_)), group_id_(group_id),
- send_idx_(0) {
+SendGroup::SendGroup(std::string send_group_key)
+ : work_(asio::make_work_guard(io_context_)),
+ send_group_key_(send_group_key), send_idx_(0), dispatch_stat_(0),
+ load_threshold_(0), max_proxy_num_(0) {
max_send_queue_num_ = SdkConfig::getInstance()->send_buf_size_ /
SdkConfig::getInstance()->pack_size_;
if (max_send_queue_num_ <= 0) {
max_send_queue_num_ = kDefaultQueueSize;
}
- LOG_INFO("SendGroup max_send_queue_num " << max_send_queue_num_);
+ LOG_INFO("SendGroup: " << send_group_key_
+ << ", max send queue num: " << max_send_queue_num_);
dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_send_;
- tcp_clients_old_ = nullptr;
- tcp_clients_ = std::make_shared<std::vector<TcpClientTPtrT>>();
- tcp_clients_->reserve(SdkConfig::getInstance()->max_proxy_num_);
+ load_balance_interval_ = SdkConfig::getInstance()->load_balance_interval_;
+ heart_beat_interval_ =
+ SdkConfig::getInstance()->heart_beat_interval_ / dispatch_interval_;
+ need_balance_ = SdkConfig::getInstance()->enable_balance_;
+
+ work_clients_old_ = nullptr;
+ work_clients_ = std::make_shared<std::vector<TcpClientTPtrT>>();
+ work_clients_->reserve(SdkConfig::getInstance()->max_proxy_num_);
send_timer_ = std::make_shared<asio::steady_timer>(io_context_);
send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
@@ -49,13 +56,29 @@ SendGroup::SendGroup(std::string group_id)
update_conf_timer_->async_wait(
std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
- current_proxy_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_);
+ if (SdkConfig::getInstance()->enable_balance_) {
+ load_balance_timer_ = std::make_shared<asio::steady_timer>(io_context_);
+ load_balance_timer_->expires_after(
+ std::chrono::milliseconds(load_balance_interval_));
+ load_balance_timer_->async_wait(
+ std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1));
+ }
+
+ current_bus_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_);
thread_ = std::thread(&SendGroup::Run, this);
}
SendGroup::~SendGroup() {
LOG_INFO("~SendGroup ");
- send_timer_->cancel();
- update_conf_timer_->cancel();
+ if (send_timer_) {
+ send_timer_->cancel();
+ }
+ if (update_conf_timer_) {
+ update_conf_timer_->cancel();
+ }
+ if (load_balance_timer_) {
+ update_conf_timer_->cancel();
+ }
+
io_context_.stop();
if (thread_.joinable()) {
thread_.join();
@@ -76,19 +99,18 @@ void SendGroup::DispatchData(std::error_code error) {
return;
}
try {
- unique_read_lock<read_write_mutex> rdlck(remote_proxy_list_mutex_);
- if (tcp_clients_ != nullptr) {
- if (send_idx_ >= tcp_clients_->size()) {
+ unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+ if (work_clients_ != nullptr) {
+ if (send_idx_ >= work_clients_->size()) {
send_idx_ = 0;
}
-
- while (send_idx_ < tcp_clients_->size()) {
- if ((*tcp_clients_)[send_idx_]->isFree()) {
+ while (send_idx_ < work_clients_->size()) {
+ if ((*work_clients_)[send_idx_]->isFree()) {
SendBufferPtrT send_buf = PopData();
if (send_buf == nullptr) {
break;
}
- (*tcp_clients_)[send_idx_]->write(send_buf);
+ (*work_clients_)[send_idx_]->write(send_buf);
}
send_idx_++;
}
@@ -96,10 +118,33 @@ void SendGroup::DispatchData(std::error_code error) {
} catch (std::exception &e) {
LOG_ERROR("Exception " << e.what());
}
+
+ if (need_balance_ && dispatch_stat_++ > heart_beat_interval_) {
+ HeartBeat();
+ dispatch_stat_ = 0;
+ }
+
send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
send_timer_->async_wait(
std::bind(&SendGroup::DispatchData, this, std::placeholders::_1));
}
+void SendGroup::HeartBeat() {
+ unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+ if (work_clients_ == nullptr) {
+ return;
+ }
+ for (int idx = 0; idx < work_clients_->size(); idx++) {
+ if ((*work_clients_)[idx]->isFree()) {
+ (*work_clients_)[idx]->HeartBeat();
+ } else {
+ (*work_clients_)[idx]->SetHeartBeatStatus();
+ }
+ }
+
+ for (int idx = 0; idx < reserve_clients_.size(); idx++) {
+ reserve_clients_[idx]->HeartBeat(true);
+ }
+}
bool SendGroup::IsFull() { return GetQueueSize() > max_send_queue_num_; }
@@ -120,30 +165,36 @@ void SendGroup::UpdateConf(std::error_code error) {
ClearOldTcpClients();
- ProxyInfoVec new_proxy_info;
- if (ProxyManager::GetInstance()->GetProxy(group_id_, new_proxy_info) !=
+ ProxyInfoVec new_bus_info;
+ if (ProxyManager::GetInstance()->GetProxy(send_group_key_, new_bus_info) !=
kSuccess ||
- new_proxy_info.empty()) {
+ new_bus_info.empty()) {
update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
update_conf_timer_->async_wait(
std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
return;
}
+ if (new_bus_info.empty()) {
+ LOG_INFO("UpdateConf new_bus_info is empty!");
+ return;
+ }
+
+ load_threshold_ = new_bus_info[0].GetLoad() >
constants::kDefaultLoadThreshold
+ ? constants::kDefaultLoadThreshold
+ : std::max((new_bus_info[0].GetLoad()), 0);
- if (!IsConfChanged(current_proxy_vec_, new_proxy_info)) {
- LOG_INFO("Don`t need UpdateConf. current proxy size("
- << current_proxy_vec_.size() << ")=proxy size("
- << new_proxy_info.size() << ")");
+ if (!IsConfChanged(current_bus_vec_, new_bus_info)) {
+ LOG_INFO("Don`t need UpdateConf. current bus size("
+ << current_bus_vec_.size() << ")=bus size(" << new_bus_info.size()
+ << ")");
update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
update_conf_timer_->async_wait(
std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
return;
}
- uint32_t proxy_num = SdkConfig::getInstance()->max_proxy_num_;
- if (proxy_num > new_proxy_info.size()) {
- proxy_num = new_proxy_info.size();
- }
+ max_proxy_num_ =
+ std::min(SdkConfig::getInstance()->max_proxy_num_, new_bus_info.size());
std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_tmp =
std::make_shared<std::vector<TcpClientTPtrT>>();
@@ -155,32 +206,33 @@ void SendGroup::UpdateConf(std::error_code error) {
return;
}
- std::random_shuffle(new_proxy_info.begin(), new_proxy_info.end());
+ std::random_shuffle(new_bus_info.begin(), new_bus_info.end());
- tcp_clients_tmp->reserve(proxy_num);
- for (int i = 0; i < proxy_num; i++) {
- ProxyInfo proxy_tmp = new_proxy_info[i];
- TcpClientTPtrT tcpClientTPtrT = std::make_shared<TcpClient>(
- io_context_, proxy_tmp.ip(), proxy_tmp.port());
+ tcp_clients_tmp->reserve(max_proxy_num_);
+ for (int i = 0; i < max_proxy_num_; i++) {
+ ProxyInfo bus_tmp = new_bus_info[i];
+ TcpClientTPtrT tcpClientTPtrT =
+ std::make_shared<TcpClient>(io_context_, bus_tmp.ip(), bus_tmp.port());
tcp_clients_tmp->push_back(tcpClientTPtrT);
- LOG_INFO("new proxy info.[" << proxy_tmp.ip() << ":" << proxy_tmp.port()
- << "]");
+ LOG_INFO("new bus info.[" << bus_tmp.ip() << ":" << bus_tmp.port() << "]");
}
{
LOG_INFO("do change tcp clients.");
- unique_write_lock<read_write_mutex> wtlck(remote_proxy_list_mutex_);
- tcp_clients_old_ = tcp_clients_;
- tcp_clients_ = tcp_clients_tmp;
+ unique_write_lock<read_write_mutex> wtlck(work_clients_mutex_);
+ work_clients_old_ = work_clients_;
+ work_clients_ = tcp_clients_tmp;
}
- if (tcp_clients_old_ != nullptr) {
- for (int j = 0; j < tcp_clients_old_->size(); j++) {
- (*tcp_clients_old_)[j]->DoClose();
+ if (work_clients_old_ != nullptr) {
+ for (int j = 0; j < work_clients_old_->size(); j++) {
+ (*work_clients_old_)[j]->DoClose();
}
}
- current_proxy_vec_ = new_proxy_info;
+ current_bus_vec_ = new_bus_info;
+
+ InitReserveClient();
update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
update_conf_timer_->async_wait(
@@ -204,25 +256,25 @@ uint32_t SendGroup::GetQueueSize() {
return send_buf_list_.size();
}
-bool SendGroup::IsConfChanged(ProxyInfoVec &current_proxy_vec,
- ProxyInfoVec &new_proxy_vec) {
- if (new_proxy_vec.empty())
+bool SendGroup::IsConfChanged(ProxyInfoVec &current_bus_vec,
+ ProxyInfoVec &new_bus_vec) {
+ if (new_bus_vec.empty())
return false;
- if (current_proxy_vec.size() != new_proxy_vec.size()) {
+ if (current_bus_vec.size() != new_bus_vec.size()) {
return true;
}
- for (auto &current_bu : current_proxy_vec) {
- for (int i = 0; i < new_proxy_vec.size(); i++) {
- if ((current_bu.ip() == new_proxy_vec[i].ip()) &&
- (current_bu.port() == new_proxy_vec[i].port()))
+ for (auto &current_bu : current_bus_vec) {
+ for (int i = 0; i < new_bus_vec.size(); i++) {
+ if ((current_bu.ip() == new_bus_vec[i].ip()) &&
+ (current_bu.port() == new_bus_vec[i].port()))
break;
- if (i == (new_proxy_vec.size() - 1)) {
- if ((current_bu.ip() != new_proxy_vec[i].ip() ||
- current_bu.port() == new_proxy_vec[i].port())) {
- LOG_INFO("current proxy ip." << current_bu.ip() << ":"
- << current_bu.port()
- << " can`t find in proxy.");
+ if (i == (new_bus_vec.size() - 1)) {
+ if ((current_bu.ip() != new_bus_vec[i].ip() ||
+ current_bu.port() == new_bus_vec[i].port())) {
+ LOG_INFO("current bus ip." << current_bu.ip() << ":"
+ << current_bu.port()
+ << " can`t find in bus.");
return true;
}
}
@@ -232,20 +284,188 @@ bool SendGroup::IsConfChanged(ProxyInfoVec
&current_proxy_vec,
}
bool SendGroup::IsAvailable() {
- unique_read_lock<read_write_mutex> rdlck(remote_proxy_list_mutex_);
- if (tcp_clients_ == nullptr) {
+ unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+ if (work_clients_ == nullptr) {
return false;
}
- if (tcp_clients_->empty()) {
+ if (work_clients_->empty()) {
return false;
}
return true;
}
void SendGroup::ClearOldTcpClients() {
- if (tcp_clients_old_ != nullptr) {
- LOG_INFO("ClearOldTcpClients." << tcp_clients_old_->size());
- tcp_clients_old_->clear();
- tcp_clients_old_.reset();
+ if (work_clients_old_ != nullptr) {
+ LOG_INFO("ClearOldTcpClients." << work_clients_old_->size());
+ work_clients_old_->clear();
+ work_clients_old_.reset();
}
}
+
+void SendGroup::LoadBalance(std::error_code error) {
+ if (error) {
+ return;
+ }
+
+ if (NeedDoLoadBalance()) {
+ DoLoadBalance();
+ }
+ uint64_t interval = load_balance_interval_ + rand() % 120 * 1000;
+ LOG_INFO("LoadBalance interval:" << interval);
+ load_balance_timer_->expires_after(std::chrono::milliseconds(interval));
+ load_balance_timer_->async_wait(
+ std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1));
+}
+
+void SendGroup::DoLoadBalance() {
+ if (reserve_clients_.empty()) {
+ return;
+ }
+
+ TcpClientTPtrT work_client = GetMaxLoadClient();
+ TcpClientTPtrT reserve_client = GetReserveClient();
+ if (reserve_client == nullptr || work_client == nullptr) {
+ LOG_ERROR("client nullptr");
+ return;
+ }
+
+ if ((work_client->GetAvgLoad() - reserve_client->GetAvgLoad()) >
+ load_threshold_) {
+ LOG_INFO("DoLoadBalance " << reserve_client->getClientInfo() << "replace"
+ << work_client->getClientInfo() << ",load[work "
+ << work_client->GetAvgLoad() << "][reserve "
+ << reserve_client->GetAvgLoad() << "][threshold "
+ << load_threshold_ << "]");
+ std::string ip = work_client->getIp();
+ uint32_t port = work_client->getPort();
+ work_client->UpdateClient(reserve_client->getIp(),
+ reserve_client->getPort());
+
+ ProxyInfo proxy = GetRandomProxy(ip, port);
+ if (!proxy.ip().empty()) {
+ reserve_client->UpdateClient(proxy.ip(), proxy.port());
+ }
+ }
+
+ need_balance_ = true;
+}
+bool SendGroup::NeedDoLoadBalance() {
+ unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+ if (load_threshold_ <= 0 ||
+ work_clients_->size() == current_bus_vec_.size()) {
+ LOG_INFO("Don`t need DoLoadBalance [load_threshold]:"
+ << load_threshold_
+ << ",[tcp_client size]:" << work_clients_->size()
+ << ",[current_bus_vec size]:" << current_bus_vec_.size());
+ need_balance_ = false;
+ return false;
+ }
+ need_balance_ = true;
+ return true;
+}
+void SendGroup::InitReserveClient() {
+ if (max_proxy_num_ >= current_bus_vec_.size()) {
+ return;
+ }
+ uint64_t max_reserve_num = current_bus_vec_.size() - max_proxy_num_;
+ uint64_t reserve_num =
+ std::min(SdkConfig::getInstance()->reserve_proxy_num_, max_reserve_num);
+ if (reserve_num <= 0) {
+ return;
+ }
+
+ unique_write_lock<read_write_mutex> wtlck(reserve_clients_mutex_);
+ reserve_clients_.clear();
+
+ for (uint64_t i = current_bus_vec_.size() - reserve_num;
+ i < current_bus_vec_.size(); i++) {
+ ProxyInfo bus_tmp = current_bus_vec_[i];
+ TcpClientTPtrT tcpClientTPtrT =
+ std::make_shared<TcpClient>(io_context_, bus_tmp.ip(), bus_tmp.port());
+ reserve_clients_.push_back(tcpClientTPtrT);
+ }
+ LOG_INFO(
+ "InitReserveClient reserve_clients size:" << reserve_clients_.size());
+}
+bool SendGroup::UpSort(const TcpClientTPtrT &begin, const TcpClientTPtrT &end)
{
+ if (begin && end) {
+ return begin->GetAvgLoad() < end->GetAvgLoad();
+ }
+ return false;
+}
+
+TcpClientTPtrT SendGroup::GetMaxLoadClient() {
+ unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+ uint32_t client_index = 0;
+ int32_t max_load = (*work_clients_)[0]->GetAvgLoad();
+ for (int index = 1; index < work_clients_->size(); index++) {
+ int32_t proxy_load = (*work_clients_)[index]->GetAvgLoad();
+ if (proxy_load > max_load) {
+ max_load = proxy_load;
+ client_index = index;
+ }
+ }
+ return (*work_clients_)[client_index];
+}
+
+ProxyInfo SendGroup::GetRandomProxy(const std::string &ip, uint32_t port) {
+ ProxyInfo proxy_info;
+ for (auto &it : current_bus_vec_) {
+ if (it.ip() == ip && it.port() == port) {
+ continue;
+ }
+ bool exist = false;
+ for (int index = 0; index < reserve_clients_.size(); index++) {
+ if (it.ip() == reserve_clients_[index]->getIp() &&
+ it.port() == reserve_clients_[index]->getPort()) {
+ exist = true;
+ break;
+ }
+ }
+ if (exist) {
+ continue;
+ }
+ if (ExistInWorkClient(it.ip(), it.port())) {
+ continue;
+ }
+ proxy_info.setIp(it.ip());
+ proxy_info.setPort(it.port());
+ return proxy_info;
+ }
+ return proxy_info;
+}
+
+TcpClientTPtrT SendGroup::GetReserveClient() {
+ std::sort(reserve_clients_.begin(), reserve_clients_.end(), UpSort);
+ unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+ ProxyInfo proxy_info;
+ for (auto &it : reserve_clients_) {
+ if (it->GetAvgLoad() <= 0) {
+ continue;
+ }
+ bool exist = false;
+ for (int index = 0; index < work_clients_->size(); index++) {
+ if (it->getIp() == (*work_clients_)[index]->getIp() &&
+ it->getPort() == (*work_clients_)[index]->getPort()) {
+ exist = true;
+ break;
+ }
+ }
+ if (exist) {
+ continue;
+ }
+ return it;
+ }
+ return nullptr;
+}
+
+bool SendGroup::ExistInWorkClient(const std::string &ip, uint32_t port) {
+ unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_);
+ for (int index = 0; index < work_clients_->size(); index++) {
+ if (ip == (*work_clients_)[index]->getIp() &&
+ port == (*work_clients_)[index]->getPort()) {
+ return true;
+ }
+ }
+ return false;
+}
} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
index 88dc90771c..a52d63dcd3 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
@@ -39,27 +39,37 @@ private:
io_context_work work_;
std::thread thread_;
void Run();
+ uint64_t max_proxy_num_;
public:
- std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_;
- std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_old_;
+ std::shared_ptr<std::vector<TcpClientTPtrT>> work_clients_;
+ std::shared_ptr<std::vector<TcpClientTPtrT>> work_clients_old_;
+ std::vector<TcpClientTPtrT> reserve_clients_;
- ProxyInfoVec current_proxy_vec_;
- TcpClientTPtrVecItT current_client_;
+ ProxyInfoVec current_bus_vec_;
std::queue<SendBufferPtrT> send_buf_list_;
SteadyTimerPtr send_timer_;
SteadyTimerPtr update_conf_timer_;
+ SteadyTimerPtr load_balance_timer_;
read_write_mutex send_group_mutex_;
- read_write_mutex remote_proxy_list_mutex_;
+ read_write_mutex work_clients_mutex_;
+ read_write_mutex reserve_clients_mutex_;
std::mutex mutex_;
- std::string group_id_;
+ std::string send_group_key_;
std::uint32_t send_idx_;
uint32_t max_send_queue_num_;
- SendGroup(std::string group_id);
+ uint32_t dispatch_interval_;
+ uint32_t heart_beat_interval_;
+ uint32_t load_balance_interval_;
+ uint32_t dispatch_stat_;
+ volatile bool need_balance_;
+ volatile int32_t load_threshold_;
+
+ SendGroup(std::string send_group_key);
~SendGroup();
void PreDispatchData(std::error_code error);
@@ -69,14 +79,24 @@ public:
SendBufferPtrT PopData();
uint32_t GetQueueSize();
void UpdateConf(std::error_code error);
- bool IsConfChanged(ProxyInfoVec &current_proxy_vec,
- ProxyInfoVec &new_proxy_vec);
+ bool IsConfChanged(ProxyInfoVec &current_bus_vec, ProxyInfoVec &new_bus_vec);
bool IsAvailable();
- uint32_t dispatch_interval_;
-
void ClearOldTcpClients();
+
+ // balance
+ bool NeedDoLoadBalance();
+ void LoadBalance(std::error_code error);
+ void DoLoadBalance();
+ void HeartBeat();
+ ProxyInfo GetRandomProxy(const std::string &ip = "", uint32_t port = 0);
+ TcpClientTPtrT GetReserveClient();
+ TcpClientTPtrT GetMaxLoadClient();
+ void InitReserveClient();
+ static bool UpSort(const TcpClientTPtrT &begin, const TcpClientTPtrT &end);
+ bool ExistInWorkClient(const std::string &ip, uint32_t port);
};
using SendGroupPtr = std::shared_ptr<SendGroup>;
+
} // namespace inlong
#endif // INLONG_SDK_SEND_GROUP_H
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index abe33dde21..f2e887d29c 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -181,6 +181,17 @@ int32_t ProxyManager::ParseAndGet(const std::string
&inlong_group_id,
}
groupid_2_cluster_id_update_map_[inlong_group_id] =
clusterInfo["clusterId"].GetInt();
+
+ // check load
+ int32_t load = 0;
+ if (clusterInfo.HasMember("load") && clusterInfo["load"].IsInt() &&
+ !clusterInfo["load"].IsNull()) {
+ const rapidjson::Value &obj = clusterInfo["load"];
+ load = obj.GetInt();
+ } else {
+ load = 0;
+ }
+
// proxy list
for (auto &proxy : nodeList.GetArray()) {
std::string ip;
@@ -212,7 +223,7 @@ int32_t ProxyManager::ParseAndGet(const std::string
&inlong_group_id,
LOG_WARN("there is no id info of inlong_group_id");
continue;
}
- proxy_info_vec.emplace_back(id, ip, port);
+ proxy_info_vec.emplace_back(id, ip, port, load);
}
return SdkCode::kSuccess;
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index b2af191112..eb17f56313 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -43,6 +43,9 @@ static const uint32_t kMaxStreamIdNum = 100;
static const int32_t kDispatchIntervalZip = 8;
static const int32_t kDispatchIntervalSend = 10;
+static const int32_t kLoadBalanceInterval = 300000;
+static const int32_t kHeartBeatInterval = 60000;
+static const bool kEnableBalance = true;
static const bool kEnablePack = true;
static const uint32_t kPackSize = 409600;
@@ -66,6 +69,7 @@ static const char kManagerClusterURL[] =
static const uint32_t kManagerUpdateInterval = 2;
static const uint32_t kManagerTimeout = 5;
static const uint32_t kMaxProxyNum = 8;
+static const uint32_t kReserveProxyNum = 2;
static const bool kEnableTCPNagle = true;
static const uint32_t kTcpIdleTime = 600000;
@@ -91,6 +95,12 @@ static const uint32_t kMaxAttrLen = 2048;
const uint32_t ATTR_LENGTH = 10;
static const bool kEnableIsolation = false;
+static const int32_t kDefaultLoadThreshold = 200;
+const uint32_t MAX_STAT = 10000000;
+static const int32_t kWeight[30] = {1, 1, 1, 1, 1, 2, 2, 2, 2, 2,
+ 3, 3, 3, 3, 3, 6, 6, 6, 6, 6,
+ 12, 12, 12, 12, 12, 48, 96, 192, 384,
1000};
+
} // namespace constants
} // namespace inlong
#endif // INLONG_SDK_CONSTANT_H
\ No newline at end of file