This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7178b39a3f [INLONG-9355][SDK] Optimize resource isolation for CPP SDK
(#9357)
7178b39a3f is described below
commit 7178b39a3fd30d1af9a26d5259f72b727167c519
Author: doleyzi <[email protected]>
AuthorDate: Wed Nov 29 18:21:10 2023 +0800
[INLONG-9355][SDK] Optimize resource isolation for CPP SDK (#9357)
---
.../dataproxy-sdk-cpp/release/demo/send_demo.cc | 2 +-
.../dataproxy-sdk-cpp/release/inc/sdk_conf.h | 3 +-
.../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 19 +++++-----
.../dataproxy-sdk-cpp/src/group/recv_group.cc | 29 +++++++++++++---
.../dataproxy-sdk-cpp/src/group/recv_group.h | 1 +
.../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 40 +++++++++++-----------
.../dataproxy-sdk-cpp/src/manager/proxy_manager.h | 3 +-
.../dataproxy-sdk-cpp/src/manager/send_manager.cc | 24 ++++++-------
.../dataproxy-sdk-cpp/src/manager/send_manager.h | 1 +
.../dataproxy-sdk-cpp/src/utils/capi_constant.h | 3 ++
10 files changed, 76 insertions(+), 49 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc
index a4a783e0dc..b09f8463f1 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/demo/send_demo.cc
@@ -53,7 +53,7 @@ int main(int argc, char const *argv[]) {
cout << "---->start sdk successfully" << endl;
int count = 1000;
- string inlong_group_id = "test_cpp_sdk_20230404";
+ string inlong_group_id = "test_cpp_sdk";
string inlong_stream_id = "stream1";
if (4 == argc) {
inlong_group_id = argv[2];
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
index 578278c5fd..6d7b23dc21 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
@@ -83,7 +83,7 @@ private:
uint64_t max_proxy_num_;
uint64_t reserve_proxy_num_;
uint32_t msg_type_;
- bool enable_isolation_;
+ uint32_t isolation_level_;
// Network parameters
bool enable_tcp_nagle_;
@@ -92,6 +92,7 @@ private:
bool enable_balance_;
bool enable_local_cache_;
+
// auth settings
bool need_auth_;
std::string auth_id_;
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index e9d679cd8c..319262adc2 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -92,6 +92,7 @@ void SdkConfig::defaultInit() {
load_balance_interval_ = constants::kLoadBalanceInterval;
heart_beat_interval_ = constants::kHeartBeatInterval;
enable_balance_ = constants::kEnableBalance;
+ isolation_level_=constants::IsolationLevel::kLevelSecond;
// cache parameter
send_buf_size_ = constants::kSendBufSize;
@@ -120,7 +121,6 @@ void SdkConfig::defaultInit() {
manager_update_interval_ = constants::kManagerUpdateInterval;
manager_url_timeout_ = constants::kManagerTimeout;
max_proxy_num_ = constants::kMaxProxyNum;
- enable_isolation_ = constants::kEnableIsolation;
reserve_proxy_num_ = constants::kReserveProxyNum;
enable_local_cache_ = constants::kEnableLocalCache;
@@ -360,13 +360,6 @@ void SdkConfig::InitManagerParam(const rapidjson::Value
&doc) {
std::string inlong_group_ids_str = obj.GetString();
Utils::splitOperate(inlong_group_ids_str, inlong_group_ids_, ",");
}
- // enable isolation
- if (doc.HasMember("enable_isolation") && doc["enable_isolation"].IsBool()) {
- const rapidjson::Value &obj = doc["enable_isolation"];
- enable_isolation_ = obj.GetBool();
- } else {
- enable_isolation_ = constants::kEnableIsolation;
- }
// enable local cache
if (doc.HasMember("enable_local_cache") &&
doc["enable_local_cache"].IsBool()) {
@@ -375,6 +368,14 @@ void SdkConfig::InitManagerParam(const rapidjson::Value
&doc) {
} else {
enable_local_cache_ = constants::kEnableLocalCache;
}
+
+ // isolation level
+ if (doc.HasMember("isolation_level") && doc["isolation_level"].IsInt() &&
doc["isolation_level"].GetInt() > 0) {
+ const rapidjson::Value &obj = doc["isolation_level"];
+ isolation_level_ = obj.GetInt();
+ } else {
+ isolation_level_ = constants::IsolationLevel::kLevelSecond;
+ }
}
void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
@@ -564,7 +565,7 @@ void SdkConfig::ShowClientConfig() {
LOG_INFO("auth_key: " << auth_key_.c_str());
LOG_INFO("max_group_id_num: " << max_group_id_num_);
LOG_INFO("max_stream_id_num: " << max_stream_id_num_);
- LOG_INFO("enable_isolation: " << enable_isolation_);
+ LOG_INFO("isolation_level: " << isolation_level_);
}
} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
index 79931bd434..b852095f68 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
@@ -26,11 +26,13 @@
namespace inlong {
const uint32_t DEFAULT_PACK_ATTR = 400;
+const uint64_t LOG_SAMPLE=100;
RecvGroup::RecvGroup(const std::string &group_key,std::shared_ptr<SendManager>
send_manager)
: cur_len_(0), groupId_num_(0), streamId_num_(0),
msg_type_(SdkConfig::getInstance()->msg_type_),
data_capacity_(SdkConfig::getInstance()->buf_size_),
- send_manager_(send_manager),group_key_(group_key) {
+ send_manager_(send_manager),group_key_(group_key),
+ log_stat_(0){
data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
SdkConfig::getInstance()->pack_size_);
data_capacity_ = data_capacity_ + DEFAULT_PACK_ATTR;
@@ -70,22 +72,41 @@ int32_t RecvGroup::DoDispatchMsg() {
last_pack_time_ = Utils::getCurrentMsTime();
std::lock_guard<std::mutex> lck(mutex_);
if (group_key_.empty()) {
- LOG_ERROR("groupId is empty, check!!");
+ if (log_stat_++ > LOG_SAMPLE) {
+ LOG_ERROR("groupId is empty, check!!");
+ log_stat_ = 0;
+ }
return SdkCode::kInvalidInput;
}
if (msgs_.empty()) {
- LOG_ERROR("no msg in msg_set, check!");
+ if (log_stat_++ > LOG_SAMPLE) {
+ LOG_ERROR("no msg in msg_set, check!");
+ log_stat_ = 0;
+ }
return SdkCode::kMsgEmpty;
}
auto send_group = send_manager_->GetSendGroup(group_key_);
if (send_group == nullptr) {
- LOG_ERROR("failed to get send_buf, something gets wrong, checkout!");
+ if (log_stat_++ > LOG_SAMPLE) {
+ LOG_ERROR("failed to get send_buf, something gets wrong, checkout!");
+ log_stat_ = 0;
+ }
return SdkCode::kFailGetSendBuf;
}
if (!send_group->IsAvailable()) {
+ if (log_stat_++ > LOG_SAMPLE) {
+ LOG_ERROR("failed to get send group! group_key:"
+ << group_key_ << " send group is not available!");
+ log_stat_ = 0;
+ }
return SdkCode::kFailGetConn;
}
if (send_group->IsFull()) {
+ if (log_stat_++ > LOG_SAMPLE) {
+ LOG_ERROR("failed to get send group! group_key:"
+ << group_key_ << " send group is full!");
+ log_stat_ = 0;
+ }
return SdkCode::kSendBufferFull;
}
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
index 2d1a9e6315..58d3dc70a3 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
@@ -48,6 +48,7 @@ private:
uint64_t max_recv_size_;
std::string group_key_;
+ uint64_t log_stat_;
int32_t DoDispatchMsg();
void AddMsg(const std::string &msg, std::string client_ip,
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index 1c5f440119..8c2dfc4c1e 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -254,11 +254,10 @@ int32_t ProxyManager::ParseAndGet(const std::string
&inlong_group_id,
int32_t ProxyManager::GetProxy(const std::string &key,
ProxyInfoVec &proxy_info_vec) {
- if (SdkConfig::getInstance()->enable_isolation_) {
+ if (constants::IsolationLevel::kLevelOne ==
SdkConfig::getInstance()->isolation_level_) {
return GetProxyByGroupid(key, proxy_info_vec);
- } else {
- return GetProxyByClusterId(key, proxy_info_vec);
}
+ return GetProxyByClusterId(key, proxy_info_vec);
}
int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id,
@@ -289,13 +288,14 @@ int32_t ProxyManager::CheckBidConf(const std::string
&inlong_group_id,
return SdkCode::kSuccess;
}
-bool ProxyManager::HasProxy(const std::string &inlong_group_id) {
- if (SdkConfig::getInstance()->enable_isolation_) {
- return CheckGroupid(inlong_group_id);
- } else {
- return CheckClusterId(inlong_group_id);
+bool ProxyManager::HasProxy(const std::string &group_key) {
+ if (constants::IsolationLevel::kLevelOne ==
+ SdkConfig::getInstance()->isolation_level_) {
+ return CheckGroupid(group_key);
}
+ return CheckClusterId(group_key);
}
+
int32_t ProxyManager::GetProxyByGroupid(const std::string &inlong_group_id,
ProxyInfoVec &proxy_info_vec) {
unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_);
@@ -319,15 +319,11 @@ int32_t ProxyManager::GetProxyByClusterId(const
std::string &cluster_id,
return SdkCode::kSuccess;
}
std::string ProxyManager::GetGroupKey(const std::string &groupid) {
- if (SdkConfig::getInstance()->enable_isolation_) {
- return groupid;
- }
- unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
- auto it = groupid_2_cluster_id_map_.find(groupid);
- if (it == groupid_2_cluster_id_map_.end()) {
- return "";
+ if (constants::IsolationLevel::kLevelThird ==
+ SdkConfig::getInstance()->isolation_level_) {
+ return GetClusterID(groupid);
}
- return it->second;
+ return groupid;
}
bool ProxyManager::CheckGroupid(const std::string &groupid) {
unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_);
@@ -346,9 +342,6 @@ bool ProxyManager::CheckClusterId(const std::string
&cluster_id) {
return true;
}
void ProxyManager::UpdateClusterId2ProxyMap() {
- if (SdkConfig::getInstance()->enable_isolation_) {
- return;
- }
unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
for (const auto &it : groupid_2_cluster_id_update_map_) {
ProxyInfoVec proxy_info_vec;
@@ -440,5 +433,12 @@ std::string ProxyManager::RecoverFromLocalCache(const
std::string &groupid) {
LOG_INFO("RecoverFromLocalCache:" << groupid << ",local cache:" <<
meta_data);
return meta_data;
}
-
+std::string ProxyManager::GetClusterID(const std::string &groupid) {
+ unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
+ auto it = groupid_2_cluster_id_map_.find(groupid);
+ if (it == groupid_2_cluster_id_map_.end()) {
+ return "";
+ }
+ return it->second;
+}
} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
index 6e1c58ef39..8ae859869f 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -68,7 +68,7 @@ public:
int32_t GetProxyByClusterId(const std::string &cluster_id,
ProxyInfoVec &proxy_info_vec);
std::string GetGroupKey(const std::string &groupid);
- bool HasProxy(const std::string &inlong_group_id);
+ bool HasProxy(const std::string &group_key);
bool CheckGroupid(const std::string &groupid);
bool CheckClusterId(const std::string &cluster_id);
void UpdateClusterId2ProxyMap();
@@ -77,6 +77,7 @@ public:
void ReadLocalCache();
void WriteLocalCache();
std::string RecoverFromLocalCache(const std::string&groupid);
+ std::string GetClusterID(const std::string &groupid);
};
} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
index 6104411b9a..0af18f7ccb 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
@@ -22,22 +22,15 @@
#include "proxy_manager.h"
namespace inlong {
SendManager::SendManager() : send_group_idx_(0) {
- for (int32_t i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size();
- i++) {
- LOG_INFO("SendManager, group_id:"
- << SdkConfig::getInstance()->inlong_group_ids_[i]
- << " send group num:"
- << SdkConfig::getInstance()->per_groupid_thread_nums_);
- std::string send_group_key = ProxyManager::GetInstance()->GetGroupKey(
- SdkConfig::getInstance()->inlong_group_ids_[i]);
- AddSendGroup(send_group_key);
- }
+ LOG_INFO("SendManager,send group num:"
+ << SdkConfig::getInstance()->per_groupid_thread_nums_);
}
SendGroupPtr SendManager::GetSendGroup(const std::string &group_key) {
- SendGroupPtr send_group_ptr = DoGetSendGroup(group_key);
+ std::string send_key= GetSendKey(group_key);
+ SendGroupPtr send_group_ptr = DoGetSendGroup(send_key);
if (send_group_ptr == nullptr) {
- AddSendGroup(group_key);
+ AddSendGroup(send_key);
}
return send_group_ptr;
}
@@ -84,5 +77,10 @@ SendGroupPtr SendManager::DoGetSendGroup(const std::string
&send_group_key) {
}
return send_group_vec[send_group_idx_];
}
-
+std::string SendManager::GetSendKey(const std::string &send_group_key) {
+ if (constants::IsolationLevel::kLevelSecond ==
SdkConfig::getInstance()->isolation_level_) {
+ return ProxyManager::GetInstance()->GetClusterID(send_group_key);
+ }
+ return send_group_key;
+}
} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
index fa627647f2..d6ab24bcbc 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
@@ -33,6 +33,7 @@ private:
std::unordered_map<std::string, std::vector<SendGroupPtr>> send_group_map_;
SendGroupPtr DoGetSendGroup(const std::string &send_group_key);
void DoAddSendGroup(const std::string &send_group_key);
+ std::string GetSendKey(const std::string &send_group_key);
volatile uint32_t send_group_idx_;
public:
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index fe25ee7861..eaa2821263 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -25,6 +25,9 @@
namespace inlong {
namespace constants {
+enum IsolationLevel{
+ kLevelOne =1, kLevelSecond =2, kLevelThird =3
+};
static const int32_t kMaxRequestTDMTimes = 4;
static const char kAttrFormat[] =
"__addcol1__reptime=yyyymmddHHMMSS&__addcol2_ip=xxx.xxx.xxx.xxx";