This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b8cb51f6a [INLONG-5251][SDK] Add proxy client config and proxylist
manager for DataProxy C++ SDK (#5252)
b8cb51f6a is described below
commit b8cb51f6a2a51c8247c015c19924e7e9441b5fb3
Author: xueyingzhang <[email protected]>
AuthorDate: Fri Jul 29 22:51:20 2022 +0800
[INLONG-5251][SDK] Add proxy client config and proxylist manager for
DataProxy C++ SDK (#5252)
---
.../dataproxy-sdk-cpp/src/base/client_config.cc | 701 +++++++++++++++
.../dataproxy-sdk-cpp/src/base/client_config.h | 113 +++
.../dataproxy-sdk-cpp/src/base/proxylist_config.cc | 990 +++++++++++++++++++++
.../dataproxy-sdk-cpp/src/base/proxylist_config.h | 201 +++++
4 files changed, 2005 insertions(+)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.cc
new file mode 100644
index 000000000..cbebd1372
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.cc
@@ -0,0 +1,701 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "client_config.h"
+
+#include <rapidjson/document.h>
+
+#include "sdk_constant.h"
+#include "logger.h"
+#include "utils.h"
+
+namespace dataproxy_sdk
+{
+ bool ClientConfig::parseConfig()
+ {
+ std::string file_content;
+ if (!Utils::readFile(config_path_, file_content))
+ {
+ return false;
+ }
+
+ rapidjson::Document root;
+
+ if (root.Parse(file_content.c_str()).HasParseError())
+ {
+ LOG_ERROR("failed to parse user config file: %s",
config_path_.c_str());
+ return false;
+ }
+
+ if (!root.HasMember("init-param"))
+ {
+ LOG_ERROR("param is not an object");
+ return false;
+ }
+ const rapidjson::Value &doc = root["init-param"];
+
+ // thread_num
+ if (doc.HasMember("thread_num") && doc["thread_num"].IsInt() &&
doc["thread_num"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["thread_num"];
+
+ thread_nums_ = obj.GetInt();
+ LOG_WARN("thread_num in user config is: %d", thread_nums_);
+ }
+ else
+ {
+ thread_nums_ = constants::kThreadNums;
+ LOG_WARN("thread_num in user config is not expect, then use
default: %d", thread_nums_);
+ }
+ // shared_buf_nums
+ if (doc.HasMember("shared_buf_num") && doc["shared_buf_num"].IsInt()
&& doc["shared_buf_num"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["shared_buf_num"];
+
+ shared_buf_nums_ = obj.GetInt();
+ LOG_WARN("shared_buf_num in user config is: %d",
shared_buf_nums_);
+ }
+ else
+ {
+ shared_buf_nums_ = constants::kSharedBufferNums;
+ LOG_WARN("shared_buf_num in user config is not expect, then use
default: %d", shared_buf_nums_);
+ }
+
+ // inlong_group_ids, split by comma
+ if (doc.HasMember("inlong_group_ids") &&
doc["inlong_group_ids"].IsString())
+ {
+ const rapidjson::Value &obj = doc["inlong_group_ids"];
+ std::string groupIds_str = obj.GetString();
+ int32_t size = Utils::splitOperate(groupIds_str,
inlong_group_ids_, ",");
+ LOG_WARN("inlong_group_ids in user config is: <%s>",
Utils::getVectorStr(inlong_group_ids_).c_str());
+ }
+ else
+ {
+ LOG_WARN("inlong_group_ids in user config is empty");
+ }
+
+ // enable_groupId_isolation
+ if (doc.HasMember("enable_groupId_isolation") &&
doc["enable_groupId_isolation"].IsBool())
+ {
+ const rapidjson::Value &obj = doc["enable_groupId_isolation"];
+
+ enable_groupId_isolation_ = obj.GetBool();
+ LOG_WARN("enable_groupId_isolation in user config is: %s",
enable_groupId_isolation_ ? "true" : "false");
+ }
+ else
+ {
+ enable_groupId_isolation_ = constants::kEnableBidIsolation;
+ LOG_WARN("enable_groupId_isolation in user config is not expect,
then use default: %s", enable_groupId_isolation_ ? "true" : "false");
+ }
+
+ // if enable_groupId_isolation_ is true, groupIds cann't be empty
+ if (enable_groupId_isolation_ && inlong_group_ids_.empty())
+ {
+ LOG_ERROR("inlong_group_ids is empty, check config!");
+ return false;
+ }
+
+ // auth settings
+ if (doc.HasMember("need_auth") && doc["need_auth"].IsBool() &&
doc["need_auth"].GetBool()){
+ if (!doc.HasMember("auth_id") || !doc.HasMember("auth_key") ||
!doc["auth_id"].IsString() || !doc["auth_key"].IsString()
||doc["auth_id"].IsNull() || doc["auth_key"].IsNull())
+ {
+ LOG_ERROR("need_auth, but auth_id or auth_key is empty or not
string type, check config!");
+ return false;
+ }
+ need_auth_ = true;
+ auth_id_ = doc["auth_id"].GetString();
+ auth_key_ = doc["auth_key"].GetString();
+ LOG_WARN("need_auth, auth_id:%s, auth_key:%s", auth_id_.c_str(),
auth_key_.c_str());
+ }
+ else
+ {
+ need_auth_ = constants::kNeedAuth;
+ LOG_WARN("need_auth is not expect, then use default:%s",
need_auth_ ? "true" : "false");
+ }
+
+ // buffer_num_per_groupId
+ if (doc.HasMember("buffer_num_per_groupId") &&
doc["buffer_num_per_groupId"].IsInt() && doc["buffer_num_per_groupId"].GetInt()
> 0)
+ {
+ const rapidjson::Value &obj = doc["buffer_num_per_groupId"];
+
+ buffer_num_per_groupId_ = obj.GetInt();
+ LOG_WARN("buffer_num_per_groupId in user config is: %d",
buffer_num_per_groupId_);
+ }
+ else
+ {
+ buffer_num_per_groupId_ = constants::kBufferNumPerBid;
+ LOG_WARN("buffer_num_per_groupId in user config is not expect,
then use default: %d", buffer_num_per_groupId_);
+ }
+
+ // ser_ip
+ if (doc.HasMember("ser_ip") && doc["ser_ip"].IsString())
+ {
+ const rapidjson::Value &obj = doc["ser_ip"];
+
+ ser_ip_ = obj.GetString();
+ LOG_WARN("ser_ip in user config is: %s", ser_ip_.c_str());
+ }
+ else
+ {
+ ser_ip_ = constants::kSerIP;
+ LOG_WARN("ser_ip in user config is not expect, then use default:
%s", ser_ip_.c_str());
+ }
+ // enable_pack
+ if (doc.HasMember("enable_pack") && doc["enable_pack"].IsBool())
+ {
+ const rapidjson::Value &obj = doc["enable_pack"];
+
+ enable_pack_ = obj.GetBool();
+ LOG_WARN("enable_pack in user config is: %s", enable_pack_ ?
"true" : "false");
+ }
+ else
+ {
+ enable_pack_ = constants::kEnablePack;
+ LOG_WARN("enable_pack in user config is not expect, then use
default: %s", enable_pack_ ? "true" : "false");
+ }
+ // pack_size
+ if (doc.HasMember("pack_size") && doc["pack_size"].IsInt() &&
doc["pack_size"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["pack_size"];
+
+ pack_size_ = obj.GetInt();
+ LOG_WARN("pack_size in user config is: %d", pack_size_);
+ }
+ else
+ {
+ pack_size_ = constants::kPackSize;
+ LOG_WARN("pack_size in user config is not expect, then use
default: %d ms", pack_size_);
+ }
+ // pack_timeout
+ if (doc.HasMember("pack_timeout") && doc["pack_timeout"].IsInt() &&
doc["pack_timeout"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["pack_timeout"];
+
+ pack_timeout_ = obj.GetInt();
+ LOG_WARN("pack_timeout in user config is: %d ms", pack_timeout_);
+ }
+ else
+ {
+ pack_timeout_ = constants::kPackTimeout;
+ LOG_WARN("pack_timeout in user config is not expect, then use
default: %d", pack_timeout_);
+ }
+ // ext_pack_size
+ if (doc.HasMember("ext_pack_size") && doc["ext_pack_size"].IsInt() &&
doc["ext_pack_size"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["ext_pack_size"];
+
+ ext_pack_size_ = obj.GetInt();
+ LOG_WARN("ext_pack_size in user config is: %d", ext_pack_size_);
+ }
+ else
+ {
+ ext_pack_size_ = constants::kExtPackSize;
+ LOG_WARN("ext_pack_size in user config is not expect, then use
default: %d", ext_pack_size_);
+ }
+ // enable_zip
+ if (doc.HasMember("enable_zip") && doc["enable_zip"].IsBool())
+ {
+ const rapidjson::Value &obj = doc["enable_zip"];
+
+ enable_zip_ = obj.GetBool();
+ LOG_WARN("enable_zip in user config is: %s", enable_zip_ ? "true"
: "false");
+ }
+ else
+ {
+ enable_zip_ = constants::kEnablePack;
+ LOG_WARN("enable_zip in user config is not expect, then use
default: %s", enable_zip_ ? "true" : "false");
+ }
+ // min_zip_len
+ if (doc.HasMember("min_zip_len") && doc["min_zip_len"].IsInt() &&
doc["min_zip_len"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["min_zip_len"];
+
+ min_zip_len_ = obj.GetInt();
+ LOG_WARN("min_zip_len in user config is: %d", min_zip_len_);
+ }
+ else
+ {
+ min_zip_len_ = constants::kMinZipLen;
+ LOG_WARN("min_zip_len in user config is not expect, then use
default: %d", min_zip_len_);
+ }
+ // enable_retry
+ if (doc.HasMember("enable_retry") && doc["enable_retry"].IsBool())
+ {
+ const rapidjson::Value &obj = doc["enable_retry"];
+
+ enable_retry_ = obj.GetBool();
+ LOG_WARN("enable_retry in user config is: %s", enable_retry_ ?
"true" : "false");
+ }
+ else
+ {
+ enable_retry_ = constants::kEnableRetry;
+ LOG_WARN("enable_retry in user config is not expect, then use
default: %s", enable_retry_ ? "true" : "false");
+ }
+ // retry_interval
+ if (doc.HasMember("retry_ms") && doc["retry_ms"].IsInt() &&
doc["retry_ms"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["retry_ms"];
+
+ retry_interval_ = obj.GetInt();
+ LOG_WARN("retry_interval in user config is: %d ms",
retry_interval_);
+ }
+ else
+ {
+ retry_interval_ = constants::kRetryInterval;
+ LOG_WARN("retry_interval in user config is not expect, then use
default: %d ms", retry_interval_);
+ }
+ // retry_num
+ if (doc.HasMember("retry_num") && doc["retry_num"].IsInt() &&
doc["retry_num"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["retry_num"];
+
+ retry_num_ = obj.GetInt();
+ LOG_WARN("retry_num in user config is: %d times", retry_num_);
+ }
+ else
+ {
+ retry_num_ = constants::kRetryNum;
+ LOG_WARN("retry_num in user config is not expect, then use
default: %d times", retry_num_);
+ }
+ // log_num
+ if (doc.HasMember("log_num") && doc["log_num"].IsInt() &&
doc["log_num"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["log_num"];
+
+ log_num_ = obj.GetInt();
+ LOG_WARN("log_num in user config is: %d", log_num_);
+ }
+ else
+ {
+ log_num_ = constants::kLogNum;
+ LOG_WARN("log_num in user config is not expect, then use default:
%d", log_num_);
+ }
+ // log_size
+ if (doc.HasMember("log_size") && doc["log_size"].IsInt() &&
doc["log_size"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["log_size"];
+
+ log_size_ = obj.GetInt();
+ LOG_WARN("log_size in user config is: %dM", log_size_);
+ }
+ else
+ {
+ log_size_ = constants::kLogSize;
+ LOG_WARN("log_size in user config is not expect, then use default:
%dM", log_size_);
+ }
+ // log_level
+ if (doc.HasMember("log_level") && doc["log_level"].IsInt() &&
doc["log_level"].GetInt() >= 0 && doc["log_level"].GetInt() <= 4)
+ {
+ const rapidjson::Value &obj = doc["log_level"];
+
+ log_level_ = obj.GetInt();
+ LOG_WARN("log_level in user config is: %d", log_level_);
+ }
+ else
+ {
+ log_level_ = constants::kLogLevel;
+ LOG_WARN("log_level in user config is not expect, then use
default: %d", log_level_);
+ }
+ // log_file_type
+ if (doc.HasMember("log_file_type") && doc["log_file_type"].IsInt() &&
doc["log_file_type"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["log_file_type"];
+
+ log_file_type_ = obj.GetInt();
+ LOG_WARN("log_file_type in user config is: %d", log_file_type_);
+ }
+ else
+ {
+ log_file_type_ = constants::kLogFileType;
+ LOG_WARN("log_file_type in user config is not expect, then use
default: %d", log_file_type_);
+ }
+ // log_path
+ if (doc.HasMember("log_path") && doc["log_path"].IsString())
+ {
+ const rapidjson::Value &obj = doc["log_path"];
+
+ log_path_ = obj.GetString();
+ LOG_WARN("log_path in user config is: %s", log_path_.c_str());
+ }
+ else
+ {
+ log_path_ = constants::kLogPath;
+ LOG_WARN("log_path in user config is not expect, then use default:
%s", log_path_.c_str());
+ }
+ // log_enable_limit
+ if (doc.HasMember("log_enable_limit") &&
doc["log_enable_limit"].IsBool())
+ {
+ const rapidjson::Value &obj = doc["log_enable_limit"];
+
+ log_enable_limit_ = obj.GetBool();
+ LOG_WARN("log_enable_limit in user config is: %s",
log_enable_limit_ ? "true" : "false");
+ }
+ else
+ {
+ log_enable_limit_ = constants::kLogEnableLimit;
+ LOG_WARN("log_enable_limit in user config is not expect, then use
default: %s", log_enable_limit_ ? "true" : "false");
+ }
+ // proxy_URL
+ if (doc.HasMember("proxy_cfg_preurl") &&
doc["proxy_cfg_preurl"].IsString())
+ {
+ const rapidjson::Value &obj = doc["proxy_cfg_preurl"];
+
+ proxy_URL_ = obj.GetString();
+ LOG_WARN("proxy_cfg_preurl in user config is: %s",
proxy_URL_.c_str());
+ }
+ else if (doc.HasMember("bus_cfg_preurl") &&
doc["bus_cfg_preurl"].IsString()) // compatible with internal usage
+ {
+ const rapidjson::Value &obj = doc["bus_cfg_preurl"];
+
+ proxy_URL_ = obj.GetString();
+ LOG_WARN("proxy_cfg_preurl in user config is: %s",
proxy_URL_.c_str());
+ }
+
+ else
+ {
+ proxy_URL_ = constants::kProxyURL;
+ LOG_WARN("proxy_cfg_url in user config is not expect, then use
default: %s", proxy_URL_.c_str());
+ }
+ // proxy_cluster_URL_, only internal usage
+ if (doc.HasMember("bus_cfg_url") && doc["bus_cfg_url"].IsString())
+ {
+ const rapidjson::Value &obj = doc["bus_cfg_url"];
+
+ proxy_cluster_URL_ = obj.GetString();
+ LOG_WARN("proxy_cluster_URL(proxy_cfg_url) in user config is:
%s", proxy_cluster_URL_.c_str());
+ }
+ else
+ {
+ proxy_cluster_URL_ = constants::kBusClusterURL;
+ LOG_WARN("proxy_cluster_URL(proxy_cfg_url) in user config is not
expect, then use default: %s", proxy_cluster_URL_.c_str());
+ }
+ // enable_proxy_URL_from_cluster
+ if (doc.HasMember("enable_proxy_cfg_url") &&
doc["enable_proxy_cfg_url"].IsBool())
+ {
+ const rapidjson::Value &obj = doc["enable_proxy_cfg_url"];
+
+ enable_proxy_URL_from_cluster_ = obj.GetBool();
+ LOG_WARN("enable_proxy_URL_from_cluster in user config is: %s",
enable_proxy_URL_from_cluster_ ? "true" : "false");
+ }
+ else
+ {
+ enable_proxy_URL_from_cluster_ =
constants::kEnableProxyURLFromCluster;
+ LOG_WARN("enable_proxy_URL_from_cluster in user config is not
expect, then use default: %s", enable_proxy_URL_from_cluster_ ? "true" :
"false");
+ }
+ // proxy_update_interval
+ if (doc.HasMember("proxy_update_interval") &&
doc["proxy_update_interval"].IsInt() && doc["proxy_update_interval"].GetInt() >
0)
+ {
+ const rapidjson::Value &obj = doc["proxy_update_interval"];
+
+ proxy_update_interval_ = obj.GetInt();
+ LOG_WARN("proxy_update_interval in user config is: %d minutes",
proxy_update_interval_);
+ }
+ else if (doc.HasMember("bus_update_interval") &&
doc["bus_update_interval"].IsInt() && doc["bus_update_interval"].GetInt() >
0)//internal usage
+ {
+ const rapidjson::Value &obj = doc["bus_update_interval"];
+
+ proxy_update_interval_ = obj.GetInt();
+ LOG_WARN("proxy_update_interval in user config is: %d minutes",
proxy_update_interval_);
+ }
+ else
+ {
+ proxy_update_interval_ = constants::kProxyUpdateInterval;
+ LOG_WARN("proxy_update_interval in user config is not expect, then
use default: %d minutes", proxy_update_interval_);
+ }
+ // proxy_URL_timeout
+ if (doc.HasMember("proxy_url_timeout") &&
doc["proxy_url_timeout"].IsInt() && doc["proxy_url_timeout"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["proxy_url_timeout"];
+
+ proxy_URL_timeout_ = obj.GetInt();
+ LOG_WARN("proxy_url_timeout in user config is: %ds",
proxy_URL_timeout_);
+ }
+ else if (doc.HasMember("bus_url_timeout") &&
doc["bus_url_timeout"].IsInt() && doc["bus_url_timeout"].GetInt() > 0)
//internal usage
+ {
+ const rapidjson::Value &obj = doc["bus_url_timeout"];
+
+ proxy_URL_timeout_ = obj.GetInt();
+ LOG_WARN("proxy_url_timeout in user config is: %ds",
proxy_URL_timeout_);
+ }
+ else
+ {
+ proxy_URL_timeout_ = constants::kProxyURLTimeout;
+ LOG_WARN("proxy_url_timeout in user config is not expect, then use
default: %ds", proxy_URL_timeout_);
+ }
+ // max_active_proxy_num
+ if (doc.HasMember("max_active_proxy") &&
doc["max_active_proxy"].IsInt() && doc["max_active_proxy"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["max_active_proxy"];
+
+ max_active_proxy_num_ = obj.GetInt();
+ LOG_WARN("max_active_proxy in user config is: %d",
max_active_proxy_num_);
+ }
+ else if (doc.HasMember("max_active_bus") &&
doc["max_active_bus"].IsInt() && doc["max_active_bus"].GetInt() > 0) //internal
usage
+ {
+ const rapidjson::Value &obj = doc["max_active_bus"];
+
+ max_active_proxy_num_ = obj.GetInt();
+ LOG_WARN("max_active_proxy in user config is: %d",
max_active_proxy_num_);
+ }
+ else
+ {
+ max_active_proxy_num_ = constants::kMaxActiveProxyNum;
+ LOG_WARN("max_active_proxy in user config is not expect, then use
default: %d", max_active_proxy_num_);
+ }
+ // max_buf_pool_
+ if (doc.HasMember("max_buf_pool") && doc["max_buf_pool"].IsUint() &&
doc["max_buf_pool"].GetUint() > 0)
+ {
+ const rapidjson::Value &obj = doc["max_buf_pool"];
+
+ max_buf_pool_ = obj.GetUint();
+ LOG_WARN("max_buf_pool in user config is: %lu", max_buf_pool_);
+ }
+ else
+ {
+ max_buf_pool_ = constants::kMaxBufPool;
+ LOG_WARN("max_buf_pool in user config is not expect, then use
default: %lu", max_buf_pool_);
+ }
+ // msg_type
+ if (doc.HasMember("msg_type") && doc["msg_type"].IsInt() &&
doc["msg_type"].GetInt() > 0 && doc["msg_type"].GetInt() < 9)
+ {
+ const rapidjson::Value &obj = doc["msg_type"];
+
+ msg_type_ = obj.GetInt();
+ LOG_WARN("msg_type in user config is: %d", msg_type_);
+ }
+ else
+ {
+ msg_type_ = constants::kMsgType;
+ LOG_WARN("max_active_proxy in user config is not expect, then use
default: %d", msg_type_);
+ }
+ // enable_TCP_nagle
+ if (doc.HasMember("enable_tcp_nagle") &&
doc["enable_tcp_nagle"].IsBool())
+ {
+ const rapidjson::Value &obj = doc["enable_tcp_nagle"];
+
+ enable_TCP_nagle_ = obj.GetBool();
+ LOG_WARN("enable_tcp_nagle in user config is: %s",
enable_TCP_nagle_ ? "true" : "false");
+ }
+ else
+ {
+ enable_TCP_nagle_ = constants::kEnableTCPNagle;
+ LOG_WARN("enable_tcp_nagle in user config is not expect, then use
default: %s", enable_TCP_nagle_ ? "true" : "false");
+ }
+ // enable_heart_beat
+ if (doc.HasMember("enable_hb") && doc["enable_hb"].IsBool())
+ {
+ const rapidjson::Value &obj = doc["enable_hb"];
+
+ enable_heart_beat_ = obj.GetBool();
+ LOG_WARN("enable_hb in user config is: %s", enable_heart_beat_ ?
"true" : "false");
+ }
+ else
+ {
+ enable_heart_beat_ = constants::kEnableHeartBeat;
+ LOG_WARN("enable_hb in user config is not expect, then use
default: %s", enable_heart_beat_ ? "true" : "false");
+ }
+
+ // heart_beat_interval
+ if (doc.HasMember("hb_interval") && doc["hb_interval"].IsInt() &&
doc["hb_interval"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["hb_interval"];
+
+ heart_beat_interval_ = obj.GetInt();
+ LOG_WARN("heart_beat_interval in user config is: %ds",
heart_beat_interval_);
+ }
+ else
+ {
+ heart_beat_interval_ = constants::kHeartBeatInterval;
+ LOG_WARN("heart_beat_interval in user config is not expect, then
use default: %ds", heart_beat_interval_);
+ }
+ // enable_setaffinity
+ if (doc.HasMember("enable_setaffinity") &&
doc["enable_setaffinity"].IsBool())
+ {
+ const rapidjson::Value &obj = doc["enable_setaffinity"];
+
+ enable_setaffinity_ = obj.GetBool();
+ LOG_WARN("enable_setaffinity in user config is: %s",
enable_setaffinity_ ? "true" : "false");
+ }
+ else
+ {
+ enable_setaffinity_ = constants::kEnableSetAffinity;
+ LOG_WARN("enable_setaffinity in user config is not expect, then
use default: %s", enable_setaffinity_ ? "true" : "false");
+ }
+ // mask_cpu_affinity
+ if (doc.HasMember("mask_cpuaffinity") &&
doc["mask_cpuaffinity"].IsInt() && doc["mask_cpuaffinity"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["mask_cpuaffinity"];
+
+ mask_cpu_affinity_ = obj.GetInt();
+ LOG_WARN("mask_cpuaffinity in user config is: %d",
mask_cpu_affinity_);
+ }
+ else
+ {
+ mask_cpu_affinity_ = constants::kMaskCPUAffinity;
+ LOG_WARN("mask_cpuaffinity in user config is not expect, then use
default: %d", mask_cpu_affinity_);
+ }
+ // is_from_DC
+ if (doc.HasMember("enable_from_dc") && doc["enable_from_dc"].IsBool())
+ {
+ const rapidjson::Value &obj = doc["enable_from_dc"];
+
+ is_from_DC_ = obj.GetBool();
+ LOG_WARN("enable_from_dc in user config is: %s", is_from_DC_ ?
"true" : "false");
+ }
+ else
+ {
+ is_from_DC_ = constants::kIsFromDC;
+ LOG_WARN("enable_from_dc in user config is not expect, then use
default: %s", is_from_DC_ ? "true" : "false");
+ }
+ // extend_field
+ if (doc.HasMember("extend_field") && doc["extend_field"].IsInt() &&
doc["extend_field"].GetInt() > 0)
+ {
+ const rapidjson::Value &obj = doc["extend_field"];
+
+ extend_field_ = obj.GetInt();
+ LOG_WARN("extend_field in user config is: %d", extend_field_);
+ }
+ else
+ {
+ extend_field_ = constants::kExtendField;
+ LOG_WARN("extend_field in user config is not expect, then use
default: %d", extend_field_);
+ }
+
+ // net_tag
+ if (doc.HasMember("net_tag") && doc["net_tag"].IsString())
+ {
+ const rapidjson::Value &obj = doc["net_tag"];
+
+ net_tag_ = obj.GetString();
+ LOG_WARN("net_tag in user config is: %s", net_tag_.c_str());
+ }
+ else
+ {
+ net_tag_ = constants::kNetTag;
+ LOG_WARN("net_tag in user config is not expect, then use default:
%s", net_tag_.c_str());
+ }
+
+ // set bufNum
+ buf_size_ = ext_pack_size_ + 400;
+ buf_num_ = max_buf_pool_ / (buf_size_);
+ LOG_WARN("sendBuf num of a pool is %d", buf_num_);
+
+ return true;
+ }
+
+ void ClientConfig::defaultInit(){
+ user_config_err_=true;
+
+ thread_nums_=constants::kThreadNums;
+ shared_buf_nums_=constants::kSharedBufferNums;
+ enable_groupId_isolation_=constants::kEnableBidIsolation;
+ buffer_num_per_groupId_=constants::kBufferNumPerBid;
+ net_tag_=constants::kNetTag;
+
+ enable_pack_=constants::kEnablePack;
+ pack_size_=constants::kPackSize;
+ pack_timeout_=constants::kPackTimeout;
+ ext_pack_size_=constants::kExtPackSize;
+
+ enable_zip_=constants::kEnableZip;
+ min_zip_len_=constants::kMinZipLen;
+
+ enable_retry_=constants::kEnableRetry;
+ retry_interval_=constants::kRetryInterval;
+ retry_num_=constants::kRetryNum;
+
+ log_num_=constants::kLogNum;
+ log_size_=constants::kLogSize;
+ log_level_=constants::kLogLevel;
+ log_file_type_=constants::kLogFileType;
+ log_path_=constants::kLogPath;
+ log_enable_limit_=constants::kLogEnableLimit;
+
+ proxy_URL_=constants::kProxyURL;
+ enable_proxy_URL_from_cluster_=constants::kEnableProxyURLFromCluster;
+
+ proxy_cluster_URL_=constants::kBusClusterURL;
+ proxy_update_interval_=constants::kProxyUpdateInterval;
+ proxy_URL_timeout_=constants::kProxyURLTimeout;
+ max_active_proxy_num_=constants::kMaxActiveProxyNum;
+
+ ser_ip_=constants::kSerIP;
+ max_buf_pool_=constants::kMaxBufPool;
+ msg_type_=constants::kMsgType;
+ enable_TCP_nagle_=constants::kEnableTCPNagle;
+ enable_heart_beat_=constants::kEnableHeartBeat;
+ heart_beat_interval_=constants::kHeartBeatInterval;
+ enable_setaffinity_=constants::kEnableSetAffinity;
+ mask_cpu_affinity_=constants::kMaskCPUAffinity;
+ is_from_DC_=constants::kIsFromDC;
+ extend_field_=constants::kExtendField;
+
+ buf_size_ = ext_pack_size_ + 400;
+ buf_num_ = max_buf_pool_ / (buf_size_);
+ }
+
+ void ClientConfig::showClientConfig()
+ {
+ if(user_config_err_){
+ LOG_ERROR("dataproxy_sdk_cpp init user config err, then use
default config values");
+ }
+
+ LOG_WARN("thread_num: %d", thread_nums_);
+ LOG_WARN("shared_buf_num: %d", shared_buf_nums_);
+ LOG_WARN("inlong_group_ids: <%s>",
Utils::getVectorStr(inlong_group_ids_).c_str());
+ LOG_WARN("enable_groupId_isolation: %s", enable_groupId_isolation_ ?
"true" : "fasle");
+ LOG_WARN("buffer_num_per_groupId: %d", buffer_num_per_groupId_);
+ LOG_WARN("ser_ip: %s", ser_ip_.c_str());
+ LOG_WARN("enable_pack: %s", enable_pack_ ? "true" : "false");
+ LOG_WARN("pack_size: %d", pack_size_);
+ LOG_WARN("pack_timeout: %dms", pack_timeout_);
+ LOG_WARN("ext_pack_size: %d", ext_pack_size_);
+ LOG_WARN("enable_zip: %s", enable_zip_ ? "true" : "false");
+ LOG_WARN("min_zip_len: %d", min_zip_len_);
+ LOG_WARN("enable_retry: %s", enable_retry_ ? "true" : "false");
+ LOG_WARN("retry_interval: %dms", retry_interval_);
+ LOG_WARN("retry_num: %d times", retry_num_);
+ LOG_WARN("log_num: %d", log_num_);
+ LOG_WARN("log_size: %dM", log_size_);
+ LOG_WARN("log_level: %d", log_level_);
+ LOG_WARN("log_file_type: %d", log_file_type_);
+ LOG_WARN("log_path: %s", log_path_.c_str());
+ LOG_WARN("log_enable_limit: %s", log_enable_limit_ ? "true" : "false");
+ LOG_WARN("proxy_cfg_preurl: %s", proxy_URL_.c_str());
+ LOG_WARN("net_tag: %s", net_tag_.c_str());
+ LOG_WARN("proxy_cluster_URL(proxy_cfg_url): %s",
proxy_cluster_URL_.c_str());
+ LOG_WARN("enable_proxy_URL_from_cluster: %s",
enable_proxy_URL_from_cluster_ ? "true" : "false");
+ LOG_WARN("proxy_update_interval: %d minutes", proxy_update_interval_);
+ LOG_WARN("proxy_url_timeout: %ds", proxy_URL_timeout_);
+ LOG_WARN("max_active_proxy: %d", max_active_proxy_num_);
+ LOG_WARN("max_buf_pool: %lu", max_buf_pool_);
+ LOG_WARN("msg_type: %d", msg_type_);
+ LOG_WARN("enable_tcp_nagle: %s", enable_TCP_nagle_ ? "true" : "false");
+ LOG_WARN("enable_hb: %s", enable_heart_beat_ ? "true" : "false");
+ LOG_WARN("heart_beat_interval: %ds", heart_beat_interval_);
+ LOG_WARN("enable_setaffinity: %s", enable_setaffinity_ ? "true" :
"false");
+ LOG_WARN("mask_cpuaffinity: %d", mask_cpu_affinity_);
+ LOG_WARN("enable_from_dc: %s", is_from_DC_ ? "true" : "false");
+ LOG_WARN("extend_field: %d", extend_field_);
+ LOG_WARN("sendBuf num of a pool: %d", buf_num_);
+ LOG_WARN("need_auth: %s", need_auth_ ? "true" : "false");
+ LOG_WARN("auth_id: %s", auth_id_.c_str());
+ LOG_WARN("auth_key: %s", auth_key_.c_str());
+ }
+} // namespace dataproxy_sdk
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.h
new file mode 100644
index 000000000..2e1823932
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.h
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_CLIENT_CONFIG_H_
+#define DATAPROXY_SDK_BASE_CLIENT_CONFIG_H_
+
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+namespace dataproxy_sdk
+{
    // Holds every tunable option of the DataProxy C++ SDK client.
    // Values are filled either from the user's json config file
    // (parseConfig()) or from compiled-in defaults (defaultInit()).
    class ClientConfig
    {
      private:
        std::string config_path_;   // path of the user config file, set at construction
        bool user_config_err_=false;   // true when parsing failed and defaults are in use
        int32_t buf_num_;   // sendbuf num of each bufpool, max_buf_pool_/(ext_pack_size_+400)

      public:
        // new parameters
        int32_t thread_nums_;      // network thread nums
        int32_t shared_buf_nums_;  // bufpool nums in shared method  // TODO: cancel this, using buffer_num_per_groupId
        std::vector<std::string> inlong_group_ids_;   // groupId set  // TODO: using set instead of vector?
        bool enable_groupId_isolation_;   // different groupId data are dispatched to different bufpool
        int32_t buffer_num_per_groupId_;  // bufpool num of each groupId
        std::string net_tag_;

        // pack parameters
        bool enable_pack_;
        uint32_t pack_size_;      // byte
        uint32_t pack_timeout_;   // ms
        uint32_t ext_pack_size_;  // byte, max length of msg

        // zip parameters
        bool enable_zip_;
        uint32_t min_zip_len_;    // byte — original comment said "ms", which looks like a copy-paste; TODO confirm

        // resend parameters
        bool enable_retry_;
        uint32_t retry_interval_; // ms, resend interval if not receiving ack
        uint32_t retry_num_;      // resend times

        // log parameters
        uint32_t log_num_;
        uint32_t log_size_;       // M
        uint8_t log_level_;       // trace(4)>debug(3)>info(2)>warn(1)>error(0)
        uint8_t log_file_type_;   // output type: 2->file, 1->console
        std::string log_path_;
        bool log_enable_limit_;

        // proxy parameters
        std::string proxy_URL_;   // URL prefix used to request the proxy list
        bool enable_proxy_URL_from_cluster_;

        std::string proxy_cluster_URL_;
        uint32_t proxy_update_interval_;  // minute
        uint32_t proxy_URL_timeout_;      // second, request proxyList timeout
        uint32_t max_active_proxy_num_;

        // other parameters
        std::string ser_ip_;      // local ip
        uint32_t max_buf_pool_;   // byte, size of single bufpool
        uint32_t msg_type_;
        bool enable_TCP_nagle_;
        bool enable_heart_beat_;
        uint32_t heart_beat_interval_;  // second
        bool enable_setaffinity_;       // cpu setaffinity
        uint32_t mask_cpu_affinity_;    // cpu setaffinity mask
        bool is_from_DC_;               // sng_dc data
        uint16_t extend_field_;         // bit flags, decoded by the inline helpers below

        uint32_t buf_size_;   // ext_pack_size_+400

        // auth settings
        bool need_auth_;
        std::string auth_id_;
        std::string auth_key_;

        // Construct with the json config file path; call parseConfig() afterwards.
        ClientConfig(const std::string config_path) : config_path_(config_path) {}
        bool parseConfig();       // return false if parse failed
        void defaultInit();       // fill every field with compiled-in defaults
        void showClientConfig();  // dump the effective config to the log

        // use char-type groupId/streamId (bit 2 of extend_field_)
        inline bool enableCharBid() const { return (((extend_field_)&0x4) >> 2); }
        // bit 1 of extend_field_ — presumably enables trace-IP attachment; TODO confirm
        inline bool enableTraceIP() const { return (((extend_field_)&0x2) >> 1); }
        // data type msg: datlen|data
        inline bool isNormalDataPackFormat() const { return ((5 == msg_type_) || ((msg_type_ >= 7) && (!(extend_field_ & 0x1)))); }
        // data&attr type msg: datlen|data|attr_len|attr
        inline bool isAttrDataPackFormat() const { return ((6 == msg_type_) || ((msg_type_ >= 7) && (extend_field_ & 0x1))); }

        // number of send buffers per pool, derived in parseConfig()/defaultInit()
        inline int32_t bufNum() const { return buf_num_; }
    };
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_CLIENT_CONFIG_H_
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.cc
new file mode 100644
index 000000000..2f46f2138
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.cc
@@ -0,0 +1,990 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proxylist_config.h"
+
+#include <algorithm>
+#include <chrono>
+#include <cstdlib>
+#include <functional>
+#include <iostream>
+#include <map>
+#include <rapidjson/document.h>
+#include <stdlib.h>
+
+#include "sdk_constant.h"
+#include "sdk_core.h"
+#include "executor_thread_pool.h"
+#include "ini_help.h"
+#include "logger.h"
+#include "pack_queue.h"
+#include "socket_connection.h"
+#include "tc_api.h"
+#include "utils.h"
+namespace dataproxy_sdk
+{
    // All identifiers start "unknown": cluster_id_ and the active/backup proxy
    // counts stay -1 until a proxylist response from the manager is parsed.
    ClusterProxyList::ClusterProxyList()
        : cluster_id_(-1), size_(0), load_(0), is_inter_visit_(false), switch_value_(0), get_flag_(false), msg_num_(0), active_proxy_num_(-1),
          backup_proxy_num_(-1)
    {
    }
+
    // Close every active/backup connection before the list goes away.
    ClusterProxyList::~ClusterProxyList() { clearAllConn(); }
+
+ bool ClusterProxyList::isNeedLoadBalance()
+ {
+ // #if 0
+ if (!g_config->enable_heart_beat_ || !g_config->heart_beat_interval_)
+ {
+ return false;
+ }
+ if (active_proxy_num_ < 0)
+ {
+ LOG_ERROR("active_proxy_num_:%d is negative", active_proxy_num_);
+ }
+ if (load_ <= 0 || backup_proxy_num_ == 0)
+ {
+ return false;
+ }
+ // #endif
+ return true;
+ }
+
+ void ClusterProxyList::addBusAddress(const ProxyInfoPtr &proxy_info)
+ {
+ proxylist_.push_back(proxy_info);
+ unused_proxylist_[proxy_info->getString()] = proxy_info;
+ }
+
+ void ClusterProxyList::initUnusedBus()
+ {
+ for (auto it : proxylist_)
+ {
+ unused_proxylist_.emplace(it->getString(), it);
+ }
+ }
+
    // Decide how many proxies serve traffic (active) and how many stand by
    // (backup). active = min(configured max, available); backup is capped by
    // kBackupBusNum and by what is left after the active set, and is forced to
    // 0 when load balancing is disabled for this cluster.
    void ClusterProxyList::setActiveAndBackupBusNum(const int32_t &default_set)
    {
        active_proxy_num_ = std::min(default_set, size_);
        LOG_INFO("active proxy num in config is %d, real active proxy num is %d, available proxy num is %d", default_set, active_proxy_num_, size_);
        backup_proxy_num_ = std::max(0, std::min(size_ - active_proxy_num_ - 1, constants::kBackupBusNum));
        if (isNeedLoadBalance())
        {
            LOG_INFO("cluster(id:%d) need balance, backup proxy num is %d", cluster_id_, backup_proxy_num_);
        }
        else
        {
            // no balancing: backup_proxy_num_ is zeroed before being logged below
            backup_proxy_num_ = 0;
            LOG_INFO("cluster(id:%d) don't do balance, cluster load:%d, left_avaliable proxy num:%d", cluster_id_, load_, backup_proxy_num_);
        }
    }
+
+ int32_t ClusterProxyList::initConn()
+ {
+ unique_write_lock<read_write_mutex> rdlck(rwmutex_);
+ return createConnSet();
+ }
+
+ int32_t ClusterProxyList::createConnSet()
+ {
+ int err_count = 0;
+ for (int i = 0; i < active_proxy_num_; i++)
+ {
+ auto res = createRandomConnForActiveSet();
+ if (!res)
+ {
+ ++err_count;
+
+ }
+ LOG_INFO("add conn%s in active_proxy_set",
res->getRemoteInfo().c_str());
+ }
+
+ if (isNeedLoadBalance())
+ {
+ for (int i = 0; i < backup_proxy_num_; i++)
+ {
+ auto res = createRandomConnForBackupSet();
+ if (!res)
+ {
+ ++err_count;
+ }
+ LOG_INFO("add conn%s in backup_proxy_set",
res->getRemoteInfo().c_str());
+ }
+ }
+ return err_count;
+ }
+
+ int32_t ClusterProxyList::clearAllConn()
+ {
+ unique_write_lock<read_write_mutex> rdlck(rwmutex_);
+
+ for (auto &it : active_proxy_set_)
+ {
+ if (it.second)
+ {
+ it.second->connClose();
+ }
+ }
+ active_proxy_set_.clear();
+ for (auto &it : backup_proxy_set_)
+ {
+ if (it.second)
+ {
+ it.second->connClose();
+ }
+ }
+ backup_proxy_set_.clear();
+ }
+
    // Pick a connection for the next send: compare a round-robin candidate with
    // a random candidate and take the one with the smaller send backlog. If the
    // chosen connection turned out to be closed, replace it with a freshly
    // created one and recycle its proxy into the unused pool.
    ConnectionPtr ClusterProxyList::getSendConn()
    {
        // exclusive lock: the closed-connection path below mutates the sets.
        // FIXME: a read lock might suffice for the common (healthy) path.
        unique_write_lock<read_write_mutex> wtlck(rwmutex_);

        if (active_proxy_set_.empty())
        {
            LOG_ERROR("cluster(id:%d) active_proxy_set is empty", cluster_id_);
            return nullptr;
        }

        ConnectionPtr res = nullptr;

        // round-robin candidate driven by the cumulative message counter
        ++msg_num_;
        int32_t rr_idx = msg_num_ % active_proxy_set_.size();
        auto rr_con = std::next(std::begin(active_proxy_set_), rr_idx)->second;

        // random candidate
        // NOTE(review): srand() seeds rand(), not random() — the reseed here has
        // no effect on the random() call below; confirm intended RNG.
        srand((uint32_t)Utils::getCurrentMsTime());
        int32_t rand_idx = random() % active_proxy_set_.size();
        auto rand_con = std::next(std::begin(active_proxy_set_), rand_idx)->second;

        if (rr_con->getWaitingSend() < rand_con->getWaitingSend()) // choosing less waiting conn
        {
            res = rr_con;
        }
        else
        {
            res = rand_con;
        }

        if (res->isStop())
        {
            // the selected connection died: evict it and create a replacement
            LOG_INFO("conn %s is closed, create new conn for send", res->getRemoteInfo().c_str());
            active_proxy_set_.erase(res->getRemoteInfo());

            auto old_proxyinfo = res->getBusInfo();

            if (unused_proxylist_.empty()) // if there is no available proxy
            {
                // recycle first so the creation below has at least one candidate
                unused_proxylist_[old_proxyinfo->getString()] = old_proxyinfo;

                res = createRandomConnForActiveSet();
            }
            else
            {
                // recycle after creation so we do not immediately re-pick the dead proxy
                res = createRandomConnForActiveSet();

                unused_proxylist_[old_proxyinfo->getString()] = old_proxyinfo;
            }
        }

        return res;
    }
+
    // Create one random connection and register it in the active set.
    ConnectionPtr ClusterProxyList::createRandomConnForActiveSet() { return createRandomConn(active_proxy_set_); }
+
    // Create one random connection and register it in the backup set.
    ConnectionPtr ClusterProxyList::createRandomConnForBackupSet() { return createRandomConn(backup_proxy_set_); }
+
    // Pick a random proxy from the unused pool, open an async connection to it
    // on the next executor thread, move it into conn_set and out of the pool.
    // Returns nullptr when no executor or no unused proxy is available.
    // Caller must hold the write lock.
    ConnectionPtr ClusterProxyList::createRandomConn(std::unordered_map<std::string, ConnectionPtr> &conn_set)
    {
        if (!g_executors)
            return nullptr;
        if (unused_proxylist_.empty())
        {
            // every proxy disconnected within a short window: purge closed
            // connections first so their proxy infos return to the unused pool
            clearInvalidConns();
        }
        if (unused_proxylist_.empty())
        {
            LOG_ERROR("all proxyes in proxylist have already established connections, something is wrong!");
            return nullptr;
        }

        // NOTE(review): srand() seeds rand(), not random(); the reseed below is
        // ineffective for random() — confirm intended RNG.
        srand((uint32_t)Utils::getCurrentMsTime());
        int32_t rand_idx = random() % unused_proxylist_.size();
        auto proxy_info = std::next(std::begin(unused_proxylist_), rand_idx)->second;
        // create conn
        auto executor = g_executors->nextExecutor();
        if(!executor){
            return nullptr;
        }
        auto res = std::make_shared<Connection>(executor, proxy_info);
        if (!res)
        {
            // defensive only: make_shared throws rather than returning null
            LOG_ERROR("failed to create new connection %s", proxy_info->getString().c_str());
            return nullptr;
        }
        LOG_DEBUG("create new connection: post %s connect request successfully on network_thread(id:%d)", proxy_info->getString().c_str(),
                  executor->threadId());
        conn_set[proxy_info->getString()] = res;
        unused_proxylist_.erase(proxy_info->getString());
        return res;
    }
+
+ // if switch_value_, cluster_id_ or size_ changes, return true
+ bool ClusterProxyList::enableUpdate(const ClusterProxyListPtr &other)
+ {
+ unique_read_lock<read_write_mutex> rdlck(rwmutex_);
+
+ if (this->switch_value_ != other->switchValue())
+ {
+ LOG_INFO("proxy ip switch_value is diff, new:%d, old:%d",
other->switchValue(), this->switch_value_);
+ return true;
+ }
+ if (this->cluster_id_ != other->clusterId())
+ {
+ LOG_INFO("proxy ip cluster_id is diff, new:%d, old:%d",
other->clusterId(), this->cluster_id_);
+ return true;
+ }
+ if (this->size_ != other->size())
+ {
+ LOG_INFO("proxy ip size is diff, new:%d, old:%d", other->size(),
this->size_);
+ return true;
+ }
+ return false;
+ }
+
    // Sweep both connection sets, dropping stopped connections and returning
    // their proxy infos to the unused pool. Uses the erase-returns-next-iterator
    // idiom so the maps stay valid while being modified.
    // Caller must hold the write lock.
    void ClusterProxyList::clearInvalidConns()
    {

        // remove invalid conn in active set
        for (auto it = active_proxy_set_.begin(); it != active_proxy_set_.end();)
        {
            if (it->second->isStop())
            {
                LOG_INFO("active_proxy_set remove stop conn:%s", it->second->getRemoteInfo().c_str());
                unused_proxylist_.emplace(it->second->getRemoteInfo(), it->second->getBusInfo()); // close and add this proxyinfo into unused
                it = active_proxy_set_.erase(it);
                continue;
            }
            ++it;
        }
        // remove invalid conn in backup set
        for (auto it = backup_proxy_set_.begin(); it != backup_proxy_set_.end();)
        {
            if (it->second->isStop())
            {
                LOG_INFO("backup_proxy_set_ remove stop conn:%s", it->second->getRemoteInfo().c_str());
                unused_proxylist_.emplace(it->second->getRemoteInfo(), it->second->getBusInfo()); // close and add this proxyinfo into unused
                it = backup_proxy_set_.erase(it);
                continue;
            }
            ++it;
        }
    }
+
+ void ClusterProxyList::keepConnsAlive()
+ {
+ unique_read_lock<read_write_mutex> rdlck(rwmutex_);
+ for (auto it : active_proxy_set_)
+ {
+ it.second->sendHB(isNeedLoadBalance());
+ }
+ for (auto it : backup_proxy_set_)
+ {
+ it.second->sendHB(isNeedLoadBalance());
+ }
+ }
+
+ void ClusterProxyList::balanceConns()
+ {
+ std::map<std::string, int32_t> active_load;
+ std::map<std::string, int32_t> backup_load;
+ unique_write_lock<read_write_mutex> wtlck(rwmutex_);
+
+ if (!isNeedLoadBalance() || backup_proxy_set_.empty())
+ return; //无需进行balance
+
+ for (auto &it : active_proxy_set_)
+ {
+ int32_t avg_load = it.second->getAvgLoad();
+ if (avg_load >= 0)
+ {
+ active_load[it.first] = avg_load;
+ LOG_DEBUG("active conn:%s, avgLoad:%d", it.first.c_str(),
avg_load);
+ }
+ }
+ for (auto &it : backup_proxy_set_)
+ {
+ int32_t avg_load = it.second->getAvgLoad();
+ if (avg_load >= 0)
+ {
+ backup_load[it.first] = avg_load;
+ LOG_DEBUG("backup conn:%s, avgLoad:%d", it.first.c_str(),
avg_load);
+ }
+ }
+
+ // avg_load desc sort
+ std::vector<PAIR> active_list(active_load.begin(), active_load.end());
+ std::sort(active_list.begin(), active_list.end(),
&Utils::downValueSort);
+
+ // avg_load asec sort
+ std::vector<PAIR> backup_list(backup_load.begin(), backup_load.end());
+ std::sort(backup_list.begin(), backup_list.end(), &Utils::upValueSort);
+
+ // int32_t small_size = active_list.size() < backup_list.size() ?
active_list : backup_list;
+ int32_t small_size = 1;
+ for (int32_t i = 0; i < small_size; i++)
+ {
+ if (active_list.empty() || backup_list.empty())
+ break;
+
+ if (active_list[i].second - backup_list[i].second >= load_) // do
switch
+ {
+ LOG_INFO("do balance, active conn:%s, load:%d <--> backup
conn:%s, load:%d", active_list[i].first.c_str(), active_list[i].second,
+ backup_list[i].first.c_str(), backup_list[i].second);
+
+ ConnectionPtr &tmp = active_proxy_set_[active_list[i].first];
+ active_proxy_set_.erase(active_list[i].first);
+ active_proxy_set_[backup_list[i].first] =
backup_proxy_set_[backup_list[i].first];
+ backup_proxy_set_.erase(backup_list[i].first);
+ backup_proxy_set_[tmp->getRemoteInfo()] = tmp;
+ }
+ }
+ }
+
+ void ClusterProxyList::updateBackupConns()
+ {
+ unique_write_lock<read_write_mutex> wtlck(rwmutex_);
+
+ if (!isNeedLoadBalance())
+ return;
+
+ // update backup conns
+ for (auto it : backup_proxy_set_)
+ {
+ if (it.second)
+ {
+ LOG_DEBUG("update backup_conns regularly, close old conns and
then create new conns");
+ it.second->connClose();
+ unused_proxylist_.emplace(it.second->getRemoteInfo(),
it.second->getBusInfo()); //lose and add this proxyinfo into unused
+ }
+ }
+ backup_proxy_set_.clear();
+ for (int i = 0; i < backup_proxy_num_; i++)
+ {
+ auto res = createRandomConnForBackupSet();
+ if (!res)
+ {
+ LOG_ERROR("create backup conn error, check it");
+ continue;
+ }
+ LOG_DEBUG("new create conn%s in backup_proxy_set",
res->getRemoteInfo().c_str());
+ }
+ }
+
    // Start the shared timer thread and arm every periodic maintenance timer:
    // dead-connection sweeping, (optional) heart-beats, load balancing,
    // ack-statistics printing and backup rotation. The keep-alive timer is only
    // created when heart-beats are enabled in the client config.
    GlobalCluster::GlobalCluster()
        : groupid2cluster_rwmutex_(), update_flag_(false), cond_mutex_(), cond_(), exit_flag_(false), timer_worker_(std::make_shared<ExecutorThread>(100)),
          clear_timer_(timer_worker_->createSteadyTimer()), doBalance_timer_(timer_worker_->createSteadyTimer()),
          updateBackup_timer_(timer_worker_->createSteadyTimer()), printAckNum_timer_(timer_worker_->createSteadyTimer())
    {
        clear_timer_->expires_after(std::chrono::seconds(kClearTimerSecond));
        clear_timer_->async_wait([this](const std::error_code &ec) { clearInvalidConn(ec); });

        // whether need do balance
        if (g_config->enable_heart_beat_ && g_config->heart_beat_interval_ > 0)
        {
            keepAlive_timer_ = timer_worker_->createSteadyTimer();
            keepAlive_timer_->expires_after(std::chrono::seconds(g_config->heart_beat_interval_));
            keepAlive_timer_->async_wait([this](const std::error_code &ec) { keepConnAlive(ec); });
        }

        doBalance_timer_->expires_after(std::chrono::minutes(kDoBalanceMin));
        doBalance_timer_->async_wait([this](const std::error_code &ec) { doBalance(ec); });

        printAckNum_timer_->expires_after(std::chrono::minutes(kPrintAckNumMin));
        printAckNum_timer_->async_wait([this](const std::error_code &ec) { printAckNum(ec); });

        // +3s offset staggers the first rotation away from the sweep timer
        updateBackup_timer_->expires_after(std::chrono::seconds(kUpdateBackupSecond + 3));
        updateBackup_timer_->async_wait([this](const std::error_code &ec) { updateBackup(ec); });
    }
+
+ GlobalCluster::~GlobalCluster()
+ {
+ // FIXME:need other close work?
+ timer_worker_->close();
+
+ closeBuslistUpdate();
+ if (worker_.joinable())
+ {
+ worker_.join();
+ }
+ }
+
+ void GlobalCluster::closeBuslistUpdate()
+ {
+ exit_flag_ = true;
+
+ std::unique_lock<std::mutex> con_lck(cond_mutex_);
+ update_flag_ = true;
+ con_lck.unlock();
+ cond_.notify_one();
+ }
+
+ int32_t GlobalCluster::initBuslistAndCreateConns()
+ {
+ for (auto &inlong_group_id : g_config->inlong_group_ids_)
+ {
+ groupid2cluster_map_[inlong_group_id] = -1;
+ }
+
+ doUpdate();
+
+ return 0;
+ }
+
    // Launch the background proxylist-update thread (joined in the destructor).
    void GlobalCluster::startUpdateSubroutine() { worker_ = std::thread(&GlobalCluster::updateSubroutine, this); }
+
+ //FIXME: improve, using getconn err num, if it exceeds a limit, return
errcode when user send next
+ ConnectionPtr GlobalCluster::getSendConn(const std::string
&inlong_group_id)
+ {
+ unique_read_lock<read_write_mutex> rdlck1(groupid2cluster_rwmutex_);
+ auto it1 = groupid2cluster_map_.find(inlong_group_id);
+ if (it1 == groupid2cluster_map_.end())
+ {
+ LOG_ERROR("there is no proxylist and connection for
inlong_group_id:%s , please check inlong_group_id/url or retry later",
inlong_group_id.c_str());
+ return nullptr;
+ }
+
+ unique_read_lock<read_write_mutex> rdlck2(cluster_set_rwmutex_);
+ auto it2 = cluster_set_.find(it1->second);
+ if (it2 == cluster_set_.end())
+ {
+ LOG_ERROR("there is no cluster(id:%d) for inlong_group_id:%s in
cluster_set, please check inlong_group_id/url or retry later ", it1->second,
inlong_group_id.c_str());
+ return nullptr;
+ }
+
+ return it2->second->getSendConn();
+ }
+
    // Create one extra active connection for this group id's cluster, used when
    // a sender lost its connection. Returns nullptr while the SDK is shutting
    // down or when the group id / cluster cannot be resolved.
    // NOTE(review): pool_id is currently unused — confirm whether it is reserved
    // for future per-pool routing.
    ConnectionPtr GlobalCluster::createActiveConn(const std::string &inlong_group_id, int32_t pool_id)
    {
        if (1 == user_exit_flag.get()) // if user is closing sdk
        {
            return nullptr;
        }

        unique_read_lock<read_write_mutex> rdlck1(groupid2cluster_rwmutex_);
        auto it1 = groupid2cluster_map_.find(inlong_group_id); // inlong_group_id->cluster_id
        if (it1 == groupid2cluster_map_.end()) // all proxy conn are broken
        {
            LOG_ERROR("there is no proxylist or avaliable connection for inlong_group_id:%s , please check inlong_group_id/url or retry later", inlong_group_id.c_str());
            return nullptr;
        }
        unique_read_lock<read_write_mutex> rdlck2(cluster_set_rwmutex_);
        auto it2 = cluster_set_.find(it1->second); // cluster_id->proxylist
        if (it2 == cluster_set_.end())
        {
            LOG_ERROR("there is no cluster(id:%d) for inlong_group_id:%s in cluster_set, please check inlong_group_id/url or retry later", it1->second, inlong_group_id.c_str());
            return nullptr;
        }

        // exclusive lock on the cluster: the creation mutates its conn sets
        unique_write_lock<read_write_mutex> wtlck(it2->second->rwmutex_);

        auto res = it2->second->createRandomConnForActiveSet();
        return res;
    }
+
    // Body of the proxylist-update thread. Wakes either when addBuslist()
    // signals a new group id (update_flag_) or when the configured refresh
    // interval elapses; exit_flag_ (set by closeBuslistUpdate) ends the loop.
    void GlobalCluster::updateSubroutine()
    {
        LOG_INFO("proxylist update thread start");

        while (true)
        {
            std::unique_lock<std::mutex> con_lck(cond_mutex_);
            if (cond_.wait_for(con_lck, std::chrono::minutes(g_config->proxy_update_interval_), [this]() { return update_flag_; }))
            {
                // woken explicitly: either shutdown or a freshly added group id
                if (exit_flag_)
                    break;
                update_flag_ = false;
                con_lck.unlock();
                LOG_DEBUG("new inlong_group_id is added, update proxylist");
                doUpdate(); // FIXME:improve, only update new groupid
            }
            else
            {
                // timeout path: periodic refresh
                LOG_INFO("proxy update interval is %d mins, update proxylist", g_config->proxy_update_interval_);
                doUpdate();
            }
        }
        LOG_INFO("proxylist update thread exit");
    }
+
+ // add inlong_group_id's proxylist into groupid2cluster_map_, if it is
new, trigger updating groupid2cluster_map and cluster_set
+ int32_t GlobalCluster::addBuslist(const std::string &inlong_group_id)
+ {
+ {
+ unique_read_lock<read_write_mutex> rdlck(groupid2cluster_rwmutex_);
+ auto it = groupid2cluster_map_.find(inlong_group_id);
+ if (it != groupid2cluster_map_.end())
+ {
+ return 0;
+ }
+ }
+ //not exist, add
+ {
+ unique_write_lock<read_write_mutex>
wtlck(groupid2cluster_rwmutex_);
+ groupid2cluster_map_.emplace(inlong_group_id, -1);
+ }
+
+ //set proxy update notification
+ std::unique_lock<std::mutex> con_lck(cond_mutex_);
+ update_flag_ = true;
+ con_lck.unlock();
+ cond_.notify_one();
+
+ LOG_DEBUG("add inlong_group_id:%s to global bid2cluster_map, and set
notify proxy updating", inlong_group_id.c_str());
+ return 0;
+ }
+
+ void GlobalCluster::doUpdate()
+ {
+ if (groupid2cluster_map_.empty())
+ {
+ LOG_INFO("empty inlong_group_id, no need to update proxylist");
+ return;
+ }
+
+ std::ofstream outfile;
+ outfile.open(".proxy_list.ini.tmp", std::ios::out | std::ios::trunc);
+ int32_t groupId_count = 0; //flush to file, record index and count
+
+ {
+ unique_write_lock<read_write_mutex>
wtlck(groupid2cluster_rwmutex_);
+
+ // for (auto& it : proxylist_map_)
+ for (auto &bid2cluster : groupid2cluster_map_)
+ {
+ //拼接tdm请求的url
+ std::string url;
+ if (g_config->enable_proxy_URL_from_cluster_)
+ url = g_config->proxy_cluster_URL_;
+ else
+ {
+ url = g_config->proxy_URL_ + "/" + bid2cluster.first;
+ }
+ std::string post_data = "ip=" + g_config->ser_ip_ +
"&version=" + constants::kTDBusCAPIVersion;
+ LOG_WARN("get inlong_group_id:%s proxy cfg url:%s,
post_data:%s", bid2cluster.first.c_str(), url.c_str(), post_data.c_str());
+
+ // request proxylist from mananer, if failed multi-times, read
from local cache file
+ std::string meta_data;
+ int32_t ret;
+ for (int i = 0; i < constants::kMaxRequestTDMTimes; i++)
+ {
+ HttpRequest request = {url, g_config->proxy_URL_timeout_,
g_config->need_auth_, g_config->auth_id_, g_config->auth_key_, post_data};
+ ret = Utils::requestUrl(meta_data, &request);
+ if (!ret)
+ {
+ break;
+ } //request success
+ }
+
+ if (!ret) // success
+ {
+ LOG_WARN("get inlong_group_id:%s proxy json list from tdm:
%s", bid2cluster.first.c_str(), meta_data.c_str());
+ }
+ else //request manager error
+ {
+ LOG_ERROR("failed to request inlong_group_id:%s proxylist
from tdm, has tried max_times(%d)", bid2cluster.first.c_str(),
+ constants::kMaxRequestTDMTimes);
+
+ if (bid2cluster.second != -1 &&
cluster_set_.find(bid2cluster.second) != cluster_set_.end())
+ {
+ LOG_WARN("failed to request inlong_group_id:%s
proxylist from tdm, use previous proxylist", bid2cluster.first.c_str());
+ continue;
+ }
+ else //new groupid, try to read from cache proxylist
+ {
+ LOG_WARN("failed to request inlong_group_id:%s
proxylist from tdm, also no previous proxylist, then try to find from cache
file",
+ bid2cluster.first.c_str());
+ auto it =
cache_groupid2metaInfo_.find(bid2cluster.first);
+ if (it != cache_groupid2metaInfo_.end())
+ {
+ meta_data = it->second;
+ LOG_WARN("get inlong_group_id:%s proxy json from
cache file: %s", bid2cluster.first.c_str(), meta_data.c_str());
+ }
+ else
+ {
+ LOG_ERROR("failed to find inlong_group_id:%s
proxylist from cache file", bid2cluster.first.c_str());
+ continue;
+ }
+ }
+ }
+
+ ClusterProxyListPtr new_proxylist_cfg =
std::make_shared<ClusterProxyList>();
+
+ ret = parseAndGet(bid2cluster.first, meta_data,
new_proxylist_cfg);
+ if (ret)
+ {
+ LOG_ERROR("failed to parse inlong_group_id:%s json
proxylist", bid2cluster.first.c_str());
+ continue;
+ }
+
+ unique_write_lock<read_write_mutex>
wtlck_cluster_set(cluster_set_rwmutex_);
+
+ cache_groupid2metaInfo_[bid2cluster.first] = meta_data; //for
disaster
+
+ // #if 0
+
+ // case1. new groupid, but there is its clusterid in memory
+ if (bid2cluster.second == -1 &&
cluster_set_.find(new_proxylist_cfg->clusterId()) != cluster_set_.end())
+ {
+ auto cluster_id = new_proxylist_cfg->clusterId();
+ if
(cluster_set_[cluster_id]->enableUpdate(new_proxylist_cfg))
+ { //已有的cluster需要更新
+ new_proxylist_cfg->initConn();
+ cluster_set_[cluster_id] = new_proxylist_cfg;
+ LOG_INFO("update cluster(id:%d) info and connections",
cluster_id);
+ }
+ bid2cluster.second = cluster_id;
+ LOG_INFO("add inlong_group_id:%s to bid2cluster, its
cluster(id:%d) is already in global_cluster", bid2cluster.first.c_str(),
cluster_id);
+ continue;
+ }
+
+ // case2. new groupid, there is no its clusterid in memory,
update groupid2cluster, add it into cluster
+ if (bid2cluster.second == -1 &&
cluster_set_.find(new_proxylist_cfg->clusterId()) == cluster_set_.end())
+ {
+ bid2cluster.second = new_proxylist_cfg->clusterId();
+ new_proxylist_cfg->initConn();
+ cluster_set_[bid2cluster.second] = new_proxylist_cfg;
+ LOG_INFO("add inlong_group_id:%s to cluster(id:%d) map in
global_cluster, and init connections completely", bid2cluster.first.c_str(),
+ new_proxylist_cfg->clusterId());
+
+ continue;
+ }
+
+ // case3. already existing groupid, whether needs update
+ if
(cluster_set_[bid2cluster.second]->enableUpdate(new_proxylist_cfg))
+ {
+ new_proxylist_cfg->initConn();
+ cluster_set_[bid2cluster.second] = new_proxylist_cfg;
+ LOG_INFO("update cluster(id:%d) info and connections",
bid2cluster.second);
+ }
+ // #endif
+ }
+ // flush cache_bid2metaInfo to file
+ for (auto &it : cache_groupid2metaInfo_)
+ {
+ writeMetaData2File(outfile, groupId_count, it.first,
it.second);
+ groupId_count++;
+ }
+ }
+
+ if (outfile)
+ {
+ if (groupId_count)
+ {
+ outfile << "[main]" << std::endl;
+ outfile << "groupId_count=" << groupId_count << std::endl;
+ }
+ outfile.close();
+ }
+ if (groupId_count)
+ rename(".proxy_list.ini.tmp", ".proxy_list.ini");
+ }
+
+ void GlobalCluster::writeMetaData2File(std::ofstream &file, int32_t
groupId_index, const std::string &inlong_group_id, const std::string &meta_data)
+ {
+ file << "[inlong_group_id" << groupId_index << "]" << std::endl;
+ file << "inlong_group_id=" << inlong_group_id << std::endl;
+ file << "proxy_cfg=" << meta_data << std::endl;
+ }
+
+ int32_t GlobalCluster::readCacheBuslist()
+ {
+ IniFile ini = IniFile();
+ if (ini.load(".proxy_list.ini"))
+ {
+ LOG_INFO("there is no proxylist cache file");
+ return 1;
+ }
+ int32_t groupId_count = 0;
+ if (ini.getInt("main", "groupId_count", &groupId_count))
+ {
+ LOG_WARN("failed to parse .proxylist.ini file");
+ return 1;
+ }
+ for (int32_t i = 0; i < groupId_count; i++)
+ {
+ std::string bidlist = "inlong_group_id" + std::to_string(i);
+ std::string inlong_group_id, proxy;
+ if (ini.getString(bidlist, "inlong_group_id", &inlong_group_id))
+ {
+ LOG_WARN("failed to get %s name from cache file",
inlong_group_id.c_str());
+ continue;
+ }
+ if (ini.getString(bidlist, "proxy_cfg", &proxy))
+ {
+ LOG_WARN("failed to get %s cache proxylist",
inlong_group_id.c_str());
+ continue;
+ }
+ LOG_INFO("read cache file, inlong_group_id:%s, proxy_cfg:%s",
inlong_group_id.c_str(), proxy.c_str());
+ cache_groupid2metaInfo_[inlong_group_id] = proxy;
+ }
+
+ return 0;
+ }
+
+ //parse proxylist meta
+
//{"success":true,"errMsg":null,"data":{"clusterId":1,"isIntranet":null,"isSwitch":null,"load":20,"nodeList":[{"id":1,"ip":"127.0.0.1.160","port":46801}]}}
+ int32_t GlobalCluster::parseAndGet(const std::string &inlong_group_id,
const std::string &meta_data, ClusterProxyListPtr proxylist_config)
+ {
+ rapidjson::Document doc;
+ if (doc.Parse(meta_data.c_str()).HasParseError())
+ {
+ LOG_ERROR("failed to parse meta_data, error:(%d:%d)",
doc.GetParseError(), doc.GetErrorOffset());
+ return SDKInvalidResult::kErrorParseJson;
+ }
+
+ if (!(doc.HasMember("success") && doc["success"].IsBool() &&
doc["success"].GetBool()))
+ {
+ LOG_ERROR("failed to get proxy_list of inlong_group_id:%s, success:
not exist or false", inlong_group_id.c_str());
+ return SDKInvalidResult::kErrorParseJson;
+ }
+ // check data valid
+ if (!doc.HasMember("data") || doc["data"].IsNull())
+ {
+ LOG_ERROR("failed to get proxy_list of inlong_group_id:%s, data:
not exist or null", inlong_group_id.c_str());
+ return SDKInvalidResult::kErrorParseJson;
+ }
+
+ // check nodelist valid
+ const rapidjson::Value &clusterInfo = doc["data"];
+ if (!clusterInfo.HasMember("nodeList") ||
clusterInfo["nodeList"].IsNull())
+ {
+ LOG_ERROR("invalid nodeList of inlong_group_id:%s, not exist or
null", inlong_group_id.c_str());
+ return SDKInvalidResult::kErrorParseJson;
+ }
+
+ // check nodeList isn't empty
+ const rapidjson::Value &nodeList = clusterInfo["nodeList"];
+ if (nodeList.GetArray().Size() == 0)
+ {
+ LOG_ERROR("empty nodeList of inlong_group_id:%s",
inlong_group_id.c_str());
+ return SDKInvalidResult::kErrorParseJson;
+ }
+ // check clusterId
+ if (!clusterInfo.HasMember("clusterId") ||
!clusterInfo["clusterId"].IsInt() || clusterInfo["clusterId"].GetInt() < 0)
+ {
+ LOG_ERROR("clusterId of inlong_group_id:%s is not found or not a
integer", inlong_group_id.c_str());
+ return SDKInvalidResult::kErrorParseJson;
+ }
+ else
+ {
+ const rapidjson::Value &obj = clusterInfo["clusterId"];
+ proxylist_config->setClusterId(obj.GetInt());
+ }
+ // check isSwitch
+ if (clusterInfo.HasMember("isSwitch") &&
clusterInfo["isSwitch"].IsInt() && !clusterInfo["isSwitch"].IsNull())
+ {
+ const rapidjson::Value &obj = clusterInfo["isSwitch"];
+ proxylist_config->setSwitchValue(obj.GetInt());
+ }
+ else
+ {
+ LOG_WARN("switch of inlong_group_id:%s is not found or not a
integer", inlong_group_id.c_str());
+ proxylist_config->setSwitchValue(0);
+ }
+ // check load
+ if (clusterInfo.HasMember("load") && clusterInfo["load"].IsInt() &&
!clusterInfo["load"].IsNull())
+ {
+ const rapidjson::Value &obj = clusterInfo["load"];
+ proxylist_config->setLoad(obj.GetInt());
+ }
+ else
+ {
+ LOG_WARN("load of inlong_group_id:%s is not found or not a
integer", inlong_group_id.c_str());
+ proxylist_config->setLoad(0);
+ }
+ // check isIntranet
+ if (clusterInfo.HasMember("isIntranet") &&
clusterInfo["isIntranet"].IsInt() && !clusterInfo["isIntranet"].IsNull())
+ {
+ const rapidjson::Value &obj = clusterInfo["isIntranet"];
+ if (!obj.GetInt())
+ proxylist_config->setIsInterVisit(false);
+ }
+ else
+ {
+ LOG_WARN("isIntranet of inlong_group_id:%s is not found or not a
integer", inlong_group_id.c_str());
+ proxylist_config->setIsInterVisit(true);
+ }
+ // proxy list
+ for (auto &proxy: nodeList.GetArray())
+ {
+ std::string ip;
+ int32_t port, id;
+ if (proxy.HasMember("ip") && !proxy["ip"].IsNull())
+ ip = proxy["ip"].GetString();
+ else
+ {
+ LOG_ERROR("this ip info is null");
+ continue;
+ }
+ if (proxy.HasMember("port") && !proxy["port"].IsNull())
+ {
+ if (proxy["port"].IsString())
+ port = std::stoi(proxy["port"].GetString());
+ else if (proxy["port"].IsInt())
+ port = proxy["port"].GetInt();
+ }
+
+ else
+ {
+ LOG_ERROR("this ip info is null or negative");
+ continue;
+ }
+ if (proxy.HasMember("id") && !proxy["id"].IsNull())
+ {
+ if (proxy["id"].IsString())
+ id = std::stoi(proxy["id"].GetString());
+ else if (proxy["id"].IsInt())
+ id = proxy["id"].GetInt();
+ }
+ else
+ {
+ LOG_WARN("there is no id info of inlong_group_id");
+ continue;
+ }
+ proxylist_config->addBusAddress(std::make_shared<ProxyInfo>(id,
ip, port));
+
+ }
+ // set size
+ proxylist_config->setSize(nodeList.GetArray().Size());
+
+ // init unused_proxylist_
+ proxylist_config->initUnusedBus();
+
+ //set active_proxy_num and backup_proxy_num
+
proxylist_config->setActiveAndBackupBusNum(g_config->max_active_proxy_num_);
+
+ return 0;
+ }
+
    // Timer callback: for every cluster, purge stopped connections and create
    // replacements until the active/backup sets are back to their target
    // sizes, then re-arm the sweep timer. A non-zero error code (timer
    // cancelled) ends the cycle.
    void GlobalCluster::clearInvalidConn(const std::error_code &ec)
    {
        if (ec)
            return;

        {
            unique_read_lock<read_write_mutex> rdlck(cluster_set_rwmutex_);
            for (auto &it : cluster_set_)
            {
                unique_write_lock<read_write_mutex> wtlck(it.second->rwmutex_);
                it.second->clearInvalidConns();
                // replenish connections up to the configured counts
                int32_t count = it.second->activeBusNeedCreate();
                for (int32_t i = 0; i < count; i++)
                {
                    it.second->createRandomConnForActiveSet();
                }
                count = it.second->backupBusNeedCreate();
                for (int32_t i = 0; i < count; i++)
                {
                    it.second->createRandomConnForBackupSet();
                }
            }
        }

        clear_timer_->expires_after(std::chrono::seconds(kClearTimerSecond));
        clear_timer_->async_wait([this](const std::error_code &ec) { clearInvalidConn(ec); });
    }
+
+ void GlobalCluster::keepConnAlive(const std::error_code &ec)
+ {
+ if (ec)
+ return;
+
+ {
+ unique_read_lock<read_write_mutex> rdlck(cluster_set_rwmutex_);
+ for (auto &it : cluster_set_)
+ {
+ it.second->keepConnsAlive();
+ }
+ }
+
+
keepAlive_timer_->expires_after(std::chrono::seconds(g_config->heart_beat_interval_));
+ keepAlive_timer_->async_wait([this](const std::error_code &ec)
+ { keepConnAlive(ec); });
+ }
+
+ void GlobalCluster::doBalance(const std::error_code &ec)
+ {
+ if (ec)
+ return;
+
+ {
+ unique_read_lock<read_write_mutex> rdlck(cluster_set_rwmutex_);
+ for (auto &it : cluster_set_)
+ {
+ it.second->balanceConns();
+ }
+ }
+
+ doBalance_timer_->expires_after(std::chrono::minutes(kDoBalanceMin));
+ doBalance_timer_->async_wait([this](const std::error_code &ec)
+ { doBalance(ec); });
+ }
+
    // Timer callback: rotate every cluster's backup connections, then re-arm
    // with a randomized (prime-offset) period so clusters do not rotate in
    // lock-step.
    void GlobalCluster::updateBackup(const std::error_code &ec)
    {
        if (ec)
            return;

        {
            unique_read_lock<read_write_mutex> rdlck(cluster_set_rwmutex_);
            for (auto &it : cluster_set_)
            {
                it.second->updateBackupConns();
            }
        }

        // NOTE(review): srand() seeds rand(), not random(); this reseed does not
        // affect the random() call below — confirm intended RNG.
        srand((uint32_t)Utils::getCurrentMsTime());
        int32_t rand_idx = random() % (constants::kPrimeSize);

        updateBackup_timer_->expires_after(std::chrono::seconds(kUpdateBackupSecond + constants::kPrime[rand_idx]));
        updateBackup_timer_->async_wait([this](const std::error_code &ec) { updateBackup(ec); });
    }
+
+ void GlobalCluster::printAckNum(const std::error_code &ec)
+ {
+ if (ec)
+ return;
+ g_queues->printAck();
+ g_queues->showState();
+
+
printAckNum_timer_->expires_after(std::chrono::minutes(kPrintAckNumMin));
+ printAckNum_timer_->async_wait([this](const std::error_code &ec)
+ { printAckNum(ec); });
+ }
+
+} // namespace dataproxy_sdk
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.h
new file mode 100644
index 000000000..edc128679
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.h
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_BUSLIST_CONFIG_H_
+#define DATAPROXY_SDK_BASE_BUSLIST_CONFIG_H_
+
#include <asio.hpp>

#include <chrono>
#include <condition_variable>
#include <fstream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include "sdk_core.h"
#include "client_config.h"
#include "noncopyable.h"
#include "read_write_mutex.h"
+namespace dataproxy_sdk
+{
+ // proxyinfo: {"port":46801,"ip":"100.125.58.214","id":1}
+ class ProxyInfo
+ {
+ private:
+ int32_t proxy_id_;
+ std::string ip_;
+ int32_t port_num_;
+
+ public:
+ ProxyInfo(int32_t proxy_id, std::string ip, int32_t port_num) :
proxy_id_(proxy_id), ip_(ip), port_num_(port_num) {}
+ std::string getString() const { return "[" + ip_ + ":" +
std::to_string(port_num_) + "]"; }
+
+ int32_t proxyId() const { return proxy_id_; }
+
+ std::string ip() const { return ip_; }
+ void setIp(const std::string &ip) { ip_ = ip; }
+
+ int32_t portNum() const { return port_num_; }
+ void setPortNum(const int32_t &port_num) { port_num_ = port_num; }
+ };
+ //using ProxyInfoPtr = std::shared_ptr<BusInfo>;
+
+ class ClusterProxyList;
+ using ClusterProxyListPtr = std::shared_ptr<ClusterProxyList>;
+ class ClusterProxyList
+ {
      private:
        int32_t cluster_id_;                // -1 until parsed from the manager response
        std::vector<ProxyInfoPtr> proxylist_; // every proxy the manager returned
        int32_t size_;                      // number of entries in proxylist_
        int32_t load_;                      // cluster load threshold used by balanceConns()
        bool is_inter_visit_;               // true: intranet access
        int32_t switch_value_;              // manager "isSwitch" value; change triggers update
        bool get_flag_;
        int32_t active_proxy_num_; // min(config->max_active_proxy_num_, available proxy num)
        int32_t backup_proxy_num_; // less than (size_-active_proxy_num_-1)  (typo fixed)

        std::unordered_map<std::string, ConnectionPtr> active_proxy_set_; // key:ip+port
        std::unordered_map<std::string, ConnectionPtr> backup_proxy_set_;

        // should be initialized as proxylist; proxies with no open connection
        std::unordered_map<std::string, ProxyInfoPtr> unused_proxylist_;

        uint32_t msg_num_; // cumulate sent msg count (drives round-robin pick)

      public:
        // guards every connection set above; callers of the helpers below must
        // hold it in the documented mode
        read_write_mutex rwmutex_;
+
+ public:
+ explicit ClusterProxyList();
+ virtual ~ClusterProxyList();
+
+ virtual bool isNeedLoadBalance();
+ void addBusAddress(const ProxyInfoPtr &proxy_info);
+ bool enableUpdate(const ClusterProxyListPtr &other);
+ virtual int32_t initConn(); //init active conns and backup conns
+ int32_t clearAllConn();
+ void clearInvalidConns();
+ void keepConnsAlive();
+ void balanceConns();
+ void updateBackupConns();
+ ConnectionPtr getSendConn();
+
+ ConnectionPtr createRandomConnForActiveSet();
+ ConnectionPtr createRandomConnForBackupSet();
+
+ int32_t size() const { return size_; }
+ void setSize(const int32_t &size) { size_ = size; }
+ int32_t clusterId() const { return cluster_id_; }
+ void setClusterId(const int32_t &cluster_id) { cluster_id_ = cluster_id; }
+ int32_t switchValue() const { return switch_value_; }
+ void setSwitchValue(const int32_t &switch_value) { switch_value_ =
switch_value; }
+ int32_t load() const { return load_; }
+ void setLoad(const int32_t &load) { load_ = load; }
+ bool isInterVisit() const { return is_inter_visit_; }
+ void setIsInterVisit(bool is_inter_visit) { is_inter_visit_ =
is_inter_visit; }
+ bool getFlag() const { return get_flag_; }
+ void setGetFlag(bool get_flag) { get_flag_ = get_flag; }
+ int32_t activeBusNum() const { return active_proxy_num_; }
+ void setActiveAndBackupBusNum(const int32_t &default_set);
+ int32_t backupBusNum() const { return backup_proxy_num_; }
+
+ int32_t backupBusNeedCreate() const
+ {
+ if (backup_proxy_set_.size() < backup_proxy_num_)
+ return backup_proxy_num_ - backup_proxy_set_.size();
+ return 0;
+ }
+ int32_t activeBusNeedCreate() const
+ {
+ if (active_proxy_set_.size() < active_proxy_num_)
+ return active_proxy_num_ - active_proxy_set_.size();
+ return 0;
+ }
+ void initUnusedBus();
+
+ private:
+ ConnectionPtr createRandomConn(std::unordered_map<std::string,
ConnectionPtr> &conn_set);
+ int32_t createConnSet();
+
+ };
+
+ class GlobalCluster : noncopyable
+ {
+ private:
+ std::unordered_map<std::string, int32_t> groupid2cluster_map_;
//<inlong_group_id,cluster_id>
+ std::unordered_map<int32_t, ClusterProxyListPtr> cluster_set_;
//key:cluster_id
+
+ std::unordered_map<std::string, std::string> cache_groupid2metaInfo_;
//<inlong_group_id,proxylist>, for disaster, read from local cache file
+
+ read_write_mutex groupid2cluster_rwmutex_;
+ read_write_mutex cluster_set_rwmutex_;
+
+ bool update_flag_;
+ std::mutex cond_mutex_;
+ std::condition_variable cond_;
+ bool exit_flag_; //exit proxylist update thread
+ std::thread worker_;
+ ExecutorThreadPtr timer_worker_; //clean invalid conn, send hb, and do
balance betweeen activeconn and backupconn
+
+ SteadyTimerPtr clear_timer_;
+ SteadyTimerPtr keepAlive_timer_;
+ SteadyTimerPtr doBalance_timer_; // do balance switch between active and
backup
+ SteadyTimerPtr printAckNum_timer_;
+ SteadyTimerPtr updateBackup_timer_;
+
+ enum
+ {
+ kClearTimerSecond = 10, // FIXME: whether need modification
+ kDoBalanceMin = 5,
+ kPrintAckNumMin = 1, // ack msg in one min cycle
+ kUpdateBackupSecond = 300, // kDoBalanceMin
+ };
+
+ public:
+ explicit GlobalCluster();
+ ~GlobalCluster();
+
+ int32_t initBuslistAndCreateConns();
+ void startUpdateSubroutine();
+ int32_t addBuslist(const std::string &inlong_group_id);
+ virtual ConnectionPtr getSendConn(const std::string &inlong_group_id);
+ ConnectionPtr createActiveConn(const std::string &inlong_group_id, int32_t
pool_id);
+ void closeBuslistUpdate();
+ int32_t readCacheBuslist();
+
+ private:
+ void updateSubroutine();//update proxylist
+ void doUpdate();
+ int32_t parseAndGet(const std::string &inlong_group_id, const std::string
&meta_data, ClusterProxyListPtr proxylist_config);
+ void writeMetaData2File(std::ofstream &file, int32_t group, const
std::string &inlong_group_id, const std::string &meta_data);
+
+ void clearInvalidConn(const std::error_code &ec);
+ void keepConnAlive(const std::error_code &ec); // send hb
+ void doBalance(const std::error_code &ec);
+ void updateBackup(const std::error_code &ec);
+ virtual void printAckNum(const std::error_code &ec);
+ // void cancelAllTimer();
+ };
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_BUSLIST_CONFIG_H_
\ No newline at end of file