This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 667cf01cb4 [INLONG-8883][SDK] Optimize proxy config manager framework
for dataproxy cpp sdk (#8884)
667cf01cb4 is described below
commit 667cf01cb42b195e75e58ff9b06a14f967f4760a
Author: doleyzi <[email protected]>
AuthorDate: Mon Sep 11 15:38:24 2023 +0800
[INLONG-8883][SDK] Optimize proxy config manager framework for dataproxy
cpp sdk (#8884)
---
.../dataproxy-sdk-cpp/src/config/proxy_info.h | 41 ++++
.../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 234 +++++++++++++++++++++
.../dataproxy-sdk-cpp/src/manager/proxy_manager.h | 64 ++++++
.../dataproxy-sdk-cpp/src/manager/send_manager.cc | 2 +-
.../src/{uitls => utils}/capi_constant.h | 0
.../src/{uitls => utils}/logger.h | 0
6 files changed, 340 insertions(+), 1 deletion(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h
new file mode 100644
index 0000000000..b5fccb5213
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INLONG_SDK_PROXY_INFO_H
+#define INLONG_SDK_PROXY_INFO_H
+
+#include <cstdint>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace inlong {
+
+/**
+ * Address record of a single proxy node: an identifier string together with
+ * the ip and port it is reachable on.
+ */
+class ProxyInfo {
+private:
+  std::string proxy_str_id_;
+  std::string ip_;
+  // Initialised in-class so a default-constructed ProxyInfo reports port 0
+  // instead of an indeterminate value.
+  int32_t port_ = 0;
+
+public:
+  /**
+   * @param proxy_str_id identifier of the proxy node
+   * @param ip           address of the proxy node
+   * @param port         port of the proxy node
+   */
+  ProxyInfo(std::string proxy_str_id, std::string ip, int32_t port)
+      : proxy_str_id_(std::move(proxy_str_id)), ip_(std::move(ip)),
+        port_(port) {}
+
+  /** Creates an empty record: empty id, empty ip, port 0. */
+  ProxyInfo() = default;
+
+  /** @return a copy of the stored ip string */
+  std::string ip() const { return ip_; }
+
+  /** @return the stored port */
+  int32_t port() const { return port_; }
+};
+
+/** List of proxy nodes. */
+using ProxyInfoVec = std::vector<ProxyInfo>;
+} // namespace inlong
+#endif // INLONG_SDK_PROXY_INFO_H
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
new file mode 100644
index 0000000000..5548a441ba
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proxy_manager.h"
+#include "../utils/capi_constant.h"
+#include "../utils/logger.h"
+#include "../utils/utils.h"
+#include "api_code.h"
+#include <exception>
+#include <fstream>
+#include <mutex>
+#include <rapidjson/document.h>
+#include <string>
+#include <vector>
+
+namespace inlong {
+ProxyManager *ProxyManager::instance_ = new ProxyManager(); // singleton handed out by GetInstance(); no matching delete is visible here
+/**
+ * Asks the background refresh thread to stop and waits for it to finish.
+ *
+ * The exit request must be published and signalled before join(): Update()
+ * only leaves its loop after it wakes up and observes exit_flag_, so joining
+ * first would block forever once the thread is running.
+ */
+ProxyManager::~ProxyManager() {
+  {
+    // Update() reads both flags while holding cond_mutex_, so they are written
+    // under the same mutex.
+    std::lock_guard<std::mutex> con_lck(cond_mutex_);
+    exit_flag_ = true;
+    update_flag_ = true; // satisfies the wait predicate used by Update()
+  }
+  cond_.notify_one();
+
+  if (update_conf_thread_.joinable()) {
+    update_conf_thread_.join();
+  }
+}
+/**
+ * Reads the request timeout from SdkConfig and, on the first call only,
+ * starts the thread that runs Update().
+ */
+void ProxyManager::Init() {
+  timeout_ = SdkConfig::getInstance()->bus_URL_timeout_;
+
+  // The compare-and-swap succeeds only for the caller that flips inited_ from
+  // false to true; every other call returns without touching the thread.
+  const bool first_call = __sync_bool_compare_and_swap(&inited_, false, true);
+  if (!first_call) {
+    return;
+  }
+  update_conf_thread_ = std::thread(&ProxyManager::Update, this);
+}
+
+/**
+ * Body of the background refresh thread.
+ *
+ * Each iteration waits on cond_ until update_flag_ is set or until
+ * SdkConfig's bus_update_interval_ (in minutes) has elapsed, then calls
+ * DoUpdate(). The loop ends as soon as exit_flag_ is observed.
+ */
+void ProxyManager::Update() {
+  while (true) {
+    {
+      std::unique_lock<std::mutex> con_lck(cond_mutex_);
+      const bool signalled = cond_.wait_for(
+          con_lck,
+          std::chrono::minutes(SdkConfig::getInstance()->bus_update_interval_),
+          [this]() { return update_flag_; });
+      // Checked on the signalled and the timed-out path alike, so an exit
+      // request is never followed by another refresh.
+      if (exit_flag_) {
+        break;
+      }
+      if (signalled) {
+        update_flag_ = false;
+      }
+    }
+    // cond_mutex_ is released before refreshing so that threads setting
+    // update_flag_ are not blocked for the duration of DoUpdate().
+    DoUpdate();
+  }
+  LOG_INFO("proxylist DoUpdate thread exit");
+}
+/**
+ * Refreshes the proxy list of every registered group id.
+ *
+ * For each key of groupid_2_cluster_map_ the proxy configuration is requested
+ * through Utils::requestUrl (up to constants::kMaxRequestTDMTimes attempts),
+ * parsed with ParseAndGet and, when the parsed list is not empty, stored in
+ * groupid_2_proxy_map_.
+ *
+ * update_mutex_ is held for the whole call through a lock_guard, so
+ * concurrent callers run one after another and the mutex is released on
+ * every return path.
+ */
+void ProxyManager::DoUpdate() {
+  std::lock_guard<std::mutex> update_lck(update_mutex_);
+  LOG_INFO("start ProxyManager DoUpdate.");
+
+  // Copy the group ids while holding the read lock, then release it: the
+  // requests below may take long and must not keep CheckBidConf() from
+  // taking the write lock.
+  std::vector<std::string> groupids;
+  {
+    unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_rwmutex_);
+    groupids.reserve(groupid_2_cluster_map_.size());
+    for (const auto &groupid2cluster : groupid_2_cluster_map_) {
+      groupids.push_back(groupid2cluster.first);
+    }
+  }
+
+  if (groupids.empty()) {
+    LOG_INFO("empty groupid, no need to DoUpdate buslist");
+    return;
+  }
+
+  const auto config = SdkConfig::getInstance();
+  for (const auto &groupid : groupids) {
+    std::string url;
+    if (config->enable_proxy_URL_from_cluster_) {
+      url = config->proxy_cluster_URL_;
+    } else {
+      url = config->proxy_URL_ + "/" + groupid;
+    }
+    // post_data is only logged here; it is not passed to Utils::requestUrl.
+    const std::string post_data = "ip=" + config->ser_ip_ +
+                                  "&version=" + constants::kTDBusCAPIVersion +
+                                  "&protocolType=" + constants::kProtocolType;
+    LOG_WARN("get inlong_group_id:" << groupid << " proxy cfg url:" << url
+                                    << ", post_data:" << post_data);
+
+    std::string meta_data;
+    std::string urlByDNS;
+    int32_t ret = -1; // stays non-zero when no attempt succeeds
+    for (int i = 0; i < constants::kMaxRequestTDMTimes; i++) {
+      ret = Utils::requestUrl(url, urlByDNS, meta_data, timeout_);
+      if (!ret) {
+        break; // a zero return ends the retry loop
+      }
+    }
+    if (ret) {
+      LOG_ERROR("failed to request proxy list of groupid:" << groupid
+                                                           << " from url:" << url);
+      continue;
+    }
+
+    ProxyInfoVec proxyInfoVec;
+    ret = ParseAndGet(groupid, meta_data, proxyInfoVec);
+    if (ret != SdkCode::kSuccess) {
+      LOG_ERROR("failed to parse groupid:" << groupid << " json proxy list");
+      continue;
+    }
+    if (!proxyInfoVec.empty()) {
+      unique_write_lock<read_write_mutex> wtlck(groupid_2_proxy_map_rwmutex_);
+      groupid_2_proxy_map_[groupid] = proxyInfoVec;
+      LOG_INFO("groupid:" << groupid << " success update "
+                          << proxyInfoVec.size() << " proxy-ip.");
+    }
+  }
+  LOG_INFO("finish ProxyManager DoUpdate.");
+}
+
+/**
+ * Reads a port from a JSON value that is either an integer or a decimal
+ * string. Returns false when the value has another type, cannot be converted,
+ * or lies outside 1..65535.
+ */
+static bool ParseProxyPort(const rapidjson::Value &value, int32_t &port) {
+  if (value.IsInt()) {
+    port = value.GetInt();
+  } else if (value.IsString()) {
+    try {
+      port = std::stoi(value.GetString());
+    } catch (const std::exception &) {
+      // std::stoi throws on non-numeric or out-of-range text; a malformed
+      // entry must not take down the calling thread.
+      return false;
+    }
+  } else {
+    return false;
+  }
+  return port > 0 && port <= 65535;
+}
+
+/**
+ * Parses the proxy configuration returned for one group id.
+ *
+ * The document must be a JSON object with a positive integer "size", an
+ * integer "cluster_id" and an "address" array. Every array element that
+ * provides a string "host", a valid "port" and a non-null "id" is appended to
+ * proxy_info_vec; malformed elements are logged and skipped.
+ *
+ * @param groupid        group id the data belongs to (used for logging)
+ * @param meta_data      JSON text to parse
+ * @param proxy_info_vec receives the parsed proxy nodes (appended)
+ * @return SdkCode::kSuccess, or SdkCode::kErrorParseJson when the document
+ *         cannot be parsed or a required top-level field is missing
+ */
+int32_t ProxyManager::ParseAndGet(const std::string &groupid,
+                                  const std::string &meta_data,
+                                  ProxyInfoVec &proxy_info_vec) {
+  rapidjson::Document doc;
+  if (doc.Parse(meta_data.c_str()).HasParseError()) {
+    LOG_ERROR("failed to parse meta_data, error" << doc.GetParseError() << ":"
+                                                 << doc.GetErrorOffset());
+    return SdkCode::kErrorParseJson;
+  }
+  // HasMember() is only valid on objects; reject any other top-level value.
+  if (!doc.IsObject()) {
+    LOG_ERROR("meta_data of groupid:" << groupid << " is not a json object");
+    return SdkCode::kErrorParseJson;
+  }
+
+  if (!(doc.HasMember("size") && doc["size"].IsInt() &&
+        doc["size"].GetInt() > 0)) {
+    LOG_ERROR("can't find groupid:" << groupid << " buslist from meta_data");
+    return SdkCode::kErrorParseJson;
+  }
+
+  if (!(doc.HasMember("cluster_id") && doc["cluster_id"].IsInt())) {
+    LOG_ERROR("cluster_id of groupid:" << groupid
+                                       << " is not found or not an integer");
+    return SdkCode::kErrorParseJson;
+  }
+
+  // v2 format: the hosts are listed in the "address" array.
+  if (!(doc.HasMember("address") && doc["address"].IsArray())) {
+    LOG_ERROR("there is no any host info of groupid:" << groupid);
+    return SdkCode::kErrorParseJson;
+  }
+
+  for (const auto &info : doc["address"].GetArray()) {
+    if (!info.IsObject() || !info.HasMember("host") ||
+        !info["host"].IsString()) {
+      LOG_ERROR("this host info is null");
+      continue;
+    }
+    const std::string ip = info["host"].GetString();
+
+    int32_t port = 0;
+    if (!info.HasMember("port") || !ParseProxyPort(info["port"], port)) {
+      LOG_ERROR("this port info is null or negative");
+      continue;
+    }
+
+    if (!info.HasMember("id") || info["id"].IsNull()) {
+      LOG_ERROR("there is no id info of groupid");
+      continue;
+    }
+    // An id of any other JSON type leaves the identifier empty.
+    std::string id;
+    if (info["id"].IsString()) {
+      id = info["id"].GetString();
+    } else if (info["id"].IsInt()) {
+      id = std::to_string(info["id"].GetInt());
+    }
+
+    proxy_info_vec.emplace_back(id, ip, port);
+  }
+  return SdkCode::kSuccess;
+}
+
+/**
+ * Copies the cached proxy list of a group id into proxy_info_vec.
+ *
+ * @param groupid        group id to look up
+ * @param proxy_info_vec overwritten with the cached list on success
+ * @return SdkCode::kSuccess, or SdkCode::kFailGetBusConf when no list is
+ *         cached for groupid
+ */
+int32_t ProxyManager::GetBusByBid(const std::string &groupid,
+                                  ProxyInfoVec &proxy_info_vec) {
+  unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_);
+  const auto it = groupid_2_proxy_map_.find(groupid);
+  if (it == groupid_2_proxy_map_.end()) {
+    LOG_ERROR("GetProxyByGroupid failed . Groupid " << groupid);
+    return SdkCode::kFailGetBusConf;
+  }
+  proxy_info_vec = it->second;
+  return SdkCode::kSuccess;
+}
+
+/**
+ * Registers a group id and triggers a refresh of the proxy lists when the id
+ * was not registered before.
+ *
+ * @param groupid   group id to register
+ * @param is_inited true: set update_flag_ and notify cond_;
+ *                  false: call DoUpdate() directly on the calling thread
+ * @return always SdkCode::kSuccess
+ */
+int32_t ProxyManager::CheckBidConf(const std::string &groupid, bool is_inited) {
+  bool already_registered = false;
+  {
+    unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_rwmutex_);
+    already_registered = groupid_2_cluster_map_.count(groupid) != 0;
+  }
+  if (already_registered) {
+    return SdkCode::kSuccess;
+  }
+
+  {
+    // The new group id is stored with -1 as its cluster value.
+    unique_write_lock<read_write_mutex> wtlck(groupid_2_cluster_rwmutex_);
+    groupid_2_cluster_map_.emplace(groupid, -1);
+  }
+
+  LOG_INFO("CheckProxyConf groupid:" << groupid << ",isInited :" << is_inited);
+  if (!is_inited) {
+    DoUpdate();
+    return SdkCode::kSuccess;
+  }
+
+  {
+    std::unique_lock<std::mutex> con_lck(cond_mutex_);
+    update_flag_ = true;
+  }
+  cond_.notify_one();
+  return SdkCode::kSuccess;
+}
+
+/**
+ * @param groupid group id to look up
+ * @return true when groupid_2_proxy_map_ holds an entry for groupid
+ */
+bool ProxyManager::IsBusExist(const std::string &groupid) {
+  unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_);
+  return groupid_2_proxy_map_.find(groupid) != groupid_2_proxy_map_.end();
+}
+} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
new file mode 100644
index 0000000000..1c76f98457
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INLONG_SDK_PROXY_MANAGER_H
+#define INLONG_SDK_PROXY_MANAGER_H
+
+#include "../config/proxy_info.h"
+#include "../utils/read_write_mutex.h"
+#include <condition_variable>
+#include <cstdint>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+namespace inlong {
+/**
+ * Keeps the proxy list of every registered group id and refreshes the lists
+ * from a background thread. A single instance is reachable through
+ * GetInstance().
+ */
+class ProxyManager {
+private:
+  static ProxyManager *instance_;
+  // Timeout passed to Utils::requestUrl; assigned in Init().
+  int32_t timeout_ = 0;
+  read_write_mutex groupid_2_cluster_rwmutex_; // guards groupid_2_cluster_map_
+  read_write_mutex groupid_2_proxy_map_rwmutex_; // guards groupid_2_proxy_map_
+
+  // Registered group ids; CheckBidConf() inserts new ids with the value -1.
+  std::unordered_map<std::string, int32_t> groupid_2_cluster_map_;
+  // Most recently parsed proxy list per group id.
+  std::unordered_map<std::string, ProxyInfoVec> groupid_2_proxy_map_;
+  // Set to request a refresh from the thread running Update().
+  bool update_flag_ = false;
+  std::mutex cond_mutex_; // paired with cond_
+  std::mutex update_mutex_; // taken by DoUpdate()
+
+  std::condition_variable cond_;
+  // Set by the destructor to make Update() leave its loop.
+  bool exit_flag_ = false;
+  std::thread update_conf_thread_;
+  // Flipped once by Init() through a compare-and-swap.
+  volatile bool inited_ = false;
+
+  /**
+   * Parses meta_data (JSON text) and appends the proxy nodes it lists to
+   * bus_info_vec. Returns SdkCode::kSuccess or SdkCode::kErrorParseJson.
+   */
+  int32_t ParseAndGet(const std::string &groupid, const std::string &meta_data,
+                      ProxyInfoVec &bus_info_vec);
+
+public:
+  ProxyManager() = default;
+  ~ProxyManager();
+
+  /** @return the shared instance */
+  static ProxyManager *GetInstance() { return instance_; }
+
+  /**
+   * Registers groupid if it is unknown and triggers a refresh: through the
+   * background thread when is_inited is true, otherwise by calling DoUpdate()
+   * directly.
+   */
+  int32_t CheckBidConf(const std::string &groupid, bool is_inited);
+
+  /** Loop executed by the background thread started in Init(). */
+  void Update();
+
+  /** Requests and stores the proxy list of every registered group id. */
+  void DoUpdate();
+
+  /** Reads the timeout from SdkConfig and starts the background thread once. */
+  void Init();
+
+  /**
+   * Copies the cached proxy list of groupid into proxy_info_vec. Returns
+   * SdkCode::kFailGetBusConf when nothing is cached for groupid.
+   */
+  int32_t GetBusByBid(const std::string &groupid, ProxyInfoVec &proxy_info_vec);
+
+  /** @return true when a proxy list is cached for groupid */
+  bool IsBusExist(const std::string &groupid);
+};
+} // namespace inlong
+
+#endif // INLONG_SDK_PROXY_MANAGER_H
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
index c5c484a2ee..a5cbda53d1 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
@@ -19,7 +19,7 @@
#include "send_manager.h"
#include "../utils/utils.h"
-#include "bus_conf_manager.h"
+#include "proxy_manager.h"
namespace inlong {
SendManager::SendManager() : send_group_idx_(0) {
for (int32_t i = 0; i < SdkConfig::getInstance()->group_ids_.size(); i++) {
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/uitls/capi_constant.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
similarity index 100%
rename from
inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/uitls/capi_constant.h
rename to
inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/uitls/logger.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/logger.h
similarity index 100%
rename from inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/uitls/logger.h
rename to inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/logger.h