This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 912dce3b2e [INLONG-9378][SDK] Optimize proxy configuration update (#9381)
912dce3b2e is described below
commit 912dce3b2ed886b28b4531d739d8774a3c698c56
Author: doleyzi <[email protected]>
AuthorDate: Fri Dec 1 18:37:13 2023 +0800
[INLONG-9378][SDK] Optimize proxy configuration update (#9381)
---
.../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 197 +++++++++++++--------
.../dataproxy-sdk-cpp/src/manager/proxy_manager.h | 15 +-
.../dataproxy-sdk-cpp/src/utils/capi_constant.h | 5 +-
3 files changed, 140 insertions(+), 77 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index 8c2dfc4c1e..e3771db99e 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -27,6 +27,7 @@
#include <rapidjson/document.h>
namespace inlong {
+const uint64_t MINUTE = 60000;
ProxyManager *ProxyManager::instance_ = new ProxyManager();
ProxyManager::~ProxyManager() {
if (update_conf_thread_.joinable()) {
@@ -41,6 +42,7 @@ ProxyManager::~ProxyManager() {
}
void ProxyManager::Init() {
timeout_ = SdkConfig::getInstance()->manager_url_timeout_;
+ last_update_time_ = Utils::getCurrentMsTime();
if (__sync_bool_compare_and_swap(&inited_, false, true)) {
update_conf_thread_ = std::thread(&ProxyManager::Update, this);
}
@@ -65,8 +67,11 @@ void ProxyManager::Update() {
LOG_INFO("proxylist DoUpdate thread exit");
}
void ProxyManager::DoUpdate() {
- update_mutex_.try_lock();
LOG_INFO("start ProxyManager DoUpdate.");
+ if (!update_mutex_.try_lock()) {
+ LOG_INFO("DoUpdate try_lock. " << getpid());
+ return;
+ }
std::srand(unsigned(std::time(nullptr)));
@@ -76,80 +81,30 @@ void ProxyManager::DoUpdate() {
return;
}
- {
- unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
- for (auto &groupid2cluster : groupid_2_cluster_id_map_) {
- std::string url;
- if (SdkConfig::getInstance()->enable_manager_url_from_cluster_)
- url = SdkConfig::getInstance()->manager_cluster_url_;
- else {
- url = SdkConfig::getInstance()->manager_url_ + "/" +
- groupid2cluster.first;
- }
- std::string post_data = "ip=" + SdkConfig::getInstance()->local_ip_ +
- "&version=" + constants::kVersion +
- "&protocolType=" + constants::kProtocolType;
- LOG_WARN("get inlong_group_id:" << groupid2cluster.first.c_str()
- << "proxy cfg url " << url.c_str()
- << "post_data:" << post_data.c_str());
-
- std::string meta_data;
- int32_t ret;
- std::string urlByDNS;
- for (int i = 0; i < constants::kMaxRequestTDMTimes; i++) {
- HttpRequest request = {url,
- timeout_,
- SdkConfig::getInstance()->need_auth_,
- SdkConfig::getInstance()->auth_id_,
- SdkConfig::getInstance()->auth_key_,
- post_data};
- ret = Utils::requestUrl(meta_data, &request);
- if (!ret) {
- break;
- } // request success
- }
-
- if (ret != SdkCode::kSuccess) {
- if (groupid_2_proxy_map_.find(groupid2cluster.first) != groupid_2_proxy_map_.end()) {
- LOG_WARN("failed to request from manager, use previous " << groupid2cluster.first);
- continue;
- }
- if (!SdkConfig::getInstance()->enable_local_cache_) {
- LOG_WARN("failed to request from manager, forbid local cache!");
- continue;
- }
- meta_data = RecoverFromLocalCache(groupid2cluster.first);
- if (meta_data.empty()) {
- LOG_WARN("local cache is empty!");
- continue;
- }
- }
+ int retry = constants::MAX_RETRY;
+ do {
+ std::unordered_map<std::string, std::string> bid_2_cluster_id =
+ BuildGroupId2ClusterId();
- ProxyInfoVec proxyInfoVec;
- ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec);
- if (ret != SdkCode::kSuccess) {
- LOG_ERROR("failed to parse groupid:%s json proxy list "
- << groupid2cluster.first.c_str());
- continue;
- }
- if (!proxyInfoVec.empty()) {
- unique_write_lock<read_write_mutex> wtlck(groupid_2_proxy_map_rwmutex_);
- groupid_2_proxy_map_[groupid2cluster.first] = proxyInfoVec;
- cache_proxy_info_[groupid2cluster.first] = meta_data;
- LOG_INFO("groupid:" << groupid2cluster.first << " success update "
- << proxyInfoVec.size() << " proxy-ip.");
- }
- }
- }
+ UpdateProxy(bid_2_cluster_id);
- UpdateGroupid2ClusterIdMap();
+ UpdateGroupid2ClusterIdMap();
- UpdateClusterId2ProxyMap();
+ UpdateClusterId2ProxyMap();
+ uint64_t id_count = GetGroupIdCount();
+ if (bid_2_cluster_id.size() == id_count) {
+ break;
+ }
+ LOG_INFO("retry DoUpdate!. update size:" << bid_2_cluster_id.size()
+ << " != latest size:" << id_count);
+ } while (retry--);
if (SdkConfig::getInstance()->enable_local_cache_) {
WriteLocalCache();
}
+ last_update_time_ = Utils::getCurrentMsTime();
+
update_mutex_.unlock();
LOG_INFO("finish ProxyManager DoUpdate.");
}
@@ -254,7 +209,8 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id,
int32_t ProxyManager::GetProxy(const std::string &key,
ProxyInfoVec &proxy_info_vec) {
- if (constants::IsolationLevel::kLevelOne == SdkConfig::getInstance()->isolation_level_) {
+ if (constants::IsolationLevel::kLevelOne ==
+ SdkConfig::getInstance()->isolation_level_) {
return GetProxyByGroupid(key, proxy_info_vec);
}
return GetProxyByClusterId(key, proxy_info_vec);
@@ -360,8 +316,9 @@ void ProxyManager::UpdateGroupid2ClusterIdMap() {
}
}
-void ProxyManager::BuildLocalCache(std::ofstream &file, int32_t groupid_index, const std::string &groupid,
- const std::string &meta_data) {
+void ProxyManager::BuildLocalCache(std::ofstream &file, int32_t groupid_index,
+ const std::string &groupid,
+ const std::string &meta_data) {
file << "[groupid" << groupid_index << "]" << std::endl;
file << "groupid=" << groupid << std::endl;
file << "proxy_cfg=" << meta_data << std::endl;
@@ -393,7 +350,7 @@ void ProxyManager::ReadLocalCache() {
LOG_INFO("read cache file, id:" << groupid << ", local config:" <<
proxy);
cache_proxy_info_[groupid] = proxy;
}
- }catch (...){
+ } catch (...) {
LOG_ERROR("ReadLocalCache error!");
}
}
@@ -441,4 +398,102 @@ std::string ProxyManager::GetClusterID(const std::string &groupid) {
}
return it->second;
}
+
+void ProxyManager::UpdateProxy(
+ std::unordered_map<std::string, std::string> &group_id_2_cluster_id) {
+ for (auto &groupid2cluster : group_id_2_cluster_id) {
+ if (SkipUpdate(groupid2cluster.first)) {
+ LOG_WARN("SkipUpdate group_id:" << groupid2cluster.first);
+ continue;
+ }
+ std::string url;
+ if (SdkConfig::getInstance()->enable_manager_url_from_cluster_)
+ url = SdkConfig::getInstance()->manager_cluster_url_;
+ else {
+ url =
+ SdkConfig::getInstance()->manager_url_ + "/" + groupid2cluster.first;
+ }
+ std::string post_data = "ip=" + SdkConfig::getInstance()->local_ip_ +
+ "&version=" + constants::kVersion +
+ "&protocolType=" + constants::kProtocolType;
+ LOG_WARN("get inlong_group_id:" << groupid2cluster.first.c_str()
+ << "proxy cfg url " << url.c_str()
+ << "post_data:" << post_data.c_str());
+
+ std::string meta_data;
+ int32_t ret;
+ std::string urlByDNS;
+ for (int i = 0; i < constants::kMaxRequestTDMTimes; i++) {
+ HttpRequest request = {url,
+ timeout_,
+ SdkConfig::getInstance()->need_auth_,
+ SdkConfig::getInstance()->auth_id_,
+ SdkConfig::getInstance()->auth_key_,
+ post_data};
+ ret = Utils::requestUrl(meta_data, &request);
+ if (!ret) {
+ break;
+ } // request success
+ }
+
+ if (ret != SdkCode::kSuccess) {
+ if (groupid_2_proxy_map_.find(groupid2cluster.first) !=
+ groupid_2_proxy_map_.end()) {
+ LOG_WARN("failed to request from manager, use previous "
+ << groupid2cluster.first);
+ continue;
+ }
+ if (!SdkConfig::getInstance()->enable_local_cache_) {
+ LOG_WARN("failed to request from manager, forbid local cache!");
+ continue;
+ }
+ meta_data = RecoverFromLocalCache(groupid2cluster.first);
+ if (meta_data.empty()) {
+ LOG_WARN("local cache is empty!");
+ continue;
+ }
+ }
+
+ ProxyInfoVec proxyInfoVec;
+ ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec);
+ if (ret != SdkCode::kSuccess) {
+ LOG_ERROR("failed to parse groupid:%s json proxy list "
+ << groupid2cluster.first.c_str());
+ continue;
+ }
+ if (!proxyInfoVec.empty()) {
+ unique_write_lock<read_write_mutex> wtlck(groupid_2_proxy_map_rwmutex_);
+ groupid_2_proxy_map_[groupid2cluster.first] = proxyInfoVec;
+ cache_proxy_info_[groupid2cluster.first] = meta_data;
+ LOG_INFO("groupid:" << groupid2cluster.first << " success update "
+ << proxyInfoVec.size() << " proxy-ip.");
+ }
+ }
+}
+std::unordered_map<std::string, std::string>
+ProxyManager::BuildGroupId2ClusterId() {
+ std::unordered_map<std::string, std::string> bid_2_cluster_id_map_tmp;
+ unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
+ for (auto &bid2cluster : groupid_2_cluster_id_map_) {
+ bid_2_cluster_id_map_tmp.insert(bid2cluster);
+ }
+ return bid_2_cluster_id_map_tmp;
+}
+
+uint64_t ProxyManager::GetGroupIdCount() {
+ unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
+ return groupid_2_cluster_id_map_.size();
+}
+
+bool ProxyManager::SkipUpdate(const std::string &group_id) {
+ uint64_t current_time = Utils::getCurrentMsTime();
+ uint64_t diff = current_time - last_update_time_;
+ uint64_t threshold =
+ SdkConfig::getInstance()->manager_update_interval_ * MINUTE;
+ bool ret = CheckGroupid(group_id);
+ if (diff < threshold && ret) {
+ return true;
+ }
+ return false;
+}
} // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
index 8ae859869f..f07333c989 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -51,6 +51,7 @@ private:
std::thread update_conf_thread_;
volatile bool inited_ = false;
std::unordered_map<std::string, std::string> cache_proxy_info_;
+ uint64_t last_update_time_;
int32_t ParseAndGet(const std::string &key, const std::string &meta_data,
ProxyInfoVec &proxy_info_vec);
@@ -64,7 +65,8 @@ public:
void DoUpdate();
void Init();
int32_t GetProxy(const std::string &groupid, ProxyInfoVec &proxy_info_vec);
- int32_t GetProxyByGroupid(const std::string &inlong_group_id, ProxyInfoVec &proxy_info_vec);
+ int32_t GetProxyByGroupid(const std::string &inlong_group_id,
+ ProxyInfoVec &proxy_info_vec);
int32_t GetProxyByClusterId(const std::string &cluster_id,
ProxyInfoVec &proxy_info_vec);
std::string GetGroupKey(const std::string &groupid);
@@ -73,11 +75,18 @@ public:
bool CheckClusterId(const std::string &cluster_id);
void UpdateClusterId2ProxyMap();
void UpdateGroupid2ClusterIdMap();
- void BuildLocalCache(std::ofstream &file, int32_t groupid_index, const std::string &groupid, const std::string &meta_data);
+ void BuildLocalCache(std::ofstream &file, int32_t groupid_index,
+ const std::string &groupid,
+ const std::string &meta_data);
void ReadLocalCache();
void WriteLocalCache();
- std::string RecoverFromLocalCache(const std::string&groupid);
+ std::string RecoverFromLocalCache(const std::string &groupid);
std::string GetClusterID(const std::string &groupid);
+ void UpdateProxy(
+ std::unordered_map<std::string, std::string> &group_id_2_cluster_id);
+ std::unordered_map<std::string, std::string> BuildGroupId2ClusterId();
+ uint64_t GetGroupIdCount();
+ bool SkipUpdate(const std::string &group_id);
};
} // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index eaa2821263..1dbebd03db 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -25,9 +25,7 @@
namespace inlong {
namespace constants {
-enum IsolationLevel{
- kLevelOne =1, kLevelSecond =2, kLevelThird =3
-};
+enum IsolationLevel { kLevelOne = 1, kLevelSecond = 2, kLevelThird = 3 };
static const int32_t kMaxRequestTDMTimes = 4;
static const char kAttrFormat[] =
"__addcol1__reptime=yyyymmddHHMMSS&__addcol2_ip=xxx.xxx.xxx.xxx";
@@ -107,6 +105,7 @@ static const int32_t kWeight[30] = {1, 1, 1, 1, 1, 2,
2, 2, 2, 2,
static const char kCacheFile[] = ".proxy_list.ini";
static const char kCacheTmpFile[] = ".proxy_list.ini.tmp";
+const int MAX_RETRY = 10;
} // namespace constants
} // namespace inlong